使用 ES6 的 Promise.all() 时限制并发的最佳方法是什么?

IT技术 javascript node.js es6-promise
2021-02-13 15:00:11

我有一些代码正在迭代从数据库中查询的列表,并为该列表中的每个元素发出 HTTP 请求。该列表有时可能是一个相当大的数字(以千计),我想确保我没有访问具有数千个并发 HTTP 请求的 Web 服务器。

此代码的缩写版本目前看起来像这样......

function getCounts() {
  return users.map(user => {
    return new Promise(resolve => {
      remoteServer.getCount(user) // makes an HTTP request
      .then(() => {
        /* snip */
        resolve();
      });
    });
  });
}

Promise.all(getCounts()).then(() => { /* snip */});

此代码在 Node 4.3.2 上运行。重申Promise.all一下,是否可以进行管理,以便在任何给定时间只有一定数量的 Promise 正在进行?

6个回答

P-极限

我将 promise 并发限制与自定义脚本、bluebird、es6-promise-pool 和 p-limit 进行了比较。我相信p-limit有最简单、最精简的实现来满足这个需求。请参阅他们的文档

要求

与示例中的异步兼容

我的例子

在这个例子中,我们需要为数组中的每个 URL 运行一个函数(比如,可能是一个 API 请求)。这里称为fetchData(). 如果我们有数千个项目要处理的数组,并发对于节省 CPU 和内存资源肯定很有用。

const pLimit = require('p-limit');

// Example Concurrency of 3 promise at once
const limit = pLimit(3);

let urls = [
    "http://www.exampleone.com/",
    "http://www.exampletwo.com/",
    "http://www.examplethree.com/",
    "http://www.examplefour.com/",
]

// Create an array of our promises using map (fetchData() returns a promise)
let promises = urls.map(url => {

    // wrap the function we are calling in the limit function we defined above
    return limit(() => fetchData(url));
});

(async () => {
    // Only three promises are run at once (as defined above)
    const result = await Promise.all(promises);
    console.log(result);
})();

控制台日志结果是您解析的Promise响应数据的数组。

感谢您进行比较。您是否与github.com/rxaviers/async-pool进行了比较
2021-03-16 15:00:11
除此之外,每周在 npm 上的下载量约为 2000 万次,而其他答案中提到的其他库的下载量约为 200-100k。
2021-03-16 15:00:11
使用方便,不错的选择。
2021-03-25 15:00:11
谢谢这个!这个简单多了
2021-03-29 15:00:11
这是迄今为止我见过的最好的用于限制同时请求的库。和很好的例子,谢谢!
2021-03-31 15:00:11

请注意,Promise.all()这不会触发Promise开始他们的工作,而是创建Promise本身。

考虑到这一点,一种解决方案是在Promise解决时检查是否应该启动新Promise,或者您是否已经达到限制。

然而,这里真的没有必要重新发明轮子。可以用于此目的的一个库是es6-promise-pool. 从他们的例子:

// On the Web, leave out this line and use the script tag above instead. 
var PromisePool = require('es6-promise-pool')

var promiseProducer = function () {
  // Your code goes here. 
  // If there is work left to be done, return the next work item as a promise. 
  // Otherwise, return null to indicate that all promises have been created. 
  // Scroll down for an example. 
}

// The number of promises to process simultaneously. 
var concurrency = 3

// Create a pool. 
var pool = new PromisePool(promiseProducer, concurrency)

// Start the pool. 
var poolPromise = pool.start()

// Wait for the pool to settle. 
poolPromise.then(function () {
  console.log('All promises fulfilled')
}, function (error) {
  console.log('Some promise rejected: ' + error.message)
})
我还发现 p-limit 是最简单的实现。请参阅下面的示例。stackoverflow.com/a/52262024/8177355
2021-03-17 15:00:11
我认为 tiny-asyc-pool 是用于限制 promise 并发性的更好、非侵入性且相当自然的解决方案。
2021-04-05 15:00:11
异步池本质上是不同的,它使用一个Promise数组而不是一个返回Promise或 null 的函数来完成
2021-04-06 15:00:11
看看两者,异步池看起来更好!更直接,更轻巧。
2021-04-10 15:00:11
不幸的是,es6-promise-pool 重新发明了 Promise 而不是使用它们。我建议使用这个简洁的解决方案(如果你已经在使用 ES6 或 ES7)github.com/rxaviers/async-pool
2021-04-13 15:00:11

使用 Array.prototype.splice

while (funcs.length) {
  // 100 at a time
  await Promise.all( funcs.splice(0, 100).map(f => f()) )
}
在缺乏更多上下文的情况下花了一秒钟时间来了解它正在做什么,例如它是一个批次而不是一个池。每次从头或中间拼接时,都会对数组进行重新排序。(浏览器必须重新索引所有项目)理论上性能更好的替代方法是从最后取东西,arr.splice(-100)如果顺序不数学,也许你可以反转数组:P
2021-03-15 15:00:11
对于批量运行非常有用。注意:直到当前批次 100% 完成后,才会开始下一个批次。
2021-03-23 15:00:11
这将批量运行函数而不是池,其中一个函数在另一个函数完成时立即调用。
2021-04-04 15:00:11
这是一个被低估的解决方案。喜欢简单。
2021-04-06 15:00:11
喜欢这个解决方案!
2021-04-13 15:00:11

如果您知道迭代器的工作原理以及它们的使用方式,您就不需要任何额外的库,因为自己构建自己的并发会变得非常容易。我来演示一下:

/* [Symbol.iterator]() is equivalent to .values()
const iterator = [1,2,3][Symbol.iterator]() */
const iterator = [1,2,3].values()


// loop over all items with for..of
for (const x of iterator) {
  console.log('x:', x)
  
  // notices how this loop continues the same iterator
  // and consumes the rest of the iterator, making the
  // outer loop not logging any more x's
  for (const y of iterator) {
    console.log('y:', y)
  }
}

我们可以使用相同的迭代器并在工作人员之间共享它。

如果你使用.entries()而不是.values()你会得到一个二维数组[[index, value]],我将在下面用 2 的并发性来演示

const sleep = t => new Promise(rs => setTimeout(rs, t))

async function doWork(iterator) {
  for (let [index, item] of iterator) {
    await sleep(1000)
    console.log(index + ': ' + item)
  }
}

const iterator = Array.from('abcdefghij').entries()
const workers = new Array(2).fill(iterator).map(doWork)
//    ^--- starts two workers sharing the same iterator

Promise.allSettled(workers).then(() => console.log('done'))

这样做的好处是您可以拥有一个生成器功能,而不是立即准备好所有东西。

更棒的是你可以stream.Readable.from(iterator)在 node 中(最终也可以在 whatwg 流中)。并且使用可转移的 ReadbleStream,如果您正在与网络工作者一起工作也为了表演,这使得这个潜力在该功能中非常有用


注意:与示例异步池相比,与此不同的是它产生了两个工作人员,因此如果一个工作人员出于某种原因在索引 5 处抛出错误,则不会阻止其他工作人员完成其余工作。所以你从 2 个并发减少到 1 个。(所以它不会停在那里)所以我的建议是你捕获doWork函数内的所有错误

它绝对应该作为 NPM module发布,我想使用它。
2021-03-15 15:00:11
这太棒了!感谢无尽!
2021-03-18 15:00:11
稍后可能会更酷的是当 Streams 获得Readable.from(iterator)支持时。Chrome 已经使流可以传输所以你可以创建可读的流并将其发送给网络工作者,他们最终都会使用相同的底层迭代器。
2021-03-21 15:00:11
这绝对是一个很酷的方法!只需确保您的并发性不超过任务列表的长度(如果您仍然关心结果),因为您最终可能会得到额外的结果!
2021-04-06 15:00:11
@KrisOye 当并发超过任务列表的长度时,我不会重现问题。我只是尝试用 20 个工人 ( new Array(20))运行代码片段它按要求运行完成,没有额外的。(额外的工作人员立即完成,因为迭代器在启动之前就完成了。)
2021-04-10 15:00:11

不要使用 promise 来限制 http 请求,而是使用节点的内置http.Agent.maxSockets这消除了使用库或编写自己的池代码的要求,并且具有额外的优势,可以更好地控制您的限制。

agent.maxSockets

默认设置为无穷大。确定代理可以为每个源打开多少个并发套接字。Origin 是“host:port”或“host:port:localAddress”的组合。

例如:

var http = require('http');
var agent = new http.Agent({maxSockets: 5}); // 5 concurrent connections per origin
var request = http.request({..., agent: agent}, ...);

如果向同一来源发出多个请求,设置keepAlive为 true也可能对您有益(有关更多信息,请参阅上面的文档)。

尽管如此,立即创建数千个闭包和池化套接字似乎不是很有效?
2021-04-06 15:00:11