向一个每分钟只能处理 20 个请求的 API 发出多个请求

IT技术 javascript node.js
2021-01-28 13:14:45

我有一个返回Promise的方法,并且该方法在内部调用一个 API,该 API 每分钟只能有 20 个请求。问题是我有一大堆对象(大约 300 个),我想为每个对象调用 API。

目前我有以下代码:

    const bigArray = [.....];

    Promise.all(bigArray.map(apiFetch)).then((data) => {
      ...
    });

但它不处理时序约束。我希望我可以使用 _.chunk 和 _.debounce from 之类的东西,lodash但我无法理解它。有人可以帮我吗?

2个回答

如果您可以使用 Bluebird Promise库,它具有内置的并发功能,可让您管理一组异步操作,一次最多 N 个。

var Promise = require('bluebird');
const bigArray = [....];

Promise.map(bigArray, apiFetch, {concurrency: 20}).then(function(data) {
    // all done here
});

这个接口的好处是它可以保持 20 个请求。它将启动 20,然后每完成一个,它就会启动另一个。因此,这可能比发送 20 个更有效,等待所有完成,再发送 20 个等等......

这也以完全相同的顺序提供结果,bigArray以便您可以确定哪个结果与哪个请求相符。

当然,您可以使用计数器通过通用Promise自己编写代码,但由于它已经内置在 Bluebird 库中,我想我会推荐这种方式。

Async 库也有类似的并发控制,尽管它显然不是基于Promise的。


这是一个仅使用 ES6 promise 的手工编码版本,它维护结果顺序并始终保持 20 个请求在运行(直到没有 20 个请求)以获得最大吞吐量:

function pMap(array, fn, limit) {
    return new Promise(function(resolve, reject) {
        var index = 0, cnt = 0, stop = false, results = new Array(array.length);

        function run() {
            while (!stop && index < array.length && cnt < limit) {
                (function(i) {
                    ++cnt;
                    ++index;
                    fn(array[i]).then(function(data) {
                        results[i] = data;
                        --cnt;
                        // see if we are done or should run more requests
                        if (cnt === 0 && index === array.length) {
                            resolve(results);
                        } else {
                            run();
                        }
                    }, function(err) {
                        // set stop flag so no more requests will be sent
                        stop = true;
                        --cnt;
                        reject(err);
                    });
                })(index);
            }
        }
        run();
    });
}   

pMap(bigArray, apiFetch, 20).then(function(data) {
    // all done here
}, function(err) {
    // error here
});

这里的工作演示:http : //jsfiddle.net/jfriend00/v98735uu/

@IgnacioARivas - 添加了手动编码版本,可维护结果顺序并始终保持 20 个请求,并且不使用外部库。
2021-03-28 13:14:45

您可以每分钟发送 1 个包含 20 个请求的块,或者每 3 秒将它们隔开 1 个请求(后者可能是 API 所有者的首选)。

function rateLimitedRequests(array, chunkSize) {
  var delay = 3000 * chunkSize;
  var remaining = array.length;
  var promises = [];
  var addPromises = function(newPromises) {
    Array.prototype.push.apply(promises, newPromises);
    if (remaining -= newPromises.length == 0) {
      Promise.all(promises).then((data) => {
        ... // do your thing
      });
    }
  };
  (function request() {
    addPromises(array.splice(0, chunkSize).map(apiFetch));
    if (array.length) {
      setTimeout(request, delay);
    }
  })();
}

每 3 秒调用 1:

rateLimitedRequests(bigArray, 1);

或每分钟 20 次:

rateLimitedRequests(bigArray, 20);

如果您更喜欢使用_.chunkand 1_.debounce _.throttle

function rateLimitedRequests(array, chunkSize) {
  var delay = 3000 * chunkSize;
  var remaining = array.length;
  var promises = [];
  var addPromises = function(newPromises) {
    Array.prototype.push.apply(promises, newPromises);
    if (remaining -= newPromises.length == 0) {
      Promise.all(promises).then((data) => {
        ... // do your thing
      });
    }
  };
  var chunks = _.chunk(array, chunkSize);  
  var throttledFn = _.throttle(function() {
    addPromises(chunks.pop().map(apiFetch));
  }, delay, {leading: true});
  for (var i = 0; i < chunks.length; i++) {
    throttledFn();
  }
}

1您可能想要,_.throttle因为它会在延迟后执行每个函数调用,而_.debounce将多个调用组合为一个调用。请参阅文档链接的这篇文章

Debounce:将其视为“将多个事件组合为一个”。想象一下,你回家,进入电梯,门正在关上……突然你的邻居出现在大厅里并试图跳上电梯。讲礼貌!并为他打开门:你正在阻止电梯离开。考虑到同样的情况可能会再次发生在第三人身上,依此类推……可能会延迟出发几分钟。

Throttle:把它想象成一个阀门,它调节执行的流程。我们可以确定某个函数在特定时间内可以调用的最大次数。所以在电梯的比喻中..你有足够的礼貌让人们进来 10 秒,但是一旦延迟过去,你必须走!

太棒了,这就是我所追求的。谢了哥们。
2021-03-22 13:14:45
我只是更新了一些额外的代码,以便您的Promise正常工作,假设您需要一次处理所有数据。否则旧代码仍然能够处理独立进来的数据。
2021-03-23 13:14:45
你为什么使用Array.prototype.push.apply(data, newData)而不是data.push(newData)
2021-03-30 13:14:45
注意:如果这很重要,我认为这不会保留数据的顺序。它似乎在数据到达时将数据推送到输出数组中,而不考虑原始请求的顺序。
2021-04-02 13:14:45
你是对的,如果请求是异步的并且碰巧以不同的顺序返回,它不会保留顺序。
2021-04-10 13:14:45