这个问题有一个更常见的版本,关于 pandas应用函数的并行化 - 所以这是一个令人耳目一新的问题 :)
首先,由于您要求提供“打包”解决方案,因此我想更快速地提一下,并且它出现在大多数关于 pandas 并行化的 SO 问题中。
但是..我仍然想分享我的个人 gist 代码,因为在使用 DataFrame 几年后,我从未找到 100% 并行化解决方案(主要用于应用功能),我总是不得不回来为我的“手册”代码。
多亏了你,我让它更通用地支持任何(理论上的)DataFrame 方法(所以你不必保留 isin、apply 等的版本)。
我使用 python 2.7 和 3.6 对“isin”、“apply”和“isna”函数进行了测试。它不到 20 行,我遵循 pandas 命名约定,如“subset”和“njobs”。
我还添加了与“isin”的 dask 等效代码的时间比较,它似乎比这个要点慢 X2 倍。
它包括2个功能:
df_multi_core - 这是你调用的那个。它接受:
- 你的 df 对象
- 您要调用的函数名称
- 可以对其执行函数的列子集(有助于减少时间/内存)
- 并行运行的作业数(-1 或省略所有核心)
- df 函数接受的任何其他 kwargs(如“轴”)
_df_split - 这是一个内部辅助函数,必须全局定位到正在运行的模块(Pool.map 是“位置相关的”),否则我会在内部找到它。
这是我的gist中的代码(我将在那里添加更多 pandas 函数测试):
import pandas as pd
import numpy as np
import multiprocessing
from functools import partial
def _df_split(tup_arg, **kwargs):
split_ind, df_split, df_f_name = tup_arg
return (split_ind, getattr(df_split, df_f_name)(**kwargs))
def df_multi_core(df, df_f_name, subset=None, njobs=-1, **kwargs):
if njobs == -1:
njobs = multiprocessing.cpu_count()
pool = multiprocessing.Pool(processes=njobs)
try:
splits = np.array_split(df[subset], njobs)
except ValueError:
splits = np.array_split(df, njobs)
pool_data = [(split_ind, df_split, df_f_name) for split_ind, df_split in enumerate(splits)]
results = pool.map(partial(_df_split, **kwargs), pool_data)
pool.close()
pool.join()
results = sorted(results, key=lambda x:x[0])
results = pd.concat([split[1] for split in results])
return results
Bellow 是并行化isin的测试代码,比较原生、多核 gist 和 dask 性能。在具有 8 个物理内核的 I7 机器上,我得到了大约 X4 倍的加速。我很想听听您对真实数据的了解!
from time import time
if __name__ == '__main__':
sep = '-' * 50
# isin test
N = 10000000
df = pd.DataFrame({'c1': np.random.randint(low=1, high=N, size=N), 'c2': np.arange(N)})
lookfor = np.random.randint(low=1, high=N, size=1000000)
print('{}\ntesting pandas isin on {}\n{}'.format(sep, df.shape, sep))
t1 = time()
print('result\n{}'.format(df.isin(lookfor).sum()))
t2 = time()
print('time for native implementation {}\n{}'.format(round(t2 - t1, 2), sep))
t3 = time()
res = df_multi_core(df=df, df_f_name='isin', subset=['c1'], njobs=-1, values=lookfor)
print('result\n{}'.format(res.sum()))
t4 = time()
print('time for multi core implementation {}\n{}'.format(round(t4 - t3, 2), sep))
t5 = time()
ddata = dd.from_pandas(df, npartitions=njobs)
res = ddata.map_partitions(lambda df: df.apply(apply_f, axis=1)).compute(scheduler='processes')
t6 = time()
print('result random sample\n{}'.format(res.sample(n=3, random_state=0)))
print('time for dask implementation {}\n{}'.format(round(t6 - t5, 2), sep))
--------------------------------------------------
testing pandas isin on (10000000, 2)
--------------------------------------------------
result
c1 953213
c2 951942
dtype: int64
time for native implementation 3.87
--------------------------------------------------
result
c1 953213
dtype: int64
time for multi core implementation 1.16
--------------------------------------------------
result
c1 953213
c2 951942
dtype: int64
time for dask implementation 2.88