有没有一种简单的方法可以并行运行 pandas.DataFrame.isin?

数据挖掘 表现 Python 熊猫 平行线
2021-09-18 22:33:36

我有一个建模和评分程序,它大量使用了DataFrame.isinpandas 的功能,为几千个特定页面中的每一个搜索个人用户的 Facebook“喜欢”记录列表。这是程序中最耗时的部分,比建模或评分部分更耗时,因为它只在一个内核上运行,而其余部分同时在几十个内核上运行。

虽然我知道我可以手动将数据帧分解成块并并行运行操作,但有没有直接的方法可以自动执行此操作?换句话说,是否有任何类型的包可以识别我正在运行一个易于委托的操作并自动分发它?也许这要求太高了,但过去我对 Python 中已经提供的功能感到非常惊讶,所以我认为值得一问。

关于如何实现这一点的任何其他建议(即使不是通过一些神奇的独角兽包!)也将不胜感激。主要是,只是想找到一种方法来减少每次运行 15-20 分钟,而无需花费相同的时间编写解决方案。

4个回答

不幸的是,pandas 中还没有实现并行化。如果你想参与这个功能的开发,你可以加入这个 github issue 。

我不知道任何用于此目的的“神奇独角兽包”,所以最好的办法是编写自己的解决方案。但是,如果您仍然不想花时间在上面并想学习新的东西——您可以尝试 MongoDB 中内置的两种方法(map reduce 和 agg 框架)。请参阅mongodb_agg_framework

我认为你最好的选择是罗塞塔我发现它非常有用且简单。检查它的pandas 方法

您可以通过pip获得它。

dask用于并行 numpy/pandas 作业的有用库

这个问题有一个更常见的版本,关于 pandas应用函数的并行化 - 所以这是一个令人耳目一新的问题 :)

首先,由于您要求提供“打包”解决方案,因此我想更快速地提一下,并且它出现在大多数关于 pandas 并行化的 SO 问题中。

但是..我仍然想分享我的个人 gist 代码,因为在使用 DataFrame 几年后,我从未找到 100% 并行化解决方案(主要用于应用功能),我总是不得不回来为我的“手册”代码。

多亏了你,我让它更通用地支持任何(理论上的)DataFrame 方法(所以你不必保留 isin、apply 等的版本)。

我使用 python 2.7 和 3.6 对“isin”、“apply”和“isna”函数进行了测试。它不到 20 行,我遵循 pandas 命名约定,如“subset”和“njobs”。

我还添加了与“isin”的 dask 等效代码的时间比较,它似乎比这个要点慢 X2 倍。

它包括2个功能:

df_multi_core - 这是你调用的那个。它接受:

  1. 你的 df 对象
  2. 您要调用的函数名称
  3. 可以对其执行函数的列子集(有助于减少时间/内存)
  4. 并行运行的作业数(-1 或省略所有核心)
  5. df 函数接受的任何其他 kwargs(如“轴”)

_df_split - 这是一个内部辅助函数,必须全局定位到正在运行的模块(Pool.map 是“位置相关的”),否则我会在内部找到它。

这是我的gist中的代码(我将在那里添加更多 pandas 函数测试):

import pandas as pd
import numpy as np
import multiprocessing
from functools import partial

def _df_split(tup_arg, **kwargs):
    split_ind, df_split, df_f_name = tup_arg
    return (split_ind, getattr(df_split, df_f_name)(**kwargs))

def df_multi_core(df, df_f_name, subset=None, njobs=-1, **kwargs):
    if njobs == -1:
        njobs = multiprocessing.cpu_count()
    pool = multiprocessing.Pool(processes=njobs)

    try:
        splits = np.array_split(df[subset], njobs)
    except ValueError:
        splits = np.array_split(df, njobs)

    pool_data = [(split_ind, df_split, df_f_name) for split_ind, df_split in enumerate(splits)]
    results = pool.map(partial(_df_split, **kwargs), pool_data)
    pool.close()
    pool.join()
    results = sorted(results, key=lambda x:x[0])
    results = pd.concat([split[1] for split in results])
    return results

Bellow 是并行化isin的测试代码,比较原生、多核 gist 和 dask 性能。在具有 8 个物理内核的 I7 机器上,我得到了大约 X4 倍的加速。我很想听听您对真实数据的了解!

from time import time

if __name__ == '__main__': 
    sep = '-' * 50

    # isin test
    N = 10000000
    df = pd.DataFrame({'c1': np.random.randint(low=1, high=N, size=N), 'c2': np.arange(N)})
    lookfor = np.random.randint(low=1, high=N, size=1000000)

    print('{}\ntesting pandas isin on {}\n{}'.format(sep, df.shape, sep))
    t1 = time()
    print('result\n{}'.format(df.isin(lookfor).sum()))
    t2 = time()
    print('time for native implementation {}\n{}'.format(round(t2 - t1, 2), sep))

    t3 = time()
    res = df_multi_core(df=df, df_f_name='isin', subset=['c1'], njobs=-1, values=lookfor)
    print('result\n{}'.format(res.sum()))
    t4 = time()
    print('time for multi core implementation {}\n{}'.format(round(t4 - t3, 2), sep))


    t5 = time()
    ddata = dd.from_pandas(df, npartitions=njobs)
    res = ddata.map_partitions(lambda df: df.apply(apply_f, axis=1)).compute(scheduler='processes')
    t6 = time()
    print('result random sample\n{}'.format(res.sample(n=3, random_state=0)))
    print('time for dask implementation {}\n{}'.format(round(t6 - t5, 2), sep))

--------------------------------------------------
testing pandas isin on (10000000, 2)
--------------------------------------------------
result
c1    953213
c2    951942
dtype: int64
time for native implementation 3.87
--------------------------------------------------
result
c1    953213
dtype: int64
time for multi core implementation 1.16
--------------------------------------------------
result
c1    953213
c2    951942
dtype: int64
time for dask implementation 2.88