Hankel matrix SVD denoising

signal processing, filters, discrete signals, Python, denoising, SVD
2022-01-28 11:30:43

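I have applied Hankel-matrix singular value decomposition (SVD) denoising to smooth my univariate time series, which is the closing price of the EUR/USD exchange rate. Here is a picture: [figure: smoothed series vs. actual series]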

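The problem I have is that the end of the smoothed data appears to be wrong. How can I fix this, or is there a better way to denoise my time series, such as a Kalman filter or a wavelet transform? Here is the main part of my Python code: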

import numpy as np
import pandas_datareader as pdr
from datetime import datetime 
from scipy.linalg import hankel
import matplotlib.pyplot as plt

symbol = "EURUSD=X"
df = pdr.DataReader(symbol, "yahoo", datetime(2000, 1, 1),
                        datetime.now()).drop(columns=["Adj Close", "Volume"])

hankel_matrix = hankel(df.Close)

U, S, VT = np.linalg.svd(hankel_matrix)

first_k_singulars = 40
S[first_k_singulars:] = 0  # keep exactly the first k singular values, zero the rest

close = U @ np.diag(S) @ VT

max_col = len(close[0])
max_row = len(close)
fdiag = [[] for _ in range(max_row + max_col - 1)]

for x in range(max_col):
    for y in range(max_row):
        fdiag[x + y].append(close[y][x])

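# Average each anti-diagonal; diagonal k (0-based) has k + 1 entries for k < len(df).
avg_fdiag = []
for i, j in zip(fdiag, range(1, len(fdiag)+1)):
    avg_fdiag.append(np.sum(i)/j)

# Keep only the first len(df) averages: later diagonals hold only the zero padding from hankel().
close = avg_fdiag[:len(df)]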

1 Answer

A simple approach is to take the last value of the time series and keep repeating it.

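If I repeat the last value 100 times, I still get a large drop at the very end, but it now falls within the repeated values, so the end of the actual data is not affected.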
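To illustrate why the tail is affected and what the padding changes, here is a tiny sketch on a made-up four-point series (the values and the padding length of 3 are arbitrary assumptions, not the EUR/USD data). Called with a single argument, `scipy.linalg.hankel` fills everything below the main anti-diagonal with zeros, so the matrix behaves as if the series dropped to zero right after its last sample.

```python
import numpy as np
from scipy.linalg import hankel

# Toy series, used only for illustration.
x = np.array([1.0, 2.0, 3.0, 4.0])

# Single-argument hankel(): entries below the main anti-diagonal are zeros,
# so the last samples sit right next to an artificial drop to zero.
H = hankel(x)
print(H)

# Repeating the last value (3 times here, an arbitrary choice) pushes that
# artificial drop away from the real samples.
padded = np.concatenate((x, np.full(3, x[-1])))
H_padded = hankel(padded)

# The block covering the original samples no longer contains any zero padding.
print(H_padded[:4, :4])
```

The zeros are still there in `H_padded`, just further out, which is why the drop still shows up in the repeated part of the smoothed series.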

Full data

Zoomed in on the part without the repetition:

Zoomed in on the end of the data.


Python code

import numpy as np
import pandas_datareader as pdr
from datetime import datetime 
from scipy.linalg import hankel
import matplotlib.pyplot as plt


symbol = "EURUSD=X"
df = pdr.DataReader(symbol, "yahoo", datetime(2000, 1, 1),
                        datetime.now()).drop(columns=["Adj Close", "Volume"])

# Pad the series by repeating the last closing price 100 times.
subset = np.concatenate((df.Close.values, np.full(100, df.Close.values[-1])))
hankel_matrix = hankel(subset)

U, S, VT = np.linalg.svd(hankel_matrix)

first_k_singulars = 90
S[first_k_singulars:] = 0  # keep exactly the first k singular values, zero the rest

close = U @ np.diag(S) @ VT

max_col = len(close[0])
max_row = len(close)
fdiag = [[] for _ in range(max_row + max_col - 1)]

for x in range(max_col):
    for y in range(max_row):
        fdiag[x + y].append(close[y][x])

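# Average each anti-diagonal; diagonal k (0-based) has k + 1 entries for k < len(subset).
avg_fdiag = []
for i, j in zip(fdiag, range(1, len(fdiag)+1)):
    avg_fdiag.append(np.sum(i)/j)

# Keep only the first len(subset) averages: later diagonals hold only the zero padding from hankel().
close = avg_fdiag[:len(subset)]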

plt.plot(subset, label="padded series")
plt.plot(close, label="smoothed")
plt.legend()
plt.show()
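
As a possible alternative to padding, the zeros can be avoided altogether by building a rectangular Hankel (trajectory) matrix with the two-argument form `hankel(c, r)`, so that every entry is a real sample. The following is only a sketch under assumptions: the function name `hankel_svd_denoise` and its `window` and `rank` parameters are hypothetical, the demo uses a synthetic noisy sine rather than the EUR/USD data, and `window=50`, `rank=2` are arbitrary choices that would need tuning.

```python
import numpy as np
from scipy.linalg import hankel


def hankel_svd_denoise(x, window, rank):
    """Hypothetical helper: low-rank Hankel smoothing without zero padding."""
    x = np.asarray(x, dtype=float)
    n = len(x)

    # window x (n - window + 1) matrix with H[i, j] = x[i + j]; no zeros added.
    H = hankel(x[:window], x[window - 1:])

    # Truncated SVD: keep only the `rank` largest singular values.
    U, s, Vt = np.linalg.svd(H, full_matrices=False)
    H_low = (U[:, :rank] * s[:rank]) @ Vt[:rank]

    # Average each anti-diagonal, dividing by the actual number of entries on it.
    idx = np.add.outer(np.arange(H.shape[0]), np.arange(H.shape[1]))
    sums = np.bincount(idx.ravel(), weights=H_low.ravel(), minlength=n)
    counts = np.bincount(idx.ravel(), minlength=n)
    return sums / counts


# Synthetic demo data (an assumption, for illustration only).
rng = np.random.default_rng(0)
t = np.arange(200)
clean = np.sin(0.1 * t)
noisy = clean + 0.2 * rng.standard_normal(t.size)

smooth = hankel_svd_denoise(noisy, window=50, rank=2)
```

The first and last samples still lie on short anti-diagonals and are therefore averaged over fewer entries, so the ends may remain noisier than the middle, but they are no longer pulled toward zero.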