“简单是一种伟大的美德,但它需要努力工作才能实现,需要接受教育才能欣赏它。更糟糕的是:复杂性卖得更好。” — Edsger W. Dijkstra
公认的“轻量级”解决方案将近20,000行代码,并且依赖于 CoffeeScript 和 Lua。如果你可以用 50 行 JavaScript 来交换所有这些呢?
假设我们有一些job需要一些时间来计算一些结果 -
async function job(x) {
// job consumes some time
await sleep(rand(5000))
// job computes a result
return x * 10
}
Promise.all([1,2,3,4,5,6,7,8,9,10,11,12].map(job))
.then(console.log, console.error)
[10, 20, 30, 40, 50, 60, 70, 80, 90, 100, 110, 120]
这将一次运行所有十二 (12) 个作业。如果这些是对远程的请求,则某些连接可能会被拒绝,因为您同时使用过多的流量淹没了服务器。通过Pool对线程建模,我们控制并行化作业的流程 -
// my pool with four threads
const pool = new Pool(4)
async function jobQueued(x) {
// wait for pool thread
const close = await pool.open()
// run the job and close the thread upon completion
return job(x).then(close)
}
Promise.all([1,2,3,4,5,6,7,8,9,10,11,12].map(jobQueued))
.then(console.log, console.error)
[10, 20, 30, 40, 50, 60, 70, 80, 90, 100, 110, 120]
函数应该很小并且只做一件事。这使得编写单个功能变得更容易,并提高了更高程度的可重用性,允许您将几个简单的功能组合成更复杂的功能。上面你已经看到rand和sleep-
const rand = x =>
Math.random() * x
const sleep = ms =>
new Promise(r => setTimeout(r, ms))
如果我们想要throttle每个工作,我们可以专门sleep确保最小运行时间 -
const throttle = (p, ms) =>
Promise.all([ p, sleep(ms) ]).then(([ value, _ ]) => value)
async function jobQueued(x) {
const close = await pool.open()
// ensure job takes at least 3 seconds before freeing thread
return throttle(job(x), 3000).then(close)
}
Promise.all([1,2,3,4,5,6,7,8,9,10,11,12].map(jobQueued))
.then(console.log, console.error)
[10, 20, 30, 40, 50, 60, 70, 80, 90, 100, 110, 120]
我们可以添加一些console.log消息以确保一切正常运行。并且我们会sleep在job的开头添加一个random来表示任务可以按任意顺序排队而不影响结果的顺序——
async function jobQueued(x) {
await sleep(rand(5000))
console.log("queueing", x)
const close = await pool.open()
console.log(" sending", x)
const result = await throttle(job(x), 3000).then(close)
console.log(" received", result)
return result
}
Promise.all([1,2,3,4,5,6,7,8,9,10,11,12].map(jobQueued))
.then(console.log, console.error)
| 控制台日志 |
线程 1 |
线程 2 |
线程 3 |
线程 4 |
| 排队12 |
⌛ |
⌛ |
⌛ |
⌛ |
| 发送 12 |
打开 |
⌛ |
⌛ |
⌛ |
| 排队9 |
↓ |
⌛ |
⌛ |
⌛ |
| 发送 9 |
↓ |
打开 |
⌛ |
⌛ |
| 排队8 |
↓ |
↓ |
⌛ |
⌛ |
| 发送 8 |
↓ |
↓ |
打开 |
⌛ |
| 排队4 |
↓ |
↓ |
↓ |
⌛ |
| 发送 4 |
↓ |
↓ |
↓ |
打开 |
| 排队10 |
↓ |
↓ |
↓ |
↓ |
| 排队6 |
↓ |
↓ |
↓ |
↓ |
| 排队7 |
↓ |
↓ |
↓ |
↓ |
| 排队2 |
↓ |
↓ |
↓ |
↓ |
| 排队 11 |
↓ |
↓ |
↓ |
↓ |
| 收到 120 |
关闭 |
↓ |
↓ |
↓ |
| 发送 11 |
打开 |
↓ |
↓ |
↓ |
| 排队3 |
↓ |
↓ |
↓ |
↓ |
| 排队5 |
↓ |
↓ |
↓ |
↓ |
| 排队 1 |
↓ |
↓ |
↓ |
↓ |
| 收到 80 |
↓ |
↓ |
关闭 |
↓ |
| 发送 1 |
↓ |
↓ |
打开 |
↓ |
| 收到 90 |
↓ |
关闭 |
↓ |
↓ |
| 发送 5 |
↓ |
打开 |
↓ |
↓ |
| 收到 110 |
关闭 |
↓ |
↓ |
↓ |
| 发送 3 |
打开 |
↓ |
↓ |
↓ |
| 收到 40 |
↓ |
↓ |
↓ |
关闭 |
| 发送 2 |
↓ |
↓ |
↓ |
打开 |
| 收到 10 |
↓ |
↓ |
关闭 |
↓ |
| 发送 7 |
↓ |
↓ |
打开 |
↓ |
| 收到 50 |
↓ |
关闭 |
↓ |
↓ |
| 发送 6 |
↓ |
打开 |
↓ |
↓ |
| 收到 20 |
↓ |
↓ |
↓ |
关闭 |
| 发送 10 |
↓ |
↓ |
↓ |
打开 |
| 收到 30 |
关闭 |
↓ |
↓ |
↓ |
| 收到 70 |
⌛ |
↓ |
关闭 |
↓ |
| 收到 60 |
⌛ |
关闭 |
⌛ |
↓ |
| 收到 100 |
⌛ |
⌛ |
⌛ |
关闭 |
[10, 20, 30, 40, 50, 60, 70, 80, 90, 100, 110, 120]
上面,我们pool被初始化,size=4因此最多四个作业将同时运行。在我们看到sending四次之后,必须完成一项工作,我们received才能在下一项工作开始之前看到。queueing随时可能发生。您可能还会注意到Pool使用高效的后进先出 (LIFO) 顺序处理排队的作业,但结果的顺序会保持不变。
继续我们的实现,就像我们的其他函数一样,我们可以用thread一种简单的方式编写——
const effect = f => x =>
(f(x), x)
const thread = close =>
[new Promise(r => { close = effect(r) }), close]
function main () {
const [t, close] = thread()
console.log("please wait...")
setTimeout(close, 3000)
return t.then(_ => "some result")
}
main().then(console.log, console.error)
please wait...
(3 seconds later)
some result
现在我们可以thread用来编写更复杂的功能,例如Pool-
class Pool {
constructor (size = 4) {
Object.assign(this, { pool: new Set, stack: [], size })
}
open () {
return this.pool.size < this.size
? this.deferNow()
: this.deferStacked()
}
deferNow () {
const [t, close] = thread()
const p = t
.then(_ => this.pool.delete(p))
.then(_ => this.stack.length && this.stack.pop().close())
this.pool.add(p)
return close
}
deferStacked () {
const [t, close] = thread()
this.stack.push({ close })
return t.then(_ => this.deferNow())
}
}
就这样你的程序就完成了。在下面的功能演示中,我压缩了定义,以便我们可以一次看到它们。运行程序以在您自己的浏览器中验证结果 -
class Pool {
constructor (size = 4) { Object.assign(this, { pool: new Set, stack: [], size }) }
open () { return this.pool.size < this.size ? this.deferNow() : this.deferStacked() }
deferNow () { const [t, close] = thread(); const p = t.then(_ => this.pool.delete(p)).then(_ => this.stack.length && this.stack.pop().close()); this.pool.add(p); return close }
deferStacked () { const [t, close] = thread(); this.stack.push({ close }); return t.then(_ => this.deferNow()) }
}
const rand = x => Math.random() * x
const effect = f => x => (f(x), x)
const thread = close => [new Promise(r => { close = effect(r) }), close]
const sleep = ms => new Promise(r => setTimeout(r, ms))
const throttle = (p, ms) => Promise.all([ p, sleep(ms) ]).then(([ value, _ ]) => value)
const myJob = x => sleep(rand(5000)).then(_ => x * 10)
const pool = new Pool(4)
async function jobQueued(x) {
await sleep(rand(5000))
console.log("queueing", x)
const close = await pool.open()
console.log(" sending", x)
const result = await throttle(myJob(x), 3000).then(close)
console.log(" received", result)
return result
}
Promise.all([1,2,3,4,5,6,7,8,9,10,11,12].map(jobQueued))
.then(JSON.stringify)
.then(console.log, console.error)
.as-console-wrapper { min-height: 100%; }
希望你学到了一些关于 JavaScript 的乐趣!如果您喜欢这个,请尝试扩展Pool功能。也许添加一个简单的timeout功能来确保工作在一定时间内完成。或者可以添加一个retry函数,如果它产生错误或超时,则重新运行作业。要查看Pool应用于另一个问题,请参阅此问答。如果您有任何问题,我很乐意为您提供帮助 :D