将 setInterval() 与数组中的 .map 方法一起使用并作为Promise返回

IT技术 javascript arrays api promise timer
2021-03-21 10:13:45

我有一个函数,它接受一个 ID 列表,将它们转换成一个数组 URL,然后使用 map 函数来触发一个获取请求。它工作得很好,但它触发得太快,并且提供程序抛出错误,因为我们太频繁地访问 API。我需要为请求设置一个间隔,但每次我这样做时,它都不起作用。想法?

async function getReports(reportIDs) {
    const urls = reportIDs.map(id => `https://api.data.com/api/v1/report/${id}/?include_datasets=true`);
    const requests = urls.map(url => fetch(url, {
        method: 'GET',
        headers: { 'api-key': key }
    }).then(res => res.json()));
    
    const responses = await Promise.all(requests).catch(err => console.error(err));
    return responses;
}

我使用了一个 promise,所以我可以在另一个函数中等待函数的结果来转换数据集。

想法?

4个回答

“简单是一种伟大的美德,但它需要努力工作才能实现,需要接受教育才能欣赏它。更糟糕的是:复杂性卖得更好。” Edsger W. Dijkstra

公认的“轻量级”解决方案将近20,000行代码,并且依赖于 CoffeeScript 和 Lua。如果你可以用 50 行 JavaScript 来交换所有这些呢?

假设我们有一些job需要一些时间来计算一些结果 -

async function job(x) {
  // job consumes some time
  await sleep(rand(5000))
  // job computes a result
  return x * 10
}

Promise.all([1,2,3,4,5,6,7,8,9,10,11,12].map(job))
  .then(console.log, console.error)
[10, 20, 30, 40, 50, 60, 70, 80, 90, 100, 110, 120]

这将一次运行所有十二 (12) 个作业。如果这些是对远程的请求,则某些连接可能会被拒绝,因为您同时使用过多的流量淹没了服务器。通过Pool对线程建模,我们控制并行化作业的流程 -

// my pool with four threads
const pool = new Pool(4)

async function jobQueued(x) {
  // wait for pool thread
  const close = await pool.open()
  // run the job and close the thread upon completion
  return job(x).then(close)
}

Promise.all([1,2,3,4,5,6,7,8,9,10,11,12].map(jobQueued))
  .then(console.log, console.error)
[10, 20, 30, 40, 50, 60, 70, 80, 90, 100, 110, 120]

函数应该很小并且只做一件事。这使得编写单个功能变得更容易,并提高了更高程度的可重用性,允许您将几个简单的功能组合成更复杂的功能。上面你已经看到randsleep-

const rand = x =>
  Math.random() * x

const sleep = ms =>
  new Promise(r => setTimeout(r, ms))

如果我们想要throttle每个工作,我们可以专门sleep确保最小运行时间 -

const throttle = (p, ms) =>
  Promise.all([ p, sleep(ms) ]).then(([ value, _ ]) => value)

async function jobQueued(x) {
  const close = await pool.open()
  // ensure job takes at least 3 seconds before freeing thread
  return throttle(job(x), 3000).then(close)
}

Promise.all([1,2,3,4,5,6,7,8,9,10,11,12].map(jobQueued))
  .then(console.log, console.error)
[10, 20, 30, 40, 50, 60, 70, 80, 90, 100, 110, 120]

我们可以添加一些console.log消息以确保一切正常运行。并且我们会sleep在job的开头添加一个random来表示任务可以按任意顺序排队而不影响结果的顺序——

async function jobQueued(x) {
  await sleep(rand(5000))
  console.log("queueing", x)
  const close = await pool.open()
  console.log("  sending", x)
  const result = await throttle(job(x), 3000).then(close)
  console.log("    received", result)
  return result
}

Promise.all([1,2,3,4,5,6,7,8,9,10,11,12].map(jobQueued))
  .then(console.log, console.error)
控制台日志 线程 1 线程 2 线程 3 线程 4
排队12
   发送 12 打开
排队9
   发送 9 打开
排队8
   发送 8 打开
排队4
   发送 4 打开
排队10
排队6
排队7
排队2
排队 11
      收到 120 关闭
   发送 11 打开
排队3
排队5
排队 1
      收到 80 关闭
   发送 1 打开
      收到 90 关闭
   发送 5 打开
      收到 110 关闭
   发送 3 打开
      收到 40 关闭
   发送 2 打开
      收到 10 关闭
   发送 7 打开
      收到 50 关闭
   发送 6 打开
      收到 20 关闭
   发送 10 打开
      收到 30 关闭
      收到 70 关闭
      收到 60 关闭
      收到 100 关闭
[10, 20, 30, 40, 50, 60, 70, 80, 90, 100, 110, 120]

上面,我们pool被初始化,size=4因此最多四个作业将同时运行。在我们看到sending四次之后,必须完成一项工作,我们received才能在下一项工作开始之前看到queueing随时可能发生。您可能还会注意到Pool使用高效的后进先出 (LIFO) 顺序处理排队的作业,但结果的顺序会保持不变。

继续我们的实现,就像我们的其他函数一样,我们可以用thread一种简单的方式编写——

const effect = f => x =>
  (f(x), x)

const thread = close =>
  [new Promise(r => { close = effect(r) }), close]

function main () {
  const [t, close] = thread()
  console.log("please wait...")
  setTimeout(close, 3000)
  return t.then(_ => "some result")
}

main().then(console.log, console.error)

please wait...
(3 seconds later)
some result

现在我们可以thread用来编写更复杂的功能,例如Pool-

class Pool {
  constructor (size = 4) {
    Object.assign(this, { pool: new Set, stack: [], size })
  }
  open () {
    return this.pool.size < this.size
      ? this.deferNow()
      : this.deferStacked()
  }
  deferNow () {
    const [t, close] = thread()
    const p = t
      .then(_ => this.pool.delete(p))
      .then(_ => this.stack.length && this.stack.pop().close())
    this.pool.add(p)
    return close
  }
  deferStacked () {
    const [t, close] = thread()
    this.stack.push({ close })
    return t.then(_ => this.deferNow())
  }
}

就这样你的程序就完成了。在下面的功能演示中,我压缩了定义,以便我们可以一次看到它们。运行程序以在您自己的浏览器中验证结果 -

class Pool {
  constructor (size = 4) { Object.assign(this, { pool: new Set, stack: [], size }) }
  open () { return this.pool.size < this.size ? this.deferNow() : this.deferStacked() }
  deferNow () { const [t, close] = thread(); const p = t.then(_ => this.pool.delete(p)).then(_ => this.stack.length && this.stack.pop().close()); this.pool.add(p); return close }
  deferStacked () { const [t, close] = thread(); this.stack.push({ close }); return t.then(_ => this.deferNow()) }
}
const rand = x => Math.random() * x
const effect = f => x => (f(x), x)
const thread = close => [new Promise(r => { close = effect(r) }), close]
const sleep = ms => new Promise(r => setTimeout(r, ms))
const throttle = (p, ms) => Promise.all([ p, sleep(ms) ]).then(([ value, _ ]) => value)

const myJob = x => sleep(rand(5000)).then(_ => x * 10)
const pool = new Pool(4)

async function jobQueued(x) {
  await sleep(rand(5000))
  console.log("queueing", x)
  const close = await pool.open()
  console.log("  sending", x)
  const result = await throttle(myJob(x), 3000).then(close)
  console.log("    received", result)
  return result
}

Promise.all([1,2,3,4,5,6,7,8,9,10,11,12].map(jobQueued))
  .then(JSON.stringify)
  .then(console.log, console.error)
.as-console-wrapper { min-height: 100%; }

希望你学到了一些关于 JavaScript 的乐趣!如果您喜欢这个,请尝试扩展Pool功能。也许添加一个简单的timeout功能来确保工作在一定时间内完成。或者可以添加一个retry函数,如果它产生错误或超时,则重新运行作业。要查看Pool应用于另一个问题,请参阅此问答如果您有任何问题,我很乐意为您提供帮助 :D

首先修改您的代码,使其调用名为waitAndFetch(). 您将传入 URL 加上数组中该元素的索引值 (0,1,2,3...):

async function getReports(reportIDs) {
    const urls = reportIDs.map(id => `https://api.data.com/api/v1/report/${id}/?include_datasets=true`);
    
    const requests = urls.map((url,i) => waitAndFetch);
    
    const responses = await Promise.all(requests).catch(err => console.error(err));
    return responses;
}

接下来,创建一个函数,它只返回一个Promise,该Promise在 1000 x 索引值毫秒(0、1 秒、2 秒、3 秒...)内解析:

const wait = (i) => {
    const ms = i * 1000;
    return new Promise((resolve, reject) => {
        setTimeout(resolve, ms);
    });
};

现在写waitAndFetch。它会调用wait(),但不会关心它的返回值。它只关心它必须等待它解决:

const waitAndFetch = async (url, i) => {
    await wait(i);
    const response = fetch(url, {
        method: 'GET',
        headers: { 'api-key': key }
    });
    return response.json();
};
这使得每个请求waitfetching前一秒,requests仍然同时开始。如果您想在每个请求前等待一秒钟,您可以使用一个简单的for循环。就像是const result = []; for (const url of urls) { await wait(); result.push(await fetch(url)) } return result
2021-04-24 10:13:45
谢谢谢谢。我通过修复代码编写了一个修复程序
2021-04-25 10:13:45

如果是节点,则有一个相当轻量级的 npm 包,称为bottleneck,它将限制请求。它工作得很好并且被广泛使用。或者你可以看看那个并推出你自己的。

欢迎使用 StackOverflow。除非还包含框架或库的标记,否则纯 JavaScript 答案是期望的简单推荐图书馆的帖子可以写成评论。如果您对如何自己解决此类问题感兴趣,请参阅我提供的答案。
2021-04-23 10:13:45
@感谢您花时间为未来的读者提供纯 js 解决方案。你的回答很好;由于没有时间说明,我立即做出回应,解决了问题(并指出了一个例子),让 OP 继续讨论重要的事情。我同意,尤其是在不编写一次性实用程序代码时,几行代码比使用整个库更可取。我也同意错误处理、超时和重试将是许多应用程序的关键功能。有时教书很重要,有时需要指出并继续前进;值得庆幸的是,Stack 为两者提供了便利。
2021-04-23 10:13:45
工作得很好` async function getReports(reportIDs) { const urls = reportIDs.map(id => ' api.data.com/api/v1/report/$ {id}/?include_datasets=true'); const requests = urls.map(url => limiter.schedule(()=>fetch(url, { method: 'GET', headers: { 'api-key': key } }).then(res => res. json()))); `
2021-05-08 10:13:45
谢谢丹尼尔。我同意你的观点,我只是建议你的帖子是作为评论发表的:D
2021-05-20 10:13:45

我认为最好的办法是使用睡眠功能。

async function sleep() {
  return new Promise((resolve) => setTimeout(resolve, 300));
}

await sleep()
如果 API 在 400 毫秒内没有返回怎么办?10000?60000?
2021-04-22 10:13:45
不管关键是你在每次通话之间等待而不是等到它回来
2021-04-27 10:13:45