平均绝对偏差和大数据集的在线算法

机器算法验证 算法 分位数 在线算法 大数据
2022-02-01 22:59:02

我有一个小问题让我吓坏了。我必须为多变量时间序列的在线获取过程编写程序。在每个时间间隔(例如 1 秒),我都会得到一个新样本,它基本上是一个大小为 N 的浮点向量。我需要做的操作有点棘手:

  1. 对于每个新样本,我计算该样本的百分比(通过对向量进行归一化以使元素总和为 1)。

  2. 我以相同的方式计算平均百分比向量,但使用过去的值。

  3. 对于每个过去的值,我使用步骤 2 计算的全局平均百分比向量计算与该样本相关的百分比向量的绝对偏差。这样,绝对偏差始终是 0 之间的数字(当向量等于平均值​​时向量)和 2(当它完全不同时)。

  4. 使用所有先前样本的平均偏差,我计算平均绝对偏差,它也是一个介于 0 和 2 之间的数字。

  5. 我使用平均绝对偏差来检测新样本是否与其他样本兼容(通过将其绝对偏差与在步骤 4 计算的整个集合的平均绝对偏差进行比较)。

由于每次收集新样本时,全局平均值都会发生变化(因此平均绝对偏差也会发生变化),有没有一种方法可以在不多次扫描整个数据集的情况下计算该值?(一次用于计算全球平均百分比,一次用于收集绝对偏差)。好的,我知道在不扫描整个集合的情况下计算全局平均值绝对容易,因为我只需要使用一个临时向量来存储每个维度的总和,但是平均绝对偏差呢?它的计算包括abs()运算符,所以我需要访问所有过去的数据!

谢谢你的帮助。

4个回答

如果您可以接受一些不准确,则可以通过分箱计数轻松解决此问题。也就是说,选择一些较大的数字(比如 ),然后为初始化一些整数箱,其中是向量大小,为零. 然后,当您看到个观察值时,如果个元素介于,循环遍历MM=1000Bi,ji=1Mj=1NNkBi,jj(i1)/Mi/MN向量的元素。(我假设您的输入向量是非负的,因此当您计算“百分比”时,向量在范围内。)[0,1]

在任何时间点,您都可以从 bin 中估计平均向量,以及平均绝对偏差。在观察个这样的向量之后,个元素由 平均绝对偏差的第个元素由Kj

X¯j=1Kii1/2MBi,j,
j
1Ki|Xj¯i1/2M|Bi,j

编辑:这是一个更一般的方法的具体案例,您正在建立一个经验密度估计。这可以用多项式、样条曲线等来完成,但分箱方法是最容易描述和实现的。

我过去曾使用以下方法来适度有效地计算绝对偏差(注意,这是一种程序员的方法,而不是统计学家,所以毫无疑问可能会有像shabbychef 这样的聪明技巧可能更有效)。

警告:这不是在线算法。它需要O(n)记忆。O(n)此外,对于类似的数据集[1, -2, 4, -8, 16, -32, ...](即与完全重新计算相同),它的最坏情况性能为,。[1]

但是,由于它在许多用例中仍然表现良好,因此可能值得在此处发布。例如,为了在每个项目到达时计算 -100 到 100 之间的 10000个随机数的绝对偏差,我的算法需要不到一秒,而完全重新计算需要超过 17 秒(在我的机器上,每台机器和根据输入数据)。但是,您需要在内存中维护整个向量,这可能是某些用途的限制。算法的概要如下:

  1. 不要使用单个向量来存储过去的测量值,而是使用三个排序的优先级队列(类似于最小/最大堆)。这三个列表将输入分为三个:大于平均值的项目、小于平均值的项目和等于平均值​​的项目。
  2. (几乎)每次添加一个项目时,平均值都会发生变化,因此我们需要重新分区。关键是分区的排序特性,这意味着我们不需要扫描列表中的每个项目来重新分区,我们只需要读取我们正在移动的那些项目。虽然在最坏的情况下这仍然需要O(n)移动操作,但对于许多用例来说并非如此。
  3. 使用一些巧妙的簿记,我们可以确保在重新分区和添加新项目时始终正确计算偏差。

下面是一些 Python 中的示例代码。请注意,它只允许将项目添加到列表中,而不能删除。这可以很容易地添加,但在我写这个的时候我不需要它。我没有自己实现优先级队列,而是使用了Daniel Stutzbach出色的blist 包中的 sortedlist ,它在内部使用B+Tree

考虑在MIT 许可下许可的代码。它没有经过显着优化或完善,但过去对我有用。新版本将在此处提供。如果您有任何问题或发现任何错误,请告诉我。

from blist import sortedlist
import operator

class deviance_list:
    def __init__(self):
        self.mean =  0.0
        self._old_mean = 0.0
        self._sum =  0L
        self._n =  0  #n items
        # items greater than the mean
        self._toplist =  sortedlist()
        # items less than the mean
        self._bottomlist = sortedlist(key = operator.neg)
        # Since all items in the "eq list" have the same value (self.mean) we don't need
        # to maintain an eq list, only a count
        self._eqlistlen = 0

        self._top_deviance =  0
        self._bottom_deviance =  0

    @property
    def absolute_deviance(self):
        return self._top_deviance + self._bottom_deviance

    def append(self,  n):
        # Update summary stats
        self._sum += n
        self._n +=  1
        self._old_mean =  self.mean
        self.mean =  self._sum /  float(self._n)

        # Move existing things around
        going_up = self.mean > self._old_mean
        self._rebalance(going_up)

        # Add new item to appropriate list
        if n >  self.mean:
            self._toplist.add(n)
            self._top_deviance +=  n -  self.mean
        elif n == self.mean: 
            self._eqlistlen += 1
        else:
            self._bottomlist.add(n)
            self._bottom_deviance += self.mean -  n


    def _move_eqs(self,  going_up):
        if going_up:
            self._bottomlist.update([self._old_mean] *  self._eqlistlen)
            self._bottom_deviance += (self.mean - self._old_mean) * self._eqlistlen
            self._eqlistlen = 0
        else:
            self._toplist.update([self._old_mean] *  self._eqlistlen)
            self._top_deviance += (self._old_mean - self.mean) * self._eqlistlen
            self._eqlistlen = 0


    def _rebalance(self, going_up):
        move_count,  eq_move_count = 0, 0
        if going_up:
            # increase the bottom deviance of the items already in the bottomlist
            if self.mean !=  self._old_mean:
                self._bottom_deviance += len(self._bottomlist) *  (self.mean -  self._old_mean)
                self._move_eqs(going_up)


            # transfer items from top to bottom (or eq) list, and change the deviances
            for n in iter(self._toplist):
                if n < self.mean:
                    self._top_deviance -= n -  self._old_mean
                    self._bottom_deviance += (self.mean -  n)
                    # we increment movecount and move them after the list
                    # has finished iterating so we don't modify the list during iteration
                    move_count +=  1
                elif n == self.mean:
                    self._top_deviance -= n -  self._old_mean
                    self._eqlistlen += 1
                    eq_move_count +=  1
                else:
                    break
            for _ in xrange(0,  move_count):
                self._bottomlist.add(self._toplist.pop(0))
            for _ in xrange(0,  eq_move_count):
                self._toplist.pop(0)

            # decrease the top deviance of the items remain in the toplist
            self._top_deviance -= len(self._toplist) *  (self.mean -  self._old_mean)
        else:
            if self.mean !=  self._old_mean:
                self._top_deviance += len(self._toplist) *  (self._old_mean -  self.mean)
                self._move_eqs(going_up)
            for n in iter(self._bottomlist): 
                if n > self.mean:
                    self._bottom_deviance -= self._old_mean -  n
                    self._top_deviance += n -  self.mean
                    move_count += 1
                elif n == self.mean:
                    self._bottom_deviance -= self._old_mean -  n
                    self._eqlistlen += 1
                    eq_move_count +=  1
                else:
                    break
            for _ in xrange(0,  move_count):
                    self._toplist.add(self._bottomlist.pop(0))
            for _ in xrange(0,  eq_move_count):
                self._bottomlist.pop(0)

            # decrease the bottom deviance of the items remain in the bottomlist
            self._bottom_deviance -= len(self._bottomlist) *  (self._old_mean -  self.mean)


if __name__ ==  "__main__":
    import random
    dv =  deviance_list()
    # Test against some random data,  and calculate result manually (nb. slowly) to ensure correctness
    rands = [random.randint(-100,  100) for _ in range(0,  1000)]
    ns = []
    for n in rands: 
        dv.append(n)
        ns.append(n)
        print("added:%4d,  mean:%3.2f,  oldmean:%3.2f,  mean ad:%3.2f" %
              (n, dv.mean,  dv._old_mean,  dv.absolute_deviance / dv.mean))
        assert sum(ns) == dv._sum,  "Sums not equal!"
        assert len(ns) == dv._n,  "Counts not equal!"
        m = sum(ns) / float(len(ns))
        assert m == dv.mean,  "Means not equal!"
        real_abs_dev = sum([abs(m - x) for x in ns])
        # Due to floating point imprecision, we check if the difference between the
        # two ways of calculating the asb. dev. is small rather than checking equality
        assert abs(real_abs_dev - dv.absolute_deviance) < 0.01, (
            "Absolute deviances not equal. Real:%.2f,  calc:%.2f" %  (real_abs_dev,  dv.absolute_deviance))

[1] 如果症状持续存在,请去看医生。

还有一种参数方法。忽略数据的向量性质,只看边缘,就足以解决问题:找到一个在线算法来计算标量的平均绝对偏差。如果(这是这里的大“如果”)您认为遵循一些带有未知参数的概率分布,您可以使用一些在线算法估计参数,然后根据该参数化分布计算平均绝对偏差。例如,如果您认为是(近似)正态分布,您可以估计它的标准偏差,如,平均绝对偏差将由估计(见XXXss2/π半正态分布)。

MAD(x)只是两个并发的中位数计算,每个都可以通过binmedian算法在线进行。

您可以在此处在线找到相关论文以及 C 和 FORTRAN 代码

(这只是在 Shabbychef 的巧妙技巧之上使用了一个巧妙的技巧,以节省内存)。

附录:

存在许多用于计算分位数的较旧的多遍方法。一种流行的方法是维护/更新从流中随机选择的确定大小的观测值水库,并在该水库上递归计算分位数(参见这篇评论)。这种(和相关的)方法被上面提出的方法所取代。