限制正在运行的Promise的并发性

IT技术 javascript node.js promise bluebird
2021-01-14 16:00:58

我正在寻找一个Promise函数包装器,它可以在给定的Promise运行时限制/节流,以便在给定的时间只运行一定数量的Promise。

在下面的情况下delayPromise,永远不应该同时运行,它们都应该按照先到先得的顺序一次运行一个。

import Promise from 'bluebird'

function _delayPromise (seconds, str) {
  console.log(str)
  return Promise.delay(seconds)
}

let delayPromise = limitConcurrency(_delayPromise, 1)

async function a() {
  await delayPromise(100, "a:a")
  await delayPromise(100, "a:b")
  await delayPromise(100, "a:c")
}

async function b() {
  await delayPromise(100, "b:a")
  await delayPromise(100, "b:b")
  await delayPromise(100, "b:c")
}

a().then(() => console.log('done'))

b().then(() => console.log('done'))

关于如何设置这样的队列的任何想法?

我有一个来自精彩的“去抖动”功能Benjamin Gruenbaum我需要修改它以根据它自己的执行而不是延迟来限制Promise。

export function promiseDebounce (fn, delay, count) {
  let working = 0
  let queue = []
  function work () {
    if ((queue.length === 0) || (working === count)) return
    working++
    Promise.delay(delay).tap(function () { working-- }).then(work)
    var next = queue.shift()
    next[2](fn.apply(next[0], next[1]))
  }
  return function debounced () {
    var args = arguments
    return new Promise(function (resolve) {
      queue.push([this, args, resolve])
      if (working < count) work()
    }.bind(this))
  }
}
5个回答

我不认为有任何库可以做到这一点,但实现自己实际上很简单:

function queue(fn) { // limitConcurrency(fn, 1)
    var q = Promise.resolve();
    return function(x) {
        var p = q.then(function() {
            return fn(x);
        });
        q = p.reflect();
        return p;
    };
}

对于多个并发请求,它变得有点棘手,但也可以完成。

function limitConcurrency(fn, n) {
    if (n == 1) return queue(fn); // optimisation
    var q = null;
    var active = [];
    function next(x) {
        return function() {
            var p = fn(x)
            active.push(p.reflect().then(function() {
                active.splice(active.indexOf(p), 1);
            })
            return [Promise.race(active), p];
        }
    }
    function fst(t) {
        return t[0];
    }
    function snd(t) {
        return t[1];
    }
    return function(x) {
        var put = next(x)
        if (active.length < n) {
            var r = put()
            q = fst(t);
            return snd(t);
        } else {
            var r = q.then(put);
            q = r.then(fst);
            return r.then(snd)
        }
    };
}

顺便说一句,您可能想看看演员模型CSP他们可以简化处理这些事情,也有一些 JS 库可供他们使用。

例子

import Promise from 'bluebird'

function sequential(fn) {
  var q = Promise.resolve();
  return (...args) => {
    const p = q.then(() => fn(...args))
    q = p.reflect()
    return p
  }
}

async function _delayPromise (seconds, str) {
  console.log(`${str} started`)
  await Promise.delay(seconds)
  console.log(`${str} ended`)
  return str
}

let delayPromise = sequential(_delayPromise)

async function a() {
  await delayPromise(100, "a:a")
  await delayPromise(200, "a:b")
  await delayPromise(300, "a:c")
}

async function b() {
  await delayPromise(400, "b:a")
  await delayPromise(500, "b:b")
  await delayPromise(600, "b:c")
}

a().then(() => console.log('done'))
b().then(() => console.log('done'))

// --> with sequential()

// $ babel-node test/t.js
// a:a started
// a:a ended
// b:a started
// b:a ended
// a:b started
// a:b ended
// b:b started
// b:b ended
// a:c started
// a:c ended
// b:c started
// done
// b:c ended
// done

// --> without calling sequential()

// $ babel-node test/t.js
// a:a started
// b:a started
// a:a ended
// a:b started
// a:b ended
// a:c started
// b:a ended
// b:b started
// a:c ended
// done
// b:b ended
// b:c started
// b:c ended
// done
@ThomasReggi:所以这个例子有效,你还有什么想要回答的吗?
2021-03-14 16:00:58
嗯,有人在没有解释他们的推理的情况下疯狂投票。
2021-04-11 16:00:58

使用 throttled-promise module:

https://www.npmjs.com/package/throttled-promise

var ThrottledPromise = require('throttled-promise'),
    promises = [
        new ThrottledPromise(function(resolve, reject) { ... }),
        new ThrottledPromise(function(resolve, reject) { ... }),
        new ThrottledPromise(function(resolve, reject) { ... })
    ];

// Run promises, but only 2 parallel
ThrottledPromise.all(promises, 2)
.then( ... )
.catch( ... );
他正在寻找一个Promise返回函数的包装器,而不是一个包装好的Promise构造函数。此外,该库似乎要求您使用ThrottledPromise.all,这不符合问题中所述的问题。请尝试实现a/b示例,您可能会看到它如何不起作用 - 或者如果它起作用,该代码实际上回答了问题。请注意,SO 不是图书馆推荐服务,而是需要实际回答问题,这确实涉及代码编写。
2021-03-20 16:00:58
我不知道如何用这个库解决 OP 的问题,它在不同的范围内创建Promise。你能实现ab功能limitConcurrency吗?
2021-03-24 16:00:58
这是一个流行的库,从字面上看,这是 op 的问题:“我正在寻找一个 Promise 函数包装器,它可以在给定的 Promise 运行时限制/节流,以便在给定的时间只运行一定数量的该 Promise。 ” 如果你试图限制Promise,你应该使用限制Promise。SO 用于回答问题,而不是按需编写代码。
2021-04-07 16:00:58

我也有同样的问题。我写了一个库来实现它。代码在这里我创建了一个队列来保存所有的Promise。当您将一些Promise推送到队列时,队列头部的前几个Promise将被弹出并运行。一旦一个Promise完成,队列中的下一个Promise也将被弹出并运行。一次又一次,直到队列中没有Task. 您可以查看代码以了解详细信息。希望这个图书馆能帮到你。

好处

  • 您可以定义并发Promise的数量(几乎同时请求)
  • 一致的流程:一旦一个Promise解决,另一个请求开始,无需猜测服务器能力
  • 抗数据阻塞,如果服务器停止片刻,它只会等待,并且不会因为时钟允许而启动下一个任务
  • 不要依赖第三方module,它是 Vanila node.js

第一件事是让 https 成为一个Promise,所以我们可以使用等待来检索数据(从示例中删除)第二创建一个Promise调度程序,在任何Promise得到解决时提交另一个请求。3 拨打电话

通过限制并发Promise的数量来限制请求

const https = require('https')

function httpRequest(method, path, body = null) {
  const reqOpt = { 
    method: method,
    path: path,
    hostname: 'dbase.ez-mn.net', 
    headers: {
      "Content-Type": "application/json",
      "Cache-Control": "no-cache"
    }
  }
  if (method == 'GET') reqOpt.path = path + '&max=20000'
  if (body) reqOpt.headers['Content-Length'] = Buffer.byteLength(body);
  return new Promise((resolve, reject) => {
  const clientRequest = https.request(reqOpt, incomingMessage => {
      let response = {
          statusCode: incomingMessage.statusCode,
          headers: incomingMessage.headers,
          body: []
      };
      let chunks = ""
      incomingMessage.on('data', chunk => { chunks += chunk; });
      incomingMessage.on('end', () => {
          if (chunks) {
              try {
                  response.body = JSON.parse(chunks);
              } catch (error) {
                  reject(error)
              }
          }
          console.log(response)
          resolve(response);
      });
  });
  clientRequest.on('error', error => { reject(error); });
  if (body) { clientRequest.write(body)  }  
  clientRequest.end();

  });
}

    const asyncLimit = (fn, n) => {
      const pendingPromises = new Set();

  return async function(...args) {
    while (pendingPromises.size >= n) {
      await Promise.race(pendingPromises);
    }

    const p = fn.apply(this, args);
    const r = p.catch(() => {});
    pendingPromises.add(r);
    await r;
    pendingPromises.delete(r);
    return p;
  };
};
// httpRequest is the function that we want to rate the amount of requests
// in this case, we set 8 requests running while not blocking other tasks (concurrency)


let ratedhttpRequest = asyncLimit(httpRequest, 8);

// this is our datase and caller    
let process = async () => {
  patchData=[
      {path: '/rest/slots/80973975078587', body:{score:3}},
      {path: '/rest/slots/809739750DFA95', body:{score:5}},
      {path: '/rest/slots/AE0973750DFA96', body:{score:5}}]

  for (let i = 0; i < patchData.length; i++) {
    ratedhttpRequest('PATCH', patchData[i].path,  patchData[i].body)
  }
  console.log('completed')
}

process() 

串行运行异步进程的经典方法是使用async.js和使用async.series(). 如果您更喜欢基于Promise的代码,那么有一个Promise版本async.jsasync-q

有了async-q你可以再次使用series

async.series([
    function(){return delayPromise(100, "a:a")},
    function(){return delayPromise(100, "a:b")},
    function(){return delayPromise(100, "a:c")}
])
.then(function(){
    console.log(done);
});

同时运行它们中的两个将同时运行ab但在每个内部它们将是顺序的:

// these two will run concurrently but each will run
// their array of functions sequentially:
async.series(a_array).then(()=>console.log('a done'));
async.series(b_array).then(()=>console.log('b done'));

如果您想在b之后运行a请将其放入.then()

async.series(a_array)
.then(()=>{
    console.log('a done');
    return async.series(b_array);
})
.then(()=>{
    console.log('b done');
});

如果您不想按顺序运行每个,而是希望限制每个同时运行一定数量的进程,那么您可以使用parallelLimit()

// Run two promises at a time:
async.parallelLimit(a_array,2)
.then(()=>console.log('done'));

阅读 async-q 文档:https : //github.com/dbushong/async-q/blob/master/READJSME.md

@Bergi:这就是我展示的。使用async.series()将按delayPromise顺序运行
2021-03-17 16:00:58
不。即使您同时调用a调用它们,它们也应该是连续的b
2021-03-26 16:00:58
OP 不想知道如何在没有 async 关键字的情况下实现a/ b,而是想知道如何实现limitConcurrency以便在示例中delayPromise来自a调用b不是并行的。
2021-04-11 16:00:58