过滤实时信号的最佳窗口策略

信息处理 Python 过滤 即时的
2022-02-18 15:56:22

我每秒有 250 个样本流到达管道阶段,看起来像这个简化的示例。

class PipeLineStage(...):
    historical_buffer = []
    def _process(self, sample):
        historical_buffer.append(sample)
        altered_sample = ...
        return altered_sample

_process每当有新样本从设备到达并将返回值传递给下一个阶段时,管道管理器都会间接调用。

我希望函数的效果是应用 50hz 陷波滤波器。

所以我的问题是:我应该如何理想地窗口/缓冲传入样本的信号?

具体来说,我应该如何:

  • 当采样率为 250hz 时,决定多少个样本最适合滤除 50hz 信号?(实时观看的延迟最小)
  • 管理连续信号的窗口化;值得在重叠缓冲区上应用 lfilter,例如样本 1-100,然后是 50-150,然后是 100-200 等?还是在每次应用 lfilter 后清除缓冲区并具有单独/离散的窗口,例如样本 0-100、100-200 等是否一样好?

换句话说,我担心将带有 lfilter 的 butterfilter 应用于离散窗口是否会在滤波信号中引入不连续性或其他伪影,其重要性足以证明滚动窗口的性能成本是合理的。

我思考这个问题的方式有意义吗?

1个回答

如果可能,请设计您的过滤器,然后执行必须执行的操作(缓冲等)以正确实现该过滤器。在实现有限脉冲响应 (FIR) 滤波器时,例如:(伪代码:)

out[t] = 0.5*in[t] - 0.6*in[t-1] + 0.7*in[t-2] - 0.8*in[t-3]

其中t是整数时间索引,您可以这样做,以便将一些过去的in值存储在缓冲区中,如下所示,步进t

t buf[0] buf[1] buf[2]
----------------------
0 0      0      0
1 0      0      in[0]
2 0      in[0]  in[1]
3 in[0]  in[1]  in[2]
4 in[1]  in[2]  in[3]
5 in[2]  in[3]  in[4]
...

这样你总是有in[t-1]in[t-2]并且in[t-3]可用于计算out[t]也许您担心在像这样的“滚动”缓冲区中移动数据的性能成本。但是可以通过使用循环缓冲区来避免额外的工作,其长度通常四舍五入到最接近的2^n(integer n),以便可以使用 binary 完成环绕and (2^n-1)下一个写入位置存储在一个额外的状态变量中i

t i buf[0] buf[1] buf[2] buf[3]
-------------------------------
0 0 0      0      0      0
1 1 in[0]  0      0      0
2 2 in[0]  in[1]  0      0
3 3 in[0]  in[1]  in[2]  0
4 0 in[0]  in[1]  in[2]  in[3]
5 1 in[4]  in[1]  in[2]  in[3]
6 2 in[4]  in[5]  in[2]  in[3]
7 3 in[4]  in[5]  in[6]  in[3]
8 0 in[4]  in[5]  in[6]  in[7]
...

如果您使用单指令多数据 (SIMD) 加速,in[t]在计算out[t].

如果您有一个无限脉冲响应 (IIR) 滤波器(如 Butterworth),您可以使用另一个循环缓冲区来存储过去的out值。这将给出一个称为直接形式 I 的 IIR 实现结构。还有其他结构可能具有更好的数值稳定性,也可能不具有更好的数值稳定性,请参见http://www.earlevel.com/main/2003/02/28/biquads/

如果您不关心滤波器频率响应的相位,一个简单的二极点、二零陷波 IIR 滤波器将完成消除 50 Hz 的工作:(伪代码:)

params:
fs = sampling frequency
f = notch frequency
r = sharpness, 0..1 excluding 1

init:
z1x = cos(2*pi*f/fs)
a0a2 = (1.0 - r)*(1.0 - r)/(2.0*(fabs(z1x) + 1.0)) + r
a1 = -2.0*z1x*a0a2
b1 = 2.0*z1x*r
b2 = -r*r

loop:
out[t] = a0a2*(in[t] + in[t-2]) + a1*in[t-1]
         + b1*out[t-1] + b2*out[t-2]

对于这样一个简单的过滤器,您可能希望跳过实施复杂的索引缓冲方案。