既然消息接收器已被弃用,如何从 gnuradio 流程图中提取数据?

信息处理 格努拉迪奥
2022-02-04 05:26:52

背景:我正在编写一个 Python 脚本,其中包含一个用于解调传入 IQ 数据的 gnuradio 流程图对象。当我定期轮询它以获取解调数据时,流程图将启动并运行。

过去,我使用消息接收器和 msgq 将数据从流程图中传递出来,但这已被弃用。我现在正在尝试使用较新的块来做事情:特别是尝试将输出数据传递到“流到标记流”块,然后传递到“标记流到 PDU”块。

我现在正试图弄清楚如何将数据从流程图中取出并放入我的主要 python 代码中。在文档和论坛中,我看到了很多关于如何开发 OOT 块的信息,但对我正在尝试做的事情却很少。

我已经创建了我正在尝试做的简化版本。该流程图由一个信号源斜坡组成,该斜坡馈送到“标记流到 PDU”块。我不关心与标签关联的字符串,我只想读取流数据(构成输入斜坡数据的浮点数)。我看到的大多数参考资料都使用“get_tags_in_range”方法,但这是私有的,对我的情况没有帮助。我已经能够查询块并获取项目计数,但仅此而已。

我不知道为什么它给我带来了这么多麻烦,但我会很感激任何帮助。

I've have attached the code below:
from gnuradio import analog
from gnuradio import blocks
from gnuradio import eng_notation
from gnuradio import filter
from gnuradio import gr
from gnuradio.eng_option import eng_option
from gnuradio.filter import firdes
from gnuradio import digital
import pmt
import math
import numpy

# build simple flowgraph
class simple_flowgraph(gr.top_block):
    def __init__(self):

        gr.top_block.__init__(self)

        samp_rate = 32000.0

        # generate repeating ramp from 0 to 1
        self.blocks_throttle_0 = blocks.throttle(
            gr.sizeof_float*1,
            samp_rate,
            True)
        self.blocks_tagged_stream_to_pdu_0 = blocks.tagged_stream_to_pdu(
            blocks.float_t,
            'packet_len')
        self.blocks_stream_to_tagged_stream_0 = blocks.stream_to_tagged_stream(
            gr.sizeof_float,
            1,
            32,
            "packet_len")
        self.analog_sig_source_x_0 = analog.sig_source_f(
            samp_rate,
            analog.GR_SAW_WAVE,
            4e3,
            1,
            0)

        ##################################################
        # Connections
        ##################################################
        self.connect(
            (self.analog_sig_source_x_0, 0),
            (self.blocks_throttle_0, 0)
        )
        self.connect(
            (self.blocks_stream_to_tagged_stream_0, 0),
            (self.blocks_tagged_stream_to_pdu_0, 0)
        )
        self.connect(
            (self.blocks_throttle_0, 0),
            (self.blocks_stream_to_tagged_stream_0, 0)
        )



flowgraph = simple_flowgraph()
flowgraph.start()

current_nitems_count = \
    flowgraph.blocks_tagged_stream_to_pdu_0.nitems_read(0)
previous_nitems_count = current_nitems_count

print current_nitems_count

while True:
    # get data from flowgraph when ready
    current_nitems_count = \
        flowgraph.blocks_tagged_stream_to_pdu_0.nitems_read(0)

    tag_list = []

    # while there is data,
    if current_nitems_count > previous_nitems_count + 16:
        print "Got new chunk of data\n  Item Count: {}".format(current_nitems_count)
        previous_nitems_count = current_nitems_count

    #???

感谢 Marcus 提供的出色信息,我已经掌握了使用 ZMQ 推/拉接收器的东西。这是更新的代码:

from gnuradio import analog
from gnuradio import blocks
from gnuradio import gr
from gnuradio import zeromq
import zmq
import array

socket_str = "tcp://127.0.0.1:5557"


def zmq_consumer():
    context = zmq.Context()
    results_receiver = context.socket(zmq.PULL)
    results_receiver.connect(socket_str)
    while True:
        # pull in raw binary data
        raw_data = results_receiver.recv()
        # convert to an array of floats
        float_list = array.array('f', raw_data) # struct.unpack will be faster
        # print flowgraph data
        for signal_val in float_list:
            print signal_val


# build simple flowgraph that outputs a repeating ramp from 0 to 1
class simple_flowgraph(gr.top_block):
    def __init__(self):

        gr.top_block.__init__(self)

        samp_rate = 32000.0

        # generate repeating ramp from 0 to 1
        self.analog_sig_source_x_0 = analog.sig_source_f(
            samp_rate,
            analog.GR_SAW_WAVE,
            4e3,
            1,
            0)
        self.blocks_throttle_0 = blocks.throttle(
            gr.sizeof_float*1,
            samp_rate,
            True)
        self.zeromq_push_sink_0 = zeromq.push_sink(gr.sizeof_float,
                                                   1,
                                                   socket_str,
                                                   100,
                                                   False,
                                                   -1)

        ##################################################
        # Connections
        ##################################################
        self.connect(
            (self.analog_sig_source_x_0, 0),
            (self.blocks_throttle_0, 0)
        )
        self.connect(
            (self.blocks_throttle_0, 0),
            (self.zeromq_push_sink_0, 0)
        )


# instantiate and start the flowgraph
flowgraph = simple_flowgraph()
flowgraph.start()

# start the receiver socket
zmq_consumer()
1个回答

因此,从您的介绍中得到的重要结论是,您有一个应用程序需要反复从流程图中获取大量项目。这意味着您处于流媒体案例中。(对于未来的读者:如果您只想在流程图完成运行后一次获得所有数据,则只需使用 Vector Sink)

因此,这里有多种方法:

  • 使用矢量接收器,轮询该矢量接收器。仅在您拥有 GNU Radio >= 3.7.12.0 时使用我们忘了让那个东西线程安全,即。您可能会在一段std::vector时间内获取数据,因为它正在被修改。如果你想轮询,这仍然是选择的方法——只是你修复了向量接收器(它真的没那么糟糕——只是引入一个互斥体),并且可能还引入一个pop()方法,给你一个副本内部数据并清空内部存储向量。我们可以讨论如何帮助您实现这一点。
  • 如果您只需要偶尔采样某些东西(不适用于从 GNU Radio 中获取所有数据,就像每隔大约几毫秒随机采样一次),Signal Probe 可能就是您想要的。在信号处理方面,我绝对相信这几乎不是可行的方法。
  • 编写一个 C++ 类,它是gr::basic_block;的子类 你必须实现一个void post(pmt::pmt_t which_port, pmt::pmt_t msg)方法,然后你可以在新式消息传递(不是旧式消息队列)方案中使用这个块,就像你使用 GNU Radio 块一样。在该类中,发出 Qt 信号,以更新您的 Qt 小部件!
  • 推荐,因为最简单,而且很可能性能非常好:在您的流程图中使用 ZeroMQ 接收器,并在您的 Qt 应用程序中使用匹配的 ZeroMQ 套接字。
    有低开销的进程内通信套接字类型(ZMQ 不一定会推断出网络堆栈开销)。ZeroMQ 提供了许多简单的解决方案,因为它具有用于大多数可想到问题的套接字(对)类型,例如:
    • GNU Radio 中的 ZMQ PUSH Sink,Qt 中的 ZMQ PULL 套接字:顾名思义,sink 只是在样本进入时推出它们。在这种情况下,您需要在流程图中进行速率限制(就像您现在所做的那样,使用你的油门)
    • GNU Radio 中的 ZMQ REP(ly) SINK,Qt 中的 ZMQ REQ(uest) 套接字:更像是 GNU Radio 充当服务器,响应应用程序对新数据的请求。这样,Qt 应用程序定义了数据生成的速度(因为 GNU Radio 的流是背压驱动的),因此您的 Throttle 将是不必要的。

因此,从更高层的角度来看,您必须决定(对于每个 DSP 系统,而不是 GNU Radio,而不仅仅是这个特定问题)您是否要轮询,并且是背压驱动,或者是否有某些东西定义了您的处理速率,无论如何,你可以推出信号。