我在 python 中创建了一个程序,它可以创建任何通用信号和一个时间范围来显示它,如下面的红色“--”行所示。然后,我创建了一种通过 fft 计算该信号中每个频率的相位、幅度和波长的方法。然后,我根据这些 amp、ph 和波长为每个频率创建一个方程。然后我对所有这些方程求和,结果是相同的原始信号,如下面的红色圆圈所示。到目前为止,这一切都很好,证明我的波长、相位、振幅和方程都是正确的。我现在想使用每个单独的频率曲线的方程来预测未来的点。我可以对每个单独的方程执行此操作,没有问题,如下所示。这是频谱中的单个频率。
然后我对这些预测求和,就像对方程求和一样,这就是问题出现的地方。仅当我的波长可被我的时间步长整除时,这才有效。因此,如果我的范围为 (1, 100) dt=1,波长为 20,则此方法有效。如果我将其更改为 (1, 110) 它完全颠倒了,但其他一切仍然有效。这里发生了什么?这是很多代码,但如果需要,我可以根据要求显示任何内容。这是工作信号的总和.. 范围 (1, 100) 波长=20
这是当我将时间步长更改为不能被波长整除时...范围 (1, 90) 波长=20
范围 (1, 95 ) 波长=20
更改波长也会产生相同的效果。这里发生了什么???
此外,我可以证明每个单独的频率都是正确预测的。预测左侧的实线相加后等于实际曲线。但是,预测的实线(如图所示)在求和时与实际验证集不匹配。

from numpy import pi, sin, arange, mean, abs as absolute, where, nanmin, nanmax, angle, arctan2, sqrt, array, random, append
from matplotlib import pyplot as plt
from scipy import fftpack
import pandas as pd
class GenericSignals():
def __init__(self, time_vec):
self.time_vec = time_vec
def createSingleSignal(self, w=.2, a=10, ph=0):
# x is x axis time vector, w is wavelength, a is amplitude, ph is phase
x = self.time_vec
return a * sin((2 * pi / w * x) + ph)
def combineNSignals(self, *args):
return sum(args)
def getFrequencies(self, signal, dt):
sig_fft = fftpack.fft(signal)
sample_freq = fftpack.fftfreq(len(signal), d=dt)
power = absolute(sig_fft) * dt
return power, sample_freq
def findPeakFrequencies(self, sample_freq, power, n=1):
#n is number of frequencies to get
pos_mask = where(sample_freq > 0)
freqs = sample_freq[pos_mask]
pos_power = power[pos_mask]
pos_power_sorted = sorted(pos_power)
dominent = pos_power_sorted[-n:]
# need to reorganize dominent to have proper sorting now
dominant_freq_indices = []
peak_freqs = []
for i in range(len(dominent)):
dominant_freq_indices.append(where(pos_power == dominent[i])[0][0])
peak_freqs.append(freqs[dominant_freq_indices[i]])
return peak_freqs
def createIndividualSignalsForEachFreq(self, signal, sample_freq):
filtered_signals = []
for i in range(len(sample_freq)):
high_freq_fft = fftpack.fft(signal)
high_freq_fft[absolute(sample_freq) < nanmin(sample_freq[i])] = 0
high_freq_fft[absolute(sample_freq) > nanmax(sample_freq[i])] = 0
filtered_sig = fftpack.ifft(high_freq_fft)
filtered_sig -= mean(filtered_sig)
filtered_signals.append(filtered_sig)
return filtered_signals
def getPhaseAmplitudeWavelength(self, signal, freq, sample_freq):
# if statement resolves a divide by 0 runtime warning
if freq == 0:
fixed_freq = .0000000000001
wavelength = 1 / fixed_freq
else:
wavelength = 1 / freq
sig_size = len(signal)
sig_fft = fftpack.fft(signal)
sample_index = where(sample_freq==freq)
phase = (arctan2(sig_fft[sample_index].imag, sig_fft[sample_index].real))[0]
ph = phase + pi/2
amp = (sqrt((sig_fft[sample_index].real * sig_fft[sample_index].real) + (sig_fft[sample_index].imag * sig_fft[sample_index].imag)) / (sig_size / 2))[0]
return ph, amp, wavelength
def getAllPhaseAmplitudeWavelengths(self, all_signals, sample_freq):
wavelengths = []
phases = []
amplitudes = []
i = 0
for individual_signal in all_signals:
phase, amplitude, wavelength = self.getPhaseAmplitudeWavelength(individual_signal, sample_freq[i], sample_freq)
i += 1
wavelengths.append(wavelength)
phases.append(phase)
amplitudes.append(amplitude)
return wavelengths, phases, amplitudes
def eqn(self, signal, wavelength, time_vec, phase, amp):
signals_mean = absolute(mean(signal))
return (amp * sin((2 * pi / wavelength * time_vec) + phase)) + signals_mean
def getEquations(self, wavelength, time_vec, ph, amp):
equations = []
for i in range(len(wavelength)):
equation = (amp[i] * sin((2 * pi / wavelength[i] * time_vec) + ph[i]))
equations.append(equation)
return equations
def predictFuture(self, new_time_vec, equations, wavelength, ph, amp):
# addidtional_step = new_time_vec[-1] + 1
# new_time_vec = append(new_time_vec, addidtional_step)[1:]
pred = []
for i in range(len(equations)):
pred.append(self.eqn(equations[i], wavelength[i], new_time_vec, ph[i], amp[i]))
return pred
class PreProcessData():
def __init__(self, data):
self.data = data
# separate data into test and validate sets for x and y and for each interval
def createTestValidateSets(self, data, interval_to_predict):
# interval to predict must be less than 1/3 size od dataset
try:
data_x = data[0]
data_y = data[1]
if interval_to_predict * 3 >= len(data_x):
interval_to_predict = 1
if interval_to_predict == 0:
test_x = data_x
val_x = []
test_y = data_y
val_y = []
else:
test_x = data_x[:-interval_to_predict]
val_x = data_x[-interval_to_predict:]
test_y = data_y[:-interval_to_predict]
val_y = data_y[-interval_to_predict:]
return test_x, test_y, val_x, val_y
except Exception as e:
return 'PreProcessData.createTestValidateSets failed: ' + e
if __name__ == "__main__":
dt = 1
time_vec = arange(0, 100, dt)
genSig = GenericSignals(time_vec)
testing_signal = genSig.createSingleSignal(w=20, a=15, ph=0)
testing_signal2 = genSig.createSingleSignal(w=10, a=15, ph=0)
test_signal = testing_signal
predict_interval = 10
preProcessedData = PreProcessData(test_signal)
test_x, test_y, val_x, val_y = preProcessedData.createTestValidateSets([time_vec, test_signal], predict_interval)
power, sample_freq = genSig.getFrequencies(test_y, dt)
individual_signals = genSig.createIndividualSignalsForEachFreq(test_y, sample_freq)
test_index = 5
fft = fftpack.fft(individual_signals[test_index])
fft_phase = arctan2(fft[test_index].imag, fft[test_index].real)
wavelengths, phases, amplitudes = genSig.getAllPhaseAmplitudeWavelengths(individual_signals, sample_freq)
equations = genSig.getEquations(wavelengths, test_x, phases, amplitudes)
index2 = int(len(test_x) / 2 - 1)
all_equations = sum(equations[1:index2])
y_shift = mean(test_y) - mean(all_equations)
all_equations += y_shift
predicted_curves = genSig.predictFuture(val_x, equations, wavelengths, phases, amplitudes)
total_pred = sum(predicted_curves)
total_pred += y_shift
fig, axs = plt.subplots(1, 1, figsize=(15,5))
axs.plot(test_x, individual_signals[test_index], 'r--', label='single exact curve')
axs.plot(test_x, equations[test_index], 'ro', label='single equation curve')
axs.plot(val_x, predicted_curves[test_index], 'r', label='single predcted curve')
# axs.plot(time_vec, test_signal, 'b--', label='test set')
# axs.plot(test_x, all_equations, 'bo', label='ALL equations set')
# axs.plot(val_x, val_y, 'g--', label='val set')
# for i in range(1, 8):
# axs.plot(val_x, predicted_curves[i], label=f'pred set: {i}')
# axs.plot(val_x, sum(predicted_curves[1:3]), label='pred curves')
axs.plot(val_x, total_pred, 'go', label='predicted set')
axs.legend()
fig.tight_layout()
plt.show()
澄清一下,FFT 在 FFT 的范围内工作。外面是行不通的。下面显示了执行 FFT 的边界。右边的东西被计算和求和,就像一个罪波的总和。这就是问题所在。


