背景:我正在编写一个 Python 脚本,其中包含一个用于解调传入 IQ 数据的 gnuradio 流程图对象。当我定期轮询它以获取解调数据时,流程图将启动并运行。
过去,我使用消息接收器和 msgq 将数据从流程图中传递出来,但这已被弃用。我现在正在尝试使用较新的块来做事情:特别是尝试将输出数据传递到“流到标记流”块,然后传递到“标记流到 PDU”块。
我现在正试图弄清楚如何将数据从流程图中取出并放入我的主要 python 代码中。在文档和论坛中,我看到了很多关于如何开发 OOT 块的信息,但对我正在尝试做的事情却很少。
我已经创建了我正在尝试做的简化版本。该流程图由一个信号源斜坡组成,该斜坡馈送到“标记流到 PDU”块。我不关心与标签关联的字符串,我只想读取流数据(构成输入斜坡数据的浮点数)。我看到的大多数参考资料都使用“get_tags_in_range”方法,但这是私有的,对我的情况没有帮助。我已经能够查询块并获取项目计数,但仅此而已。
我不知道为什么它给我带来了这么多麻烦,但我会很感激任何帮助。
I've have attached the code below:
from gnuradio import analog
from gnuradio import blocks
from gnuradio import eng_notation
from gnuradio import filter
from gnuradio import gr
from gnuradio.eng_option import eng_option
from gnuradio.filter import firdes
from gnuradio import digital
import pmt
import math
import numpy
# build simple flowgraph
class simple_flowgraph(gr.top_block):
def __init__(self):
gr.top_block.__init__(self)
samp_rate = 32000.0
# generate repeating ramp from 0 to 1
self.blocks_throttle_0 = blocks.throttle(
gr.sizeof_float*1,
samp_rate,
True)
self.blocks_tagged_stream_to_pdu_0 = blocks.tagged_stream_to_pdu(
blocks.float_t,
'packet_len')
self.blocks_stream_to_tagged_stream_0 = blocks.stream_to_tagged_stream(
gr.sizeof_float,
1,
32,
"packet_len")
self.analog_sig_source_x_0 = analog.sig_source_f(
samp_rate,
analog.GR_SAW_WAVE,
4e3,
1,
0)
##################################################
# Connections
##################################################
self.connect(
(self.analog_sig_source_x_0, 0),
(self.blocks_throttle_0, 0)
)
self.connect(
(self.blocks_stream_to_tagged_stream_0, 0),
(self.blocks_tagged_stream_to_pdu_0, 0)
)
self.connect(
(self.blocks_throttle_0, 0),
(self.blocks_stream_to_tagged_stream_0, 0)
)
flowgraph = simple_flowgraph()
flowgraph.start()
current_nitems_count = \
flowgraph.blocks_tagged_stream_to_pdu_0.nitems_read(0)
previous_nitems_count = current_nitems_count
print current_nitems_count
while True:
# get data from flowgraph when ready
current_nitems_count = \
flowgraph.blocks_tagged_stream_to_pdu_0.nitems_read(0)
tag_list = []
# while there is data,
if current_nitems_count > previous_nitems_count + 16:
print "Got new chunk of data\n Item Count: {}".format(current_nitems_count)
previous_nitems_count = current_nitems_count
#???
感谢 Marcus 提供的出色信息,我已经掌握了使用 ZMQ 推/拉接收器的东西。这是更新的代码:
from gnuradio import analog
from gnuradio import blocks
from gnuradio import gr
from gnuradio import zeromq
import zmq
import array
socket_str = "tcp://127.0.0.1:5557"
def zmq_consumer():
context = zmq.Context()
results_receiver = context.socket(zmq.PULL)
results_receiver.connect(socket_str)
while True:
# pull in raw binary data
raw_data = results_receiver.recv()
# convert to an array of floats
float_list = array.array('f', raw_data) # struct.unpack will be faster
# print flowgraph data
for signal_val in float_list:
print signal_val
# build simple flowgraph that outputs a repeating ramp from 0 to 1
class simple_flowgraph(gr.top_block):
def __init__(self):
gr.top_block.__init__(self)
samp_rate = 32000.0
# generate repeating ramp from 0 to 1
self.analog_sig_source_x_0 = analog.sig_source_f(
samp_rate,
analog.GR_SAW_WAVE,
4e3,
1,
0)
self.blocks_throttle_0 = blocks.throttle(
gr.sizeof_float*1,
samp_rate,
True)
self.zeromq_push_sink_0 = zeromq.push_sink(gr.sizeof_float,
1,
socket_str,
100,
False,
-1)
##################################################
# Connections
##################################################
self.connect(
(self.analog_sig_source_x_0, 0),
(self.blocks_throttle_0, 0)
)
self.connect(
(self.blocks_throttle_0, 0),
(self.zeromq_push_sink_0, 0)
)
# instantiate and start the flowgraph
flowgraph = simple_flowgraph()
flowgraph.start()
# start the receiver socket
zmq_consumer()