如何标准化高度波动的时间序列?

机器算法验证 时间序列 正常化
2022-03-31 03:46:38

我应该如何标准化显示高波动性的时间序列数据以便在 ANN 中使用?让我们以比特币的价格为例。

我有以下疑问:

  • 本文所述,仅仅标准化数据集是不够的。这是有道理的,因为比特币的价格范围是 0.06 美元到 19,343 美元。我认为如果将数据作为滑动窗口提供给 ANN,那么每个窗口都必须进行标准化。
  • Min-Max 和 Z-Score 等方法适用于模式比较并标准化范围。1000 美元和 1200 美元之间的模式与 1000 美元和 1400 美元之间的模式完全相同,因此丢失了有价值的信息。当然,可以创建一个附加特征来表示波动性。

那么,将波动的时间序列数据(如比特币的价格)标准化的最佳方法是什么?

1个回答

TLDR

重新阅读链接到论文,非常好!无论如何,我认为是这样,一旦你确定了你想要解决的问题,它似乎有一些可靠选择......


现在我不打算把这些留给你,所以让我们在牢记你的问题的同时剖析这篇论文的一些内容。

该论文讨论了他们何时选择对页面上的数据进行规范化和重新规范化,2 1.2.3 Arbitrary Query Lengths cannot be Indexed并且在该页面的末尾附近非常清楚,处理如此大的数据集时,没有一种用于任意长度的相似性搜索的技术。

...尽管如果我了解他们在这种情况下要搜索的内容,那么只有在您输入大量可能在相同或相邻时间间隔内相关或不相关的序列时,这才应该是一个问题...... , DTW (Dynamic Time Warp)那里的东西很时髦,我会重申重新阅读这篇论文可能是一个好主意......这对于医学相关领域来说完全有道理,例如。论文中的 EEG 示例将是n每个时间段从每个受试者的大脑中采样的点数t乘以时间片的长度。换句话说,一公吨的数据!

n这引出了一个问题,即您希望使用加密货币的维度有多大?

无论如何,4.2.1就数据预处理而言,部分事情变得有趣,它们甚至在最后提供了[43]一些源代码的链接以供使用。

至于预处理的最佳方式,这似乎仍然是一个积极研究的主题,并且取决于模型,以及正在训练的内容和方式。尽管就希望在每个时间范围内进行标准化或标准化而言,您似乎走在正确的轨道上。

关于相关问题的一个建议如何将无界变量表示为 0 到 1 之间的数字(这似乎是您的问题的核心unbounded variable,比特币是一个可测量的维度),提供了一些伪代码Using a trainable minmax我已使用以下最小最大化方法将其部分转换为(部分功能)Python 的版本,来自How to normalize data between -1 and 1? ,用于演示您将在预处理中遇到的一些问题。

x=(ba)xminxmaxxminx+a

#!/usr/bin/env python
from __future__ import division


class Scaler_MinMax(list):
    """
    Scaler_MinMax is a `list` that scales inputted, or `append`ed, or `extend`ed values to be between chosen `scale_`s
    """

    def __init__(self, l, scale_min = -1, scale_max = 1, auto_recalibrate = False):
        self.scale_min = scale_min
        self.scale_max = scale_max
        self.range_min = min(l)
        self.range_max = max(l)
        self.initial_values = l
        self.auto_recalibrate = auto_recalibrate
        super(Scaler_MinMax, self).__init__(self.scale(numbers = l))

    def scale(self, numbers = [], scale_min = None, scale_max = None, range_min = None, range_max = None):
        """
        Returns list of scaled values
        """
        if scale_min is None:
            scale_min = self.scale_min
        if scale_max is None:
            scale_max = self.scale_max

        if range_min is None:
            range_min = self.range_min
        if range_max is None:
            range_max = self.range_max

        return [((x - range_min) / (range_max - range_min)) * (scale_max - scale_min) + scale_min for x in numbers]

    def unscale(self, numbers = [], scale_min = None, scale_max = None, range_min = None, range_max = None):
        """
        Returns list of unscaled values
        """
        if not numbers:
            numbers = self

        if scale_min is None:
            scale_min = self.scale_min
        if scale_max is None:
            scale_max = self.scale_max

        if range_min is None:
            range_min = self.range_min
        if range_max is None:
            range_max = self.range_max

        return [(((y - scale_min) / (scale_max - scale_min)) * (range_max - range_min)) + range_min for y in numbers]

    def re_calibrate(self):
        """
        Re-sets `self.range_min`, `self.range_max`, and `self`
        """
        self.range_min = min(self.initial_values)
        self.range_max = max(self.initial_values)
        super(Scaler_MinMax, self).__init__(self.scale(
            numbers = self.initial_values,
            scale_min = self.scale_min,
            scale_max = self.scale_max,
            range_min = self.range_min,
            range_max = self.range_max))

    """
    Supered class overrides
    """

    def __add__(self, other):
        if not isinstance(other, list):
            raise TypeError("can only concatenate list (not '{0}') to list".format(type(other)))

        return super(Scaler_MinMax, self).__add__(self.scale(numbers = other))

    def append(self, value):
        """
        Appends to `self.initial_values` and `self.append(self.scale([value])[0])`
        """
        self.initial_values.append(value)
        if isinstance(value, list):
            super(Scaler_MinMax, self).append([self.scale(numbers = [i])[0] for i in value])
        else:
            super(Scaler_MinMax, self).append(self.scale(numbers = [value])[0])

        if self.auto_recalibrate is True:
            if self.range_max != max(self.initial_values) or self.range_min != min(self.initial_values):
                self.re_calibrate()

    def clear(self):
        """
        Clears `self.initial_values` and `self` of __ALL__ values
        """
        self.initial_values.clear()
        super(Scaler_MinMax, self).clear()

    def extend(self, iterable):
        """
        Extends `self.initial_values` and `self.extend(self.scale(iterable))`
        """
        self.initial_values.extend(iterable)
        super(Scaler_MinMax, self).extend(self.scale(iterable))

        if self.auto_recalibrate is True:
            if self.range_max != max(self.initial_values) or self.range_min != min(self.initial_values):
                self.re_calibrate()

    def insert(self, index, value):
        """
        Inserts `value` into `self.initial_values` and `self.scale([value])` into `self`
        """
        self.initial_values.insert(index, value)
        super(Scaler_MinMax, self).insert(index, self.scale([value]))

        if self.auto_recalibrate is True:
            if self.range_max != max(self.initial_values) or self.range_min != min(self.initial_values):
                self.re_calibrate()

    def pop(self, index):
        """
        Returns tuple of `pop`ed `index`s values from `self.initial_values`, and `self`
        """
        output = self.initial_values.pop(index), super(Scaler_MinMax, self).pop(index)

        if self.auto_recalibrate is True:
            if self.range_max != max(self.initial_values) or self.range_min != min(self.initial_values):
                self.re_calibrate()

        return output

    def remove(self, value):
        """
        Removes value from `self.initial_values` and `self`

        > use of `index` calls to ensure paired values are `pop`ed/`remove`d does increase the expense of this method
        """
        if value in self.initial_values:
            i = self.initial_values.index(value)
            self.pop(i)
            self.initial_values.remove(value)
        elif value in self:
            i = self.index(value)
            self.initial_values.pop(i)
            super(Scaler_MinMax, self).remove(value)
        else:
            raise ValueError("{0}.remove(x) not in list".format(self.__class__.__name__))

        if self.auto_recalibrate is True:
            if self.range_max != max(self.initial_values) or self.range_min != min(self.initial_values):
                self.re_calibrate()

    def reverse(self):
        """
        Reverses `self.initial_values` and `self` __IN PLACE__
        """
        self.initial_values.reverse()
        super(Scaler_MinMax, self).reverse()

    def sort(self):
        """
        Sorts `self.initial_values` and `self` __IN PLACE__
        """
        self.initial_values.sort()
        super(Scaler_MinMax, self).sort()

示例用法和限制

s = Scaler_MinMax([x for x in range(9)])
s
#  -> [-1.0, -0.75, -0.5, -0.25, 0.0, 0.25, 0.5, 0.75, 1.0]

s.unscale()
#  -> [0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0]

s.initial_values
#  -> [0, 1, 2, 3, 4, 5, 6, 7, 8]

s.append(20)
s
#  -> [-1.0, -0.75, -0.5, -0.25, 0.0, 0.25, 0.5, 0.75, 1.0, 4.0]

s.re_calibrate()
s
#  -> [-1.0, -0.9, -0.8, -0.7, -0.6, -0.5, -0.4, -0.30000000000000004, -0.19999999999999996, 1.0]

s.unscale()     # Floating point errors!
#  -> [0.0, 0.9999999999999998, 1.9999999999999996, 3.0000000000000004, 4.0, 5.0, 6.0, 7.0, 8.0, 20.0]

s.initial_values
#  -> [0, 1, 2, 3, 4, 5, 6, 7, 8, 20]

如上所示,浮点错误是initial_values保存和使用的原因,re_calibrate()如果不这样做并unscale()用于re_calibrate()浮点错误会复合!此外,不仅有很多地方可以改进代码,而且在已解析数据旁边有一个未解析数据的额外副本可以使内存使用量增加一倍以上。

链接到论文中还讨论了浮点错误的累加,以及它们在4.2.1 Early Abandoning Z-Normalization接近结尾的页面上的缓解方法,他们建议每百万个子序列“清除”此类错误。


旁注,以便每个人都在同一页面上...

μ=1mxi

...在python中看起来有点像...

#!/usr/bin/env python
from __future__ import division

# Thanks be to https://stackoverflow.com/a/31136897/2632107
try:
    # Python 2
    range = xrange
except NameError:
    # Python 3
    pass


def calc_mu(m_denominator=2, x_list=[1.5, 42], start=0, end=2, step=1):
  ''' μ = 1/m * ∑x_i '''
  chosen = []
  for i in range(start, end, step):
    chosen.append(x_list[i])
  return float(1) / m_denominator * sum(chosen)

......好吧,如果我理解的话An example of subscript and summation notation,我必须复习一下并涉足可汗学院Sequences and Series,以便更全面地掌握SIDKDD trillion论文的数学内容。


希望将一些数学与执行类似功能的代码一起揭开你如何开始尝试数据预处理方法的神秘面纱。因为看起来会有很多试错法来找到最好的预处理方法,不管它试图强制馈送到神经网络的方法是什么。

我知道我没有一个完整的答案,但如果有一种公开可用的最佳方法来预测y时间序列预测,那么很可能会是市场的拖钓,而不仅仅是来自机器人的闪崩……哪个,提示,提示,也许是训练模型进行预测的好主意;-)

如顶部所述,我认为您需要非常精确地确定要解决的问题,例如,您可以尝试预测价格的相对变化,而不是直接预测价格,在这种情况下,百分位数可能会起作用,有点像使用与论文中所示类似的方法将遗传信息编码为时间序列。

未来编辑器的代码注释

我没有在代码中添加Terracethreshold建议代码,以使事情变得更简单,此外,我编写代码示例的目的不是为了提高效率或在生产中使用,而是为了便于访问,请保留任何编辑更正并本着让尽可能多的读者可以访问的精神。Scaler_MinMax

否则,我认为读者/编辑尝试用另一种语言编写自己的代码示例和/或使用不同的方法来压缩数据会更有帮助,这很酷,看看其他人想出什么.