在 node.js 中协调并行执行

IT技术 javascript concurrency node.js parallel-processing fork-join
2021-01-19 11:41:30

node.js 的事件驱动编程模型使得协调程序流有些棘手。

简单的顺序执行变成了嵌套的回调,这很容易(虽然写下来有点复杂)。

但是并行执行呢?假设您有三个可以并行运行的任务 A、B、C,当它们完成时,您想将它们的结果发送到任务 D。

使用 fork/join 模型,这将是

  • 叉A
  • 叉B
  • 前叉 C
  • 加入A、B、C,运行D

我如何在 node.js 中编写它?是否有任何最佳实践或食谱?我是否每次都必须手动滚动解决方案,或者是否有一些带有帮助程序的库?

6个回答

node.js 中没有什么是真正并行的,因为它是单线程的。但是,可以按您无法事先确定的顺序安排和运行多个事件。像数据库访问这样的一些事情实际上是“并行的”,因为数据库查询本身在单独的线程中运行,但在完成后重新集成到事件流中。

那么,如何在多个事件处理程序上安排回调?嗯,这是浏览器端 javascript 动画中使用的一种常用技术:使用变量来跟踪完成。

这听起来像是一个黑客,它确实如此,而且在进行跟踪时留下一堆全局变量听起来可能很混乱,并且使用较少的语言。但是在 javascript 中我们可以使用闭包:

function fork (async_calls, shared_callback) {
  var counter = async_calls.length;
  var callback = function () {
    counter --;
    if (counter == 0) {
      shared_callback()
    }
  }

  for (var i=0;i<async_calls.length;i++) {
    async_calls[i](callback);
  }
}

// usage:
fork([A,B,C],D);

在上面的例子中,我们通过假设异步和回调函数不需要参数来保持代码简单。您当然可以修改代码以将参数传递给异步函数,并让回调函数累积结果并将其传递给 shared_callback 函数。


补充回答:

实际上,即使如此,该fork()函数已经可以使用闭包将参数传递给异步函数:

fork([
  function(callback){ A(1,2,callback) },
  function(callback){ B(1,callback) },
  function(callback){ C(1,2,callback) }
],D);

剩下唯一要做的就是累积来自 A、B、C 的结果并将它们传递给 D。


更多附加答案:

我无法抗拒。吃早餐的时候一直在想这个。这是一个fork()累积结果的实现(通常作为参数传递给回调函数):

function fork (async_calls, shared_callback) {
  var counter = async_calls.length;
  var all_results = [];
  function makeCallback (index) {
    return function () {
      counter --;
      var results = [];
      // we use the arguments object here because some callbacks 
      // in Node pass in multiple arguments as result.
      for (var i=0;i<arguments.length;i++) {
        results.push(arguments[i]);
      }
      all_results[index] = results;
      if (counter == 0) {
        shared_callback(all_results);
      }
    }
  }

  for (var i=0;i<async_calls.length;i++) {
    async_calls[i](makeCallback(i));
  }
}

这很容易。这具有fork()相当通用的用途,可用于同步多个非同类事件。

Node.js 中的示例用法:

// Read 3 files in parallel and process them together:

function A (c){ fs.readFile('file1',c) };
function B (c){ fs.readFile('file2',c) };
function C (c){ fs.readFile('file3',c) };
function D (result) {
  file1data = result[0][1];
  file2data = result[1][1];
  file3data = result[2][1];

  // process the files together here
}

fork([A,B,C],D);

更新

这段代码是在像 async.js 这样的库或各种基于 Promise 的库存在之前编写的。我想相信 async.js 受到了这个启发,但我没有任何证据。不管怎样..如果你今天想这样做,看看 async.js 或 promises。只需考虑上面的答案,就可以很好地解释/说明 async.parallel 之类的事情是如何工作的。

为了完整起见,以下是您如何使用async.parallel

var async = require('async');

async.parallel([A,B,C],D);

请注意,它的async.parallel工作原理与fork我们上面实现功能完全相同主要区别在于它D按照 node.js 约定将错误作为第一个参数传递给,并将回调作为第二个参数传递。

使用 Promise,我们将其编写如下:

// Assuming A, B & C return a promise instead of accepting a callback

Promise.all([A,B,C]).then(D);
“node.js 中没有什么是真正并行的,因为它是单线程的。” 不对。不使用 CPU 的一切(例如等待网络 I/O)并行运行。
2021-03-15 11:41:30
@Thilo:通常我们将不使用 CPU 的代码称为未运行。如果您不运行,则无法并行“运行”。
2021-03-18 11:41:30
我说这些不是并行执行的函数是否正确,但它们(充其量)以不确定的顺序执行,代码在每个“async_func”返回之前都不会进展?
2021-03-22 11:41:30
在大多数情况下,这是真的。在 Node 中等待 IO 并不会阻止其他代码运行,但是当代码运行时,它一次一个。Node 中唯一真正的并行执行来自产生子进程,但几乎可以说任何环境都是如此。
2021-03-26 11:41:30
@MooGoo:这意味着对于事件,因为我们知道它们绝对不能并行运行,所以我们不必担心信号量和互斥量,而对于线程,我们必须锁定共享资源。
2021-04-10 11:41:30

我相信现在“async”module提供了这种并行功能,并且与上面的 fork 功能大致相同。

async.parallel 确实和上面的fork函数做了大致相同的事情
2021-03-12 11:41:30
这是不正确的,异步只会帮助您在单个进程中组织代码流。
2021-03-15 11:41:30
这不是真正的并行
2021-04-08 11:41:30

期货module有一个名为子module加入,我喜欢使用:

将异步调用连接在一起,类似于pthread_join线程的工作方式。

自述文件显示了一些很好的例子,可以自由地使用它或使用Promise 模式使用future子module。文档中的示例:

var Join = require('join')
  , join = Join()
  , callbackA = join.add()
  , callbackB = join.add()
  , callbackC = join.add();

function abcComplete(aArgs, bArgs, cArgs) {
  console.log(aArgs[1] + bArgs[1] + cArgs[1]);
}

setTimeout(function () {
  callbackA(null, 'Hello');
}, 300);

setTimeout(function () {
  callbackB(null, 'World');
}, 500);

setTimeout(function () {
  callbackC(null, '!');
}, 400);

// this must be called after all 
join.when(abcComplete);

这里可能有一个简单的解决方案:http : //howtonode.org/control-flow-part-ii滚动到并行操作。另一种方法是让 A、B 和 C 都共享相同的回调函数,让该函数具有全局或至少函数外增量器,如果这三个都调用了回调,则让它运行 D,当然,您还必须将 A、B 和 C 的结果存储在某处。

另一个选项可能是 Node 的 Step module:https : //github.com/creationix/step

看起来 step 没有真正的并行性。
2021-03-21 11:41:30