我的总和正弦波似乎只有在我的波长可被时间步长整除时才有效。可能是相位问题

信息处理 fft 信号分析 阶段 IFFT
2022-01-29 11:04:14

我在 python 中创建了一个程序,它可以创建任何通用信号和一个时间范围来显示它,如下面的红色“--”行所示。然后,我创建了一种通过 fft 计算该信号中每个频率的相位、幅度和波长的方法。然后,我根据这些 amp、ph 和波长为每个频率创建一个方程。然后我对所有这些方程求和,结果是相同的原始信号,如下面的红色圆圈所示。到目前为止,这一切都很好,证明我的波长、相位、振幅和方程都是正确的。我现在想使用每个单独的频率曲线的方程来预测未来的点。我可以对每个单独的方程执行此操作,没有问题,如下所示。这是频谱中的单个频率。 在此处输入图像描述 然后我对这些预测求和,就像对方程求和一样,这就是问题出现的地方。仅当我的波长可被我的时间步长整除时,这才有效。因此,如果我的范围为 (1, 100) dt=1,波长为 20,则此方法有效。如果我将其更改为 (1, 110) 它完全颠倒了,但其他一切仍然有效。这里发生了什么?这是很多代码,但如果需要,我可以根据要求显示任何内容。这是工作信号的总和.. 范围 (1, 100) 波长=20 在此处输入图像描述 这是当我将时间步长更改为不能被波长整除时...范围 (1, 90) 波长=20 在此处输入图像描述 范围 (1, 95 ) 波长=20 在此处输入图像描述 更改波长也会产生相同的效果。这里发生了什么???

此外,我可以证明每个单独的频率都是正确预测的。预测左侧的实线相加后等于实际曲线。但是,预测的实线(如图所示)在求和时与实际验证集不匹配。 在此处输入图像描述

from numpy import pi, sin, arange, mean, abs as absolute, where, nanmin, nanmax, angle, arctan2, sqrt, array, random, append
from matplotlib import pyplot as plt
from scipy import fftpack
import pandas as pd


class GenericSignals():
    
    def __init__(self, time_vec):
        self.time_vec = time_vec
    
    def createSingleSignal(self, w=.2, a=10, ph=0):
        # x is x axis time vector, w is wavelength, a is amplitude, ph is phase
        x = self.time_vec
        return a * sin((2 * pi / w * x) + ph)
    
    def combineNSignals(self, *args):
        return sum(args)
    
    def getFrequencies(self, signal, dt):
        sig_fft = fftpack.fft(signal)
        sample_freq = fftpack.fftfreq(len(signal), d=dt)
        power = absolute(sig_fft) * dt
        return power, sample_freq
    
    def findPeakFrequencies(self, sample_freq, power, n=1):
        #n is number of frequencies to get
        pos_mask = where(sample_freq > 0)
        freqs = sample_freq[pos_mask]
        pos_power = power[pos_mask]
        pos_power_sorted = sorted(pos_power)
        dominent = pos_power_sorted[-n:]
        # need to reorganize dominent to have proper sorting now
        dominant_freq_indices = []
        peak_freqs = []
        for i in range(len(dominent)):
            dominant_freq_indices.append(where(pos_power == dominent[i])[0][0])
            peak_freqs.append(freqs[dominant_freq_indices[i]])
        return peak_freqs
    
    def createIndividualSignalsForEachFreq(self, signal, sample_freq):
        filtered_signals = []
        for i in range(len(sample_freq)):
            high_freq_fft = fftpack.fft(signal)
            high_freq_fft[absolute(sample_freq) < nanmin(sample_freq[i])] = 0
            high_freq_fft[absolute(sample_freq) > nanmax(sample_freq[i])] = 0
            filtered_sig = fftpack.ifft(high_freq_fft)
            filtered_sig -= mean(filtered_sig)
            filtered_signals.append(filtered_sig)
        return filtered_signals
    
    def getPhaseAmplitudeWavelength(self, signal, freq, sample_freq):
        # if statement resolves a divide by 0 runtime warning
        if freq == 0:
            fixed_freq = .0000000000001
            wavelength = 1 / fixed_freq 
        else:
            wavelength = 1 / freq 
        sig_size = len(signal)
        sig_fft = fftpack.fft(signal)
        sample_index = where(sample_freq==freq)
        phase = (arctan2(sig_fft[sample_index].imag, sig_fft[sample_index].real))[0]
        ph = phase + pi/2
        amp = (sqrt((sig_fft[sample_index].real * sig_fft[sample_index].real) + (sig_fft[sample_index].imag * sig_fft[sample_index].imag)) / (sig_size / 2))[0]
        return ph, amp, wavelength
    
    def getAllPhaseAmplitudeWavelengths(self, all_signals, sample_freq):
        wavelengths = []
        phases = []
        amplitudes = []
        i = 0
        for individual_signal in all_signals:
            phase, amplitude, wavelength = self.getPhaseAmplitudeWavelength(individual_signal, sample_freq[i], sample_freq)
            i += 1
            wavelengths.append(wavelength)
            phases.append(phase)
            amplitudes.append(amplitude)
        return wavelengths, phases, amplitudes

    def eqn(self, signal, wavelength, time_vec, phase, amp):
        signals_mean = absolute(mean(signal))
        return (amp * sin((2 * pi / wavelength * time_vec) + phase)) + signals_mean
    
    def getEquations(self, wavelength, time_vec, ph, amp):
        equations = []
        for i in range(len(wavelength)):
            equation = (amp[i] * sin((2 * pi / wavelength[i] * time_vec) + ph[i]))
            equations.append(equation)
        return equations
    
    def predictFuture(self, new_time_vec, equations, wavelength, ph, amp):
        # addidtional_step = new_time_vec[-1] + 1
        # new_time_vec = append(new_time_vec, addidtional_step)[1:]
        pred = []
        for i in range(len(equations)):
            pred.append(self.eqn(equations[i], wavelength[i], new_time_vec, ph[i], amp[i]))
        return pred
    
class PreProcessData():
    
    def __init__(self, data):
        self.data = data
        
    # separate data into test and validate sets for x and y and for each interval
    def createTestValidateSets(self, data, interval_to_predict):
        # interval to predict must be less than 1/3 size od dataset
        try:
            data_x = data[0]
            data_y = data[1]
            if interval_to_predict * 3 >= len(data_x):
                interval_to_predict = 1
            if interval_to_predict == 0:
                test_x = data_x
                val_x = []
                test_y = data_y
                val_y = []
            else:
                test_x = data_x[:-interval_to_predict]
                val_x = data_x[-interval_to_predict:]
                test_y = data_y[:-interval_to_predict]
                val_y = data_y[-interval_to_predict:]
            return test_x, test_y, val_x, val_y
        except Exception as e:
            return 'PreProcessData.createTestValidateSets failed: ' + e
    
if __name__ == "__main__":
    
    dt = 1
    time_vec = arange(0, 100, dt)
    genSig = GenericSignals(time_vec)
    testing_signal = genSig.createSingleSignal(w=20, a=15, ph=0)
    testing_signal2 = genSig.createSingleSignal(w=10, a=15, ph=0)
    test_signal = testing_signal
    predict_interval = 10
    preProcessedData = PreProcessData(test_signal)
    test_x, test_y, val_x, val_y = preProcessedData.createTestValidateSets([time_vec, test_signal], predict_interval)
    power, sample_freq = genSig.getFrequencies(test_y, dt)
    individual_signals = genSig.createIndividualSignalsForEachFreq(test_y, sample_freq)
    test_index = 5
    fft = fftpack.fft(individual_signals[test_index])
    fft_phase = arctan2(fft[test_index].imag, fft[test_index].real)

    wavelengths, phases, amplitudes = genSig.getAllPhaseAmplitudeWavelengths(individual_signals, sample_freq)
    equations = genSig.getEquations(wavelengths, test_x, phases, amplitudes)
    index2 = int(len(test_x) / 2 - 1)
    all_equations = sum(equations[1:index2])
    y_shift = mean(test_y) - mean(all_equations)
    all_equations += y_shift
    predicted_curves = genSig.predictFuture(val_x, equations, wavelengths, phases, amplitudes)
    total_pred = sum(predicted_curves)
    total_pred += y_shift
    fig, axs = plt.subplots(1, 1, figsize=(15,5))
    axs.plot(test_x, individual_signals[test_index], 'r--', label='single exact curve')
    axs.plot(test_x, equations[test_index], 'ro', label='single equation curve')
    axs.plot(val_x, predicted_curves[test_index], 'r', label='single predcted curve')
    # axs.plot(time_vec, test_signal, 'b--', label='test set')
    # axs.plot(test_x, all_equations, 'bo', label='ALL equations set')
    # axs.plot(val_x, val_y, 'g--', label='val set')
    # for i in range(1, 8):
    #     axs.plot(val_x, predicted_curves[i], label=f'pred set: {i}')
    # axs.plot(val_x, sum(predicted_curves[1:3]), label='pred curves')
    axs.plot(val_x, total_pred, 'go', label='predicted set')
    axs.legend()
    fig.tight_layout()
    plt.show() 
    

澄清一下,FFT 在 FFT 的范围内工作。外面是行不通的。下面显示了执行 FFT 的边界。右边的东西被计算和求和,就像一个罪波的总和。这就是问题所在。 在此处输入图像描述

1个回答

我认为有一个简单的解释:DFT 的扩展是周期性的。换句话说,将 DFT 在很长一段时间内发现的所有正弦曲线相加将产生重复给 DFT 的数据。这是一个重现您的情况的示例。蓝点是提供给 DFT 的样本,在 0.25 秒内采集。由 DFT 计算的正弦曲线的扩展(以红色绘制)也是周期性的,周期为 0.25。

在此处输入图像描述

当整数个周期完全适合采样间隔时,您的程序会按预期工作,因为在这种情况下不会引入不连续性。

一般来说,是否可以使用 DFT 推断信号?我认为,当已知信号由少量个正弦波组成时,您能做的最好的事情就是只保留个最大的 DFT 箱并扩展它们——但结果并不准确,因为输入能量将溢出到被忽略的垃圾箱。这是一个带有单个正弦曲线的示例:NN

在此处输入图像描述