Javascript - 如何控制并行访问网络的Promise数量

IT技术 javascript promise
2021-03-11 21:50:20

在我的应用程序中,我有一系列并行访问网络的Promise,但有时,当我的应用程序全速运行时,我的网络速度变慢,由于许多Promise访问网络,我想知道如何控制许多接入网络并行。这是代码示例:

var ids = {1: 1, 2: 2, 3: 3, 4: 4, 5: 5, 6: 6, 6: 56, 7: 7, 8: 8, 5:6 }; // this is random 
Promise.all( Object.keys(ids).map(function(dp){
  return new Promise(function(resolve, reject){

       http.post({url: addr, form: { data: dp }}, function(err, res, body){
        if (err){
            reject(err)
        }            
        resolve(body.xx);
      });

  });
})).then(function(data){
       http.post({url: hostAddress, form: { data: data.x }}, function(err, res, body){
        ......
        resolve(body.xx);
      });    
});
  });
}))

有很多网络。我会很棒,我只能同时允许 2 或 3 个。谢谢你的帮助。

2个回答

您可以使用具有并发选项的Bluebird.map()来控制同时进行的请求数:

const Promise = require('bluebird');
const http = Promise.promisifyAll(require('http');

var ids = {1: 1, 2: 2, 3: 3, 4: 4, 5: 5, 6: 6, 6: 56, 7: 7, 8: 8, 5:6 }; // this is random 

Promise.map(Object.keys(ids).map(function(dp){
    return  http.post({url: addr, form: { data: dp }).then(function(body) {
        return body.xx;
    });
}), {concurrency: 2}).then(function(results) {
    // process results here
});

仅供参考,我不明白你想用你的第二个做什么,http.post()因为你引用的data.x时间data是一个数组。我认为代码有点伪代码太多,无法说明您真正想用第二个http.post().


否则,您可以编写自己的并发控制代码,首先启动 N 个请求,然后每次完成一个请求,然后启动另一个,直到您无事可做。这是手动编码并发控制的示例:

一次触发 1,000,000 个请求 100 个

或者,您可以像这样自己编写:

const http = require('http');

function httpPost(options) {
    return new Promise(function(resolve, reject) {
        http.post(options, function(err, res, body) {
            if (err) {
                reject(err);
            } else {
                resolve(body);
            }
        });
    });
}

// takes an array of items and a function that returns a promise
function mapConcurrent(items, maxConcurrent, fn) {
    let index = 0;
    let inFlightCntr = 0;
    let doneCntr = 0;
    let results = new Array(items.length);
    let stop = false;

    return new Promise(function(resolve, reject) {

        function runNext() {
            let i = index;
            ++inFlightCntr;
            fn(items[index], index++).then(function(val) {
                ++doneCntr;
                --inFlightCntr;
                results[i] = val;
                run();
            }, function(err) {
                // set flag so we don't launch any more requests
                stop = true;
                reject(err);
            });
        }

        function run() {
            // launch as many as we're allowed to
            while (!stop && inflightCntr < maxConcurrent && index < items.length) {
                runNext();
            }
            // if all are done, then resolve parent promise with results
            if (doneCntr === items.length) {
                resolve(results);
            }
        }

        run();
    });
}

var ids = {1: 1, 2: 2, 3: 3, 4: 4, 5: 5, 6: 6, 6: 56, 7: 7, 8: 8, 5:6 }; // this is random 


mapConcurrent(Object.keys(ids), 2, function(item, index) {
    return httpPost({url: addr, form: {data: item}}).then(function(body) {
        return body.xxx;
    });
}).then(function(results) {
    // array of results here
}, function(err) {
    // error here    
});
@dmx - 当然,Bluebird 应该与几乎所有其他不尝试替换Promise对象的包兼容
2021-04-16 21:50:20
@ jfriend00 基本上我有一个 id 列表,我请求获取用户信息,最后请求获取用户朋友
2021-04-19 21:50:20
@dmx - 仅供参考,我添加了一个不依赖于 Bluebird Promise库的手动编码版本。如果您喜欢 Bluebird,我建议您使用它,但我认为看看如何自己编写代码会很有指导意义。
2021-04-22 21:50:20
@dmx - 但是,由于伪代码不准确,我无法判断您要向其中输入什么。您可以使用我提供的代码中的结果发出新请求,其中显示“在此处处理结果”。
2021-05-13 21:50:20
在第二篇文章中,我提出了新的请求
2021-05-15 21:50:20

这是在不使用库的情况下实现目标的一种方法。在从 makeMaxConcurrencyRequests() 返回的Promise中,startNew() 函数被递归调用,发送新的请求,直到我们通过每个 id,并且不超过当前请求的 maxConcurrency 计数。

当每个请求完成时,它的返回数据被推送到返回数据数组中。当所有请求都完成后,promise 将通过返回数据解析。

我还没有测试过这个,但看着它我唯一担心的是 startNew() 将在请求挂起时快速连续多次调用。如果这会导致问题,而不是立即调用 startNew(),我们可以使用 setTimeout 来延迟下一次调用——这在我的代码中被注释掉了。

function makeMaxConcurrencyRequests(ids, maxConcurrency) {
    return new Promise(function(resolve, reject) {
        let i = 0, currentlyRunning = 0, returnedData = [];
        function startNew() {        
            while (i < ids.length && currentlyRunning <= maxConcurrency) {
                makeRequest(ids[i++]).then(function(data) {
                    returnedData.push(data);
                    currentlyRunning--;
                    startNew();
                }).catch(function(err) {
                    reject(err);
                });
                currentlyRunning++;
            }
            if (i >= ids.length && currentlyRunning === 0) {
                resolve(returnedData);
            }
            startNew();
            // setTimeout(startNew, 200);           
        }
    }
}

function makeRequest(id) {
    return new Promise(function(resolve, reject){
        http.post({url: addr, form: { data: dp }}, function(err, res, body){
            if (err){
                reject(err)
            } 

            http.post({url: hostAddress, form: { data: body.xx }}, function(err2, res2, body2){
                if(err2) {
                    reject(err2);
                }
                resolve(body2.xx);
           }); 
       });

   });
}

用法:

var ids = {1: 1, 2: 2, 3: 3, 4: 4, 5: 5, 6: 6, 6: 56, 7: 7, 8: 8, 5:6 };
var maxConcurrency = 3;
makeMaxConcurrencyRequests(Object.keys(ids), maxConcurrency)
.then(function(data) {
    // do something with data
}).catch(function(error) {
    // do something with error
});