实时音频中的动态过滤器

信息处理 过滤器 Python 即时的 scipy
2022-01-31 00:36:08

根据我之前的问题:Removing cracking in real time audio,我正在尝试在实时音频中实现动态过滤器。

我想要做的是创建一个过滤器,我可以在运行时更改截止频率。到目前为止,我已经实现了一个简单的代码,它在每次迭代时将截止频率增加 10Hz。代码如下并且工作正常:

import pyaudio
import wave
import time
import numpy as np
import scipy.io.wavfile as sw
import librosa
import scipy.signal
import scipy
import sys
from scipy.io.wavfile import write


############ Global variables ###################
filename = '../wav/The_Weeknd.wav' #Test file
chunk = 512 #frame size
#Conversion from np to pyAudio types
np_to_pa_format = {
    np.dtype('float32') : pyaudio.paFloat32,
    np.dtype('int32') : pyaudio.paInt32,
    np.dtype('int16') : pyaudio.paInt16,
    np.dtype('int8') : pyaudio.paInt8,
    np.dtype('uint8') : pyaudio.paUInt8
}
np_type_to_sample_width = {
    np.dtype('float32') : 4,
    np.dtype('int32') : 4,
    np.dtype('int16') : 3,
    np.dtype('int8') : 1,
    np.dtype('uint8') : 1
}
STEREO = 2 #channels
#################################################

# Simple class which reads an input test wav file and reproduce it in a real time fashion. Used to test real time functioning.
class Player:
    # Loading the input test file. Crop to 30 seconds length
    def __init__(self):
        self.input_array, self.sample_rate = librosa.load(filename, sr=44100, dtype=np.float32, offset=30, duration=60)

        #print(self.sample_rate)
        #print(self.input_array.shape)
        self.cycle_count = 0
        self.highcut = 300
        self.filter_state = np.zeros(4)

    def bandPassFilter(self,signal, highcut):
        fs = 44100
        lowcut = 20
        highcut = highcut

        nyq= 0.5 * fs
        low = lowcut / nyq
        high = highcut / nyq

        order = 2

        b, a = scipy.signal.butter(order, [low,high], 'bandpass', analog=False)

        y, self.filter_state = scipy.signal.lfilter(b,a,signal, axis=0, zi=self.filter_state) # NB: filtfilt needs forward and backward information to filter. So it can't be used in realtime filtering where i have no info about future samples! lfilter is better for real time applications!
        return(y)

    def pyaudio_callback(self,in_data, frame_count, time_info, status):
        audio_size = np.shape(self.input_array)[0]
        #print(audio_size)
        #print('frame count: ', frame_count)

        if frame_count*self.cycle_count > audio_size:
            # Processing is complete.
            #print('processing complete')
            return (None, pyaudio.paComplete)
        elif frame_count*(self.cycle_count+1) > audio_size:
            # Last frame to process.
            #print('1 left frame')
            frames_left = audio_size - frame_count*self.cycle_count
        else:
            # Every other frame.
            #print('everyotherframe')
            frames_left = frame_count

        data = self.input_array[frame_count*self.cycle_count:frame_count*self.cycle_count+frames_left]
        data = self.bandPassFilter(data, self.highcut)
        if(self.highcut<20000):
            self.highcut += 10

        #print('len of data', data.shape)

        #write('test.wav', 44100, data) #Saves correctly the file!
        out_data = data.astype(np.float32).tobytes()
        #print('printing length: ',len(out_data))
        #print(out_data)
        self.cycle_count+=1
        #print(self.cycle_count)
        #print('pyaudio continue value: ',pyaudio.paContinue)
        return (out_data, pyaudio.paContinue)





    def start_non_blocking_processing(self, save_output=True, frame_count=2**10, listen_output=True):
        '''
        Non blocking mode works on a different thread, therefore, the main thread must be kept active with, for example:
            while processing():
                time.sleep(1)
        '''
        self.save_output = save_output
        self.frame_count = frame_count

        # Initiate PyAudio
        self.pa = pyaudio.PyAudio()
        # Open stream using callback
        self.stream = self.pa.open(format=np_to_pa_format[self.input_array.dtype],
                        channels=1,
                        rate=self.sample_rate,
                        output=listen_output,
                        input=not listen_output,
                        stream_callback=self.pyaudio_callback,
                        frames_per_buffer=frame_count)

        # Start the stream
        self.stream.start_stream()


    def processing(self):
        '''
        Returns true if the PyAudio stream is still active in non blocking mode.
        MUST be called AFTER self.start_non_blocking_processing.
        '''
        return self.stream.is_active()

    def terminate_processing(self):
        '''
        Terminates stream opened by self.start_non_blocking_processing.
        MUST be called AFTER self.processing returns False.
        '''
        # Stop stream.
        self.stream.stop_stream()
        self.stream.close()

        # Close PyAudio.
        self.pa.terminate()

        # Resets count.
        self.cycle_count = 0
        # Resets output.
        self.output_array = np.array([[], []], dtype=self.input_array.dtype).T



if __name__ == "__main__":
    print('RUNNING MAIN')
    player = Player()
    player.start_non_blocking_processing()
    while(player.processing()):
        time.sleep(0.1)
    player.terminate_processing()

正如另一位用户在我之前的回答中建议我的那样,在每次迭代时在回调函数中重新创建过滤器并不是一个好主意,这可能会导致问题和无用的额外计算。

除了使用新的截止频率重新创建整个滤波器之外,我一直试图找到一个更好的解决方案,但我找不到更好的解决方案。

有没有办法(例如使用 scipy)在运行时更改截止频率而不重新创建整个过滤器?

1个回答

几点建议:

  1. 我会用极点和零点做所有滤波器设计,而不是滤波器系数
  2. 将所有内容实现为二阶部分,每个部分具有一个复杂的极对。
  3. 巴特沃斯滤波器的零点是恒定的:它们位于z=1,z=1. 您不需要明确计算b过滤器的系数。总是如此b=[1,2,1]或者[1,2,1]加上一个增益
  4. 对于任何复杂的极点,p,你的分母系数很简单a0=1,a1=2Re{p},a2=pp
  5. 模拟巴特沃斯滤波器的极点均匀分布在 s 平面单位圆的左半边。这可以通过特定截止频率的双线性变换轻松映射到 z 平面。这涉及一个超越函数调用和一个除法。不便宜,但也不算太贵
  6. 如果这太贵了,你可以在你想要的频率范围内列出极点位置。然后插值或进行多项式拟合。极点位置的插值是完全安全的,滤波器系数的插值是有风险的。
  7. 您的过滤器是时变的,它可以在任何更新时引发“拉链”噪音。很多噪声来自于状态变量的传递函数的变化。最小化这种影响的最佳方法是对滤波器使用直接 I 型拓扑,因为状态变量只是输入和输出,不依赖于传递函数。转置形式 II 也是可用的(但不是那么好),而其他两种拓扑可能非常糟糕。
  8. 编写自己的 Direct Form I Butterworth 滤波器也可能比使用便宜lfilter()因为你可以跳过所有乘法b系数。另一方面,有人可能已经优化了 lfilter() 的生活日光,所以这取决于。
  9. 您仍然需要限制更新的速度以及极点在帧之间移动的速度。一种简单的方法是在极点位置本身放置一个一阶低通滤波器。实施起来非常便宜,您可以拨入低通滤波器的时间常数来优化更新速度,同时避免任何不可接受的更新噪音。
  10. 根据你想走多低(频率),随着两极的接近,你可能需要特别慢z=1. 它们可以非常接近单位圆,即使是很小的极点变化也会导致很大的频率跳跃。如果这是一个问题,您可能需要扭曲频率轴,但这不适合胆小的人,因为您的滤波器阶数非常低,没有它您可能会侥幸逃脱。