I'll try to answer based on my experience with and understanding of parallel computing in DS/ML model production.
To answer your questions at a high level:
- Will the simple program above give you better performance when you increase n_jobs?
Answer: Yes, as can be seen in the results below.
- On what OS/setup?
Answer: OS: Ubuntu, 2x CPUs x 16 cores + 512 GB RAM, with python=3.7, joblib>=0.14.1 and sklearn>=0.22.1.
- Does anything have to be adjusted for it to work properly?
Answer: Yes: change/force the parallel_backend to something other than sequential. This requires the joblib approach with a registered parallel_backend (you can also use sklearn.utils.parallel_backend). I took the sequential sklearn model you have with n_jobs=-1, ran it inside joblib's Parallel, and got a huge speed-up; correctness needs a closer look, but I did see a big improvement when scaling to 100 million samples on my machine, so it is worth testing, since I was amazed by the performance with a predefined backend.
My conda setup:
scikit-image 0.16.2 py37h0573a6f_0
scikit-learn 0.22.1 py37hd81dba3_0
ipython 7.12.0 py37h5ca1d4c_0
ipython_genutils 0.2.0 py37_0
msgpack-python 0.6.1 py37hfd86e86_1
python 3.7.6 h357f687_2 conda-forge
python-dateutil 2.8.1 py_0
python_abi 3.7 1_cp37m conda-forge
joblib 0.14.1 py_0
If you are on a personal machine or workstation, try leaving 1 core free for your machine with n_jobs=-2. You can also increase the data size, since that is what joblib is optimized for (not all algorithms support this approach, but that is out of scope here). Also change the backend, because by default the parallel tasks are not executed and only the sequential backend is used. There may be an automatic "mode" that kicks in with more data, but I'm not sure: I tested with 1k, 10k, 100k, 1 million and 10 million samples, and without the loky backend ElasticNetCV never left the sequential backend.
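To make that concrete, here is a minimal sketch of forcing the backend (the dataset shape and CV settings are placeholders, not the OP's exact ones):

```python
import time

from joblib import parallel_backend
from sklearn.datasets import make_regression
from sklearn.linear_model import ElasticNetCV

X, y = make_regression(n_samples=1000, n_features=30, random_state=0)
model = ElasticNetCV(cv=10, n_jobs=-2)

# Without this context ElasticNetCV stayed on SequentialBackend in my
# tests; inside it, Parallel picks up the loky backend instead.
with parallel_backend('loky', n_jobs=-2):
    start = time.perf_counter()
    model.fit(X, y)
    print(f"n_jobs = -2, perf_counter = {time.perf_counter() - start}, sec")
```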
Joblib is optimized to be fast and robust, in particular on big data, and has specific optimizations for numpy arrays.
As an explanation, here is how the resources are computed, from the n_jobs documentation:
For n_jobs below -1, (n_cpus + 1 + n_jobs) are used. Thus for n_jobs = -2, all CPUs but one are used. None is a marker for "unset" that will be interpreted as n_jobs=1 (sequential execution).
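You can check that rule directly with joblib's effective_n_jobs helper (which sklearn re-exports):

```python
from joblib import cpu_count, effective_n_jobs

# For n_jobs < 0, joblib resolves the worker count to
# max(n_cpus + 1 + n_jobs, 1); None/1 mean sequential execution.
print(effective_n_jobs(-1))  # all cores
print(effective_n_jobs(-2))  # all cores but one
print(effective_n_jobs(1))   # 1 worker (sequential)
```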
Your code performs poorly with n_jobs=-1, so try n_jobs=-2. This is due to the following facts:
- n_jobs=-1 does use all CPU cores (per the documentation), though you can switch to threads by registering a joblib parallel_backend on your machine. Using every core is slow and degrades performance whenever other processes also need CPU threads/cores, which is exactly what is happening in your case (the OS and other processes need CPU power to run), and it does not fully exploit "threads" either, so choose "cores" or "threads" depending on your performance problem.
As an example, you would use n_jobs=-1 in cluster mode, where the workers (as containers) already have cores allocated, and the parallel approach can exploit the optimized or distributed computation part.
- In this case you exhaust your CPU resources. And don't forget that parallel is not "cheap": the same data is copied for each "job", so all of those "allocations" happen at the same time.
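For large numpy arrays there is one mitigation: joblib memory-maps inputs above max_nbytes (1M by default) instead of pickling a full copy to every worker. A small sketch (the chunking scheme here is mine, just for illustration):

```python
import numpy as np
from joblib import Parallel, delayed

data = np.random.rand(1_000_000)

def chunk_mean(arr, lo, hi):
    # Each worker reads its slice from the shared memmap.
    return arr[lo:hi].mean()

# Arrays larger than max_nbytes are memmapped rather than copied.
means = Parallel(n_jobs=2, max_nbytes='1M')(
    delayed(chunk_mean)(data, i * 250_000, (i + 1) * 250_000)
    for i in range(4)
)
print(means)
```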
- sklearn's parallel implementation is not perfect, so in your case I would try n_jobs=-2; alternatively, if you want to use joblib directly, you have more room to optimize the algorithm. Your CV part is where all the performance is lost, because it is the part that gets parallelized.
I'll add the following from the joblib docs, to give a better picture of your case and of how the limits + differences between backends work:
backend: str, ParallelBackendBase instance or None, default: ‘loky’
Specify the parallelization backend implementation. Supported backends are:
“loky” used by default, can induce some communication and memory overhead when exchanging input and output data with the worker Python processes.
“multiprocessing” previous process-based backend based on multiprocessing.Pool. Less robust than loky.
“threading” is a very low-overhead backend but it suffers from the Python Global Interpreter Lock if the called function relies a lot on Python objects. “threading” is mostly useful when the execution bottleneck is a compiled extension that explicitly releases the GIL (for instance a Cython loop wrapped in a “with nogil” block or an expensive call to a library such as NumPy).
Finally, you can register backends by calling register_parallel_backend. This will allow you to implement a backend of your liking.
From the implementation in the source code, I do see that sklearn does use cores by default, or threads where preferred, but not for all algorithms (_joblib.py):
import warnings as _warnings

with _warnings.catch_warnings():
    _warnings.simplefilter("ignore")
    # joblib imports may raise DeprecationWarning on certain Python
    # versions
    import joblib
    from joblib import logger
    from joblib import dump, load
    from joblib import __version__
    from joblib import effective_n_jobs
    from joblib import hash
    from joblib import cpu_count, Parallel, Memory, delayed
    from joblib import parallel_backend, register_parallel_backend

__all__ = ["parallel_backend", "register_parallel_backend", "cpu_count",
           "Parallel", "Memory", "delayed", "effective_n_jobs", "hash",
           "logger", "dump", "load", "joblib", "__version__"]
But the CV part of the Elastic Net model algorithm you listed does use "threads" as preferred (_joblib_parallel_args(prefer="threads")), and it looks like a bug that in the end only cores are considered:
mse_paths = Parallel(n_jobs=self.n_jobs, verbose=self.verbose,
                     **_joblib_parallel_args(prefer="threads"))(jobs)
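Note that prefer="threads" is only a soft hint: on its own it selects the threading backend, but an explicit parallel_backend context (like the loky one used later) overrides it. A standalone sketch of the hint (the worker function is mine, for illustration):

```python
import threading
from joblib import Parallel, delayed

def which_thread(i):
    # Report the executing thread so the backend choice is visible.
    return threading.current_thread().name

# With no backend forced, prefer="threads" picks the threading backend,
# the same hint ElasticNetCV passes via _joblib_parallel_args.
names = Parallel(n_jobs=2, prefer="threads", verbose=1)(
    delayed(which_thread)(i) for i in range(4)
)
print(names)
```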
Note: this answer comes from daily work experience using spark to exploit joblib with parallel_backend('spark') and parallel_backend('dask'). It scales as expected and runs fast, but don't forget that each executor I have essentially gets 4 cores and 4-32 GB of RAM, so with n_jobs=-1 the joblib tasks are parallelized inside each executor, and the cost of copying the same data is not noticeable because it is distributed. It runs the CV and fit parts perfectly, and I use n_jobs=-1 when executing the fit or CV parts.
My results with the OP's default settings:
# Execution without tracking/progress is faster, but progress is added below for clarity:
n_jobs = None, perf_counter = 1.4849148329813033 ([Parallel(n_jobs=1)]: Using backend SequentialBackend with 1 concurrent workers.)
n_jobs = 1, perf_counter = 1.4728297910187393 ([Parallel(n_jobs=1)]: Using backend SequentialBackend with 1 concurrent workers.)
n_jobs = 2, perf_counter = 1.470994730014354 ([Parallel(n_jobs=1)]: Using backend SequentialBackend with 1 concurrent workers.)
n_jobs = 4, perf_counter = 1.490676686167717 ([Parallel(n_jobs=1)]: Using backend SequentialBackend with 1 concurrent workers.)
n_jobs = 8, perf_counter = 1.465600558090955 ([Parallel(n_jobs=1)]: Using backend SequentialBackend with 1 concurrent workers.)
n_jobs = 12, perf_counter = 1.463360101915896 ([Parallel(n_jobs=1)]: Using backend SequentialBackend with 1 concurrent workers.)
n_jobs = 16, perf_counter = 1.4638906640466303 ([Parallel(n_jobs=1)]: Using backend SequentialBackend with 1 concurrent workers.)
n_jobs = 20, perf_counter = 1.4602260519750416 ([Parallel(n_jobs=1)]: Using backend SequentialBackend with 1 concurrent workers.)
n_jobs = 24, perf_counter = 1.4646347570233047 ([Parallel(n_jobs=1)]: Using backend SequentialBackend with 1 concurrent workers.)
n_jobs = 28, perf_counter = 1.4710926250554621 ([Parallel(n_jobs=1)]: Using backend SequentialBackend with 1 concurrent workers.)
n_jobs = -1, perf_counter = 1.468439529882744 ([Parallel(n_jobs=1)]: Using backend SequentialBackend with 1 concurrent workers.)
n_jobs = -2, perf_counter = 1.4649679311551154 ([Parallel(n_jobs=1)]: Using backend SequentialBackend with 1 concurrent workers.)
# Execution with tracking/progress + verbose added for clarity:
0%| | 0/12 [00:00<?, ?it/s][Parallel(n_jobs=1)]: Using backend SequentialBackend with 1 concurrent workers.
[Parallel(n_jobs=1)]: Done 100 out of 100 | elapsed: 2.9s finished
8%|▊ | 1/12 [00:02<00:31, 2.88s/it][Parallel(n_jobs=1)]: Using backend SequentialBackend with 1 concurrent workers.
n_jobs = None, perf_counter = 2.8790326060261577
[Parallel(n_jobs=1)]: Done 100 out of 100 | elapsed: 2.8s finished
17%|█▋ | 2/12 [00:05<00:28, 2.87s/it][Parallel(n_jobs=1)]: Using backend SequentialBackend with 1 concurrent workers.
n_jobs = 1, perf_counter = 2.83856769092381
[Parallel(n_jobs=1)]: Done 100 out of 100 | elapsed: 2.8s finished
25%|██▌ | 3/12 [00:08<00:25, 2.85s/it][Parallel(n_jobs=1)]: Using backend SequentialBackend with 1 concurrent workers.
n_jobs = 2, perf_counter = 2.8207667160313576
[Parallel(n_jobs=1)]: Done 100 out of 100 | elapsed: 2.8s finished
33%|███▎ | 4/12 [00:11<00:22, 2.84s/it][Parallel(n_jobs=1)]: Using backend SequentialBackend with 1 concurrent workers.
n_jobs = 4, perf_counter = 2.8043343869503587
[Parallel(n_jobs=1)]: Done 100 out of 100 | elapsed: 2.7s finished
42%|████▏ | 5/12 [00:14<00:19, 2.81s/it][Parallel(n_jobs=1)]: Using backend SequentialBackend with 1 concurrent workers.
n_jobs = 8, perf_counter = 2.730375789105892
[Parallel(n_jobs=1)]: Done 100 out of 100 | elapsed: 2.9s finished
50%|█████ | 6/12 [00:16<00:16, 2.82s/it][Parallel(n_jobs=1)]: Using backend SequentialBackend with 1 concurrent workers.
n_jobs = 12, perf_counter = 2.8604282720480114
[Parallel(n_jobs=1)]: Done 100 out of 100 | elapsed: 2.8s finished
58%|█████▊ | 7/12 [00:19<00:14, 2.83s/it][Parallel(n_jobs=1)]: Using backend SequentialBackend with 1 concurrent workers.
n_jobs = 16, perf_counter = 2.847634136909619
[Parallel(n_jobs=1)]: Done 100 out of 100 | elapsed: 2.8s finished
67%|██████▋ | 8/12 [00:22<00:11, 2.84s/it][Parallel(n_jobs=1)]: Using backend SequentialBackend with 1 concurrent workers.
n_jobs = 20, perf_counter = 2.8461739809717983
[Parallel(n_jobs=1)]: Done 100 out of 100 | elapsed: 2.9s finished
75%|███████▌ | 9/12 [00:25<00:08, 2.85s/it][Parallel(n_jobs=1)]: Using backend SequentialBackend with 1 concurrent workers.
n_jobs = 24, perf_counter = 2.8684673600364476
[Parallel(n_jobs=1)]: Done 100 out of 100 | elapsed: 2.9s finished
83%|████████▎ | 10/12 [00:28<00:05, 2.87s/it][Parallel(n_jobs=1)]: Using backend SequentialBackend with 1 concurrent workers.
n_jobs = 28, perf_counter = 2.9122865139506757
[Parallel(n_jobs=1)]: Done 100 out of 100 | elapsed: 3.1s finished
92%|█████████▏| 11/12 [00:31<00:02, 2.94s/it][Parallel(n_jobs=1)]: Using backend SequentialBackend with 1 concurrent workers.
n_jobs = -1, perf_counter = 3.1204342890996486
[Parallel(n_jobs=1)]: Done 100 out of 100 | elapsed: 3.3s finished
100%|██████████| 12/12 [00:34<00:00, 2.91s/it]
n_jobs = -2, perf_counter = 3.347235122928396
The magic starts here:
So I would say this is actually a bug: even when n_jobs is specified it does not take effect, and everything still runs as "None" or "1". The small differences in timing are probably due to joblib.Memory and Checkpoint caching results, but I'd need to look more at that part of the source code (I bet it is cached, otherwise performing the CV would be expensive).
For reference: these are the results of executing the parallel parts with joblib and parallel_backend, specifying parallel_backend('loky') so that the default backend used by Parallel inside the block is set explicitly instead of relying on the "auto" mode:
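A sketch of the kind of harness that produces results like the ones below (the OP's exact script is not shown; the dataset sizes and n_jobs sweep here are placeholders):

```python
import time

from joblib import parallel_backend
from sklearn.datasets import make_regression
from sklearn.linear_model import ElasticNetCV

X, y = make_regression(n_samples=500, n_features=30, random_state=0)

timings = {}
for n_jobs in (None, 1, 2, -1):
    model = ElasticNetCV(cv=5, n_jobs=n_jobs)
    # Force loky so that Parallel inside fit() cannot fall back to
    # SequentialBackend via the 'auto' mode.
    with parallel_backend('loky'):
        start = time.perf_counter()
        model.fit(X, y)
    timings[n_jobs] = time.perf_counter() - start
    print(f"n_jobs = {n_jobs}, perf_counter = {timings[n_jobs]}, sec")
```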
# Execution without tracking/progress is faster, but progress is added below for clarity:
n_jobs = None, perf_counter = 1.7306506633758545, sec
n_jobs = 1, perf_counter = 1.7046034336090088, sec
n_jobs = 2, perf_counter = 2.1097865104675293, sec
n_jobs = 4, perf_counter = 1.4976494312286377, sec
n_jobs = 8, perf_counter = 1.380277156829834, sec
n_jobs = 12, perf_counter = 1.3992164134979248, sec
n_jobs = 16, perf_counter = 0.7542541027069092, sec
n_jobs = 20, perf_counter = 1.9196209907531738, sec
n_jobs = 24, perf_counter = 0.6940577030181885, sec
n_jobs = 28, perf_counter = 0.780998945236206, sec
n_jobs = -1, perf_counter = 0.7055854797363281, sec
n_jobs = -2, perf_counter = 0.4049191474914551, sec
Completed
The output below explains all the limits you are hitting, the mismatch between "what you expected to be parallel" and "what is actually parallelized in the sklearn algorithm", what is being executed in general, and how the workers are allocated:
# Execution with tracking/progress + verbose added for clarity:
0%| | 0/12 [00:00<?, ?it/s][Parallel(n_jobs=1)]: Using backend SequentialBackend with 1 concurrent workers.
[Parallel(n_jobs=1)]: Done 100 out of 100 | elapsed: 3.4s finished
8%|▊ | 1/12 [00:03<00:37, 3.44s/it][Parallel(n_jobs=1)]: Using backend SequentialBackend with 1 concurrent workers.
......................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................
n_jobs = None, perf_counter = 3.4446191787719727, sec
[Parallel(n_jobs=1)]: Done 100 out of 100 | elapsed: 3.5s finished
17%|█▋ | 2/12 [00:06<00:34, 3.45s/it]
n_jobs = 1, perf_counter = 3.460832357406616, sec
[Parallel(n_jobs=2)]: Using backend LokyBackend with 2 concurrent workers.
[Parallel(n_jobs=2)]: Done 100 out of 100 | elapsed: 2.0s finished
25%|██▌ | 3/12 [00:09<00:27, 3.09s/it]
n_jobs = 2, perf_counter = 2.2389445304870605, sec
[Parallel(n_jobs=4)]: Using backend LokyBackend with 4 concurrent workers.
[Parallel(n_jobs=4)]: Done 100 out of 100 | elapsed: 1.7s finished
33%|███▎ | 4/12 [00:10<00:21, 2.71s/it]
n_jobs = 4, perf_counter = 1.8393192291259766, sec
[Parallel(n_jobs=8)]: Using backend LokyBackend with 8 concurrent workers.
[Parallel(n_jobs=8)]: Done 34 tasks | elapsed: 1.0s
[Parallel(n_jobs=8)]: Done 100 out of 100 | elapsed: 1.3s finished
42%|████▏ | 5/12 [00:12<00:16, 2.36s/it]
n_jobs = 8, perf_counter = 1.517085075378418, sec
[Parallel(n_jobs=12)]: Using backend LokyBackend with 12 concurrent workers.
[Parallel(n_jobs=12)]: Done 26 tasks | elapsed: 1.3s
[Parallel(n_jobs=12)]: Done 77 out of 100 | elapsed: 1.5s remaining: 0.4s
[Parallel(n_jobs=12)]: Done 100 out of 100 | elapsed: 1.6s finished
50%|█████ | 6/12 [00:14<00:13, 2.17s/it]
n_jobs = 12, perf_counter = 1.7410166263580322, sec
[Parallel(n_jobs=16)]: Using backend LokyBackend with 16 concurrent workers.
[Parallel(n_jobs=16)]: Done 18 tasks | elapsed: 0.1s
[Parallel(n_jobs=16)]: Done 100 out of 100 | elapsed: 0.7s finished
58%|█████▊ | 7/12 [00:15<00:09, 1.81s/it]
n_jobs = 16, perf_counter = 0.9577205181121826, sec
[Parallel(n_jobs=20)]: Using backend LokyBackend with 20 concurrent workers.
[Parallel(n_jobs=20)]: Done 10 tasks | elapsed: 1.6s
[Parallel(n_jobs=20)]: Done 100 out of 100 | elapsed: 1.9s finished
67%|██████▋ | 8/12 [00:17<00:07, 1.88s/it]
n_jobs = 20, perf_counter = 2.0630648136138916, sec
[Parallel(n_jobs=24)]: Using backend LokyBackend with 24 concurrent workers.
[Parallel(n_jobs=24)]: Done 2 tasks | elapsed: 0.0s
[Parallel(n_jobs=24)]: Done 100 out of 100 | elapsed: 0.5s finished
75%|███████▌ | 9/12 [00:18<00:04, 1.55s/it]
n_jobs = 24, perf_counter = 0.7588121891021729, sec
[Parallel(n_jobs=28)]: Using backend LokyBackend with 28 concurrent workers.
[Parallel(n_jobs=28)]: Done 100 out of 100 | elapsed: 0.6s finished
83%|████████▎ | 10/12 [00:18<00:02, 1.34s/it]
n_jobs = 28, perf_counter = 0.8542406558990479, sec
[Parallel(n_jobs=-1)]: Using backend LokyBackend with 32 concurrent workers.
[Parallel(n_jobs=-1)]: Done 100 out of 100 | elapsed: 0.7s finished
92%|█████████▏| 11/12 [00:19<00:01, 1.21s/it][Parallel(n_jobs=-2)]: Using backend LokyBackend with 31 concurrent workers.
n_jobs = -1, perf_counter = 0.8903687000274658, sec
[Parallel(n_jobs=-2)]: Done 100 out of 100 | elapsed: 0.5s finished
100%|██████████| 12/12 [00:20<00:00, 1.69s/it]
n_jobs = -2, perf_counter = 0.544947624206543, sec
# Here I show what is going on behind the scenes, to understand the differences in times; it explains the "None" vs "1" execution times (it is all about the pickling process and the Memory caching implementation for parallel).
[Parallel(n_jobs=-2)]: Done 71 out of 100 | elapsed: 0.9s remaining: 0.4s
Pickling array (shape=(900,), dtype=int64).
Pickling array (shape=(100,), dtype=int64).
Pickling array (shape=(100,), dtype=float64).
Pickling array (shape=(1000, 30), dtype=float64).
Pickling array (shape=(1000,), dtype=float64).
Pickling array (shape=(900,), dtype=int64).
Pickling array (shape=(100,), dtype=int64).
Pickling array (shape=(100,), dtype=float64).
Pickling array (shape=(1000, 30), dtype=float64).
Pickling array (shape=(1000,), dtype=float64).
Pickling array (shape=(900,), dtype=int64).
Pickling array (shape=(100,), dtype=int64).
Pickling array (shape=(100,), dtype=float64).
Pickling array (shape=(1000, 30), dtype=float64).
Pickling array (shape=(1000,), dtype=float64).
Pickling array (shape=(900,), dtype=int64).
Pickling array (shape=(100,), dtype=int64).
Pickling array (shape=(100,), dtype=float64).
[Parallel(n_jobs=-2)]: Done 73 out of 100 | elapsed: 0.9s remaining: 0.3s
Completed
n_jobs in sklearn
Parallelism in sklearn
Thread-based parallelism vs process-based parallelism