如何允许 Web Workers 在执行计算的同时接收新数据?

IT技术 javascript sorting interrupt web-worker worker
2021-03-13 21:23:04

我想使用 Web Workers 对数组进行排序。但是这个数组可能会随着时间的推移接收新值,而工作人员仍在执行排序功能。

所以我的问题是,如何在收到新项目后“停止”工作人员的排序计算,以便它可以对具有该项目的数组执行排序,同时仍然保持已经进行的排序?

例子:

let worker = new Worker('worker.js');
let list = [10,1,5,2,14,3];
worker.postMessage({ list });
setInterval(() => worker.postMessage({ num: SOME_RANDOM_NUM, list }), 100);

worker.onmessage = event => {
  list = event.data.list;
}

所以可以说,我已经过了 50 岁,在此之前工作人员在排序方面取得了一些进展,现在我有这样的事情: [1, 2, 3, 10, 5, 14, 50]. 这意味着排序在 index 处停止3所以我把这个new数组回给 worker,这样它就可以从 position 继续排序3

我怎样才能做到这一点,因为没有办法暂停/恢复网络工作者?

4个回答

即使 Worker 工作在与主页面不同的其他线程上,因此可以连续运行而不会阻塞 UI,但它仍然在单个线程上运行。

这意味着在你的排序算法完成之前,Worker 将延迟消息事件处理程序的执行;它和主线程一样被阻塞。

即使你从这个 Worker 内部使用了另一个 Worker,问题也是一样的。

唯一的解决方案是使用一种生成器函数作为排序器,并时不时地产生它,以便事件可以被执行。

但是这样做会大大减慢您的排序算法。

为了让它变得更好,您可以尝试挂钩每个事件循环,这要归功于 MessageChannel 对象:您在一个端口通话并在下一个事件循环中接收消息。如果您再次与另一个端口通话,那么您对每个事件循环都有自己的钩子。

现在,最好的办法是在每个事件循环中运行一个好的批处理,但对于演示,我将只调用我们的生成器函数的一个实例(我从这个 Q/A借来

请注意,使用 async 函数也可以实现相同的效果,这在某些情况下可能更实用:

你可以通过一些技巧来做到这一点——借助setTimeout函数中断。例如,没有附加线程并行执行 2 个函数是不可能的,但是通过setTimeout函数中断技巧,我们可以这样做:

函数的并行执行示例

var count_0 = 0,
    count_1 = 0;

function func_0()
{
    if(count_0 < 3)
        setTimeout(func_0, 0);//the same: setTimeout(func_0);

    console.log('count_0 = '+count_0);
    count_0++
}

function func_1()
{
    if(count_1 < 3)
        setTimeout(func_1, 0);

    console.log('count_1 = '+count_1)
    count_1++
}

func_0();
func_1();

你会得到这个输出:

count_0 = 0
count_1 = 0
count_0 = 1
count_1 = 1
count_0 = 2
count_1 = 2
count_0 = 3
count_1 = 3

为什么有可能?因为setTimeout函数需要一些时间来执行。这一次甚至足以执行以下代码中的某些部分。

为您解决

对于这种情况,您必须编写自己的数组排序函数(或者您也可以使用我提供的以下函数),因为我们无法中断本机sort函数。在这个你自己的函数中,你必须使用这个setTimeout函数中断技巧。您可以收到您的message活动通知。

在下面的示例中,我在数组的一半长度中进行了中断,您可以根据需要进行更改。

自定义排序功能中断的示例

var numbers = [4, 2, 1, 3, 5];

// this is my bubble sort function with interruption
/**
 * Sorting an array. You will get the same, but sorted array.
 * @param {array[]} arr – array to sort
 * @param {number} dir – if dir = -1 you will get an array like [5,4,3,2,1]
 *                 and if dir = 1 in opposite direction like [1,2,3,4,5]
 * @param {number} passCount – it is used only for setTimeout interrupting trick.
 */
function sortNumbersWithInterruption(arr, dir, passCount)
{
    var passes = passCount || arr.length,
        halfOfArrayLength = (arr.length / 2) | 0; // for ex. 2.5 | 0 = 2

    // Why we need while loop: some values are on
    // the end of array and we have to change their
    // positions until they move to the first place of array.
    while(passes--)
    {
        if(!passCount && passes == halfOfArrayLength)
        {
            // if you want you can also not write the following line for full break of sorting
            setTimeout(function(){sortNumbersWithInterruption(arr, dir, passes)}, 0);
            /*
                You can do here all what you want. Place 1
            */
            break
        }

        for(var i = 0; i < arr.length - 1; i++)
        {
            var a = arr[i],
                b = arr[i+1];

            if((a - b) * dir > 0)
            {
                arr[i] = b;
                arr[i+1] = a;
            }
        }

        console.log('array is: ' + arr.join());
    }

    if(passCount)
        console.log('END sring is: ' + arr.join());
}

sortNumbersWithInterruption(numbers, -1); //without passCount parameter
/*
    You can do here all what you want. Place 2
*/
console.log('The execution is here now!');

你会得到这个输出:

数组为:4,2,3,5,1
数组为:4,3,5,2,1
执行完毕
数组是:4,5,3,2,1
数组是:5,4,3,2,1
END sring 是:5,4,3,2,1
@KadoBOT,如果你想在这个数组中添加一个新的数字,并且如果你想让这个新的 lalue 也被排序,那么你必须打破 while 循环setTimeout并开始新的排序,因为一部分数字已经排序/动了。您可以使用push数组函数添加数组的新值
2021-05-09 21:23:04
能不能再详细点。即:我如何将一个新号码(从外部)传递给sortNumbersWithInterruption
2021-05-14 21:23:04
这就是我挣扎的部分:(
2021-05-15 21:23:04

你可以用插入排序(种类)来做到这一点。这是一个想法:

  1. 使用内部空数组启动您的工作程序(空数组显然已排序)

  2. 您的工作人员只收到元素而不是整个数组

  3. 您的工作人员将任何接收到的元素正确插入到数组中

  4. 每 n 秒,如果当前数组在最后一个事件之后发生了变化,worker 就会用当前数组引发一条消息。(如果你愿意,你可以在每次插入时发送数组,但以某种方式缓冲更有效)

最终,你得到了整个数组,如果添加了任何项,你将收到更新后的数组。

注意:因为您的数组总是排序的,所以您可以使用二分搜索在正确的位置插入。这是非常有效的。

有两个不错的选择。

选项1: Worker.terminate()

第一个是杀死你现有的网络工作者并开始一个新的。为此,您可以使用Worker.terminate().

接口terminate()方法Worker立即终止Worker. 这不会为工人提供完成其操作的机会;它只是立即停止。

这种方法的唯一缺点是:

  • 你失去了所有的工人状态。如果您必须为请求将大量数据复制到其中,则必须再次执行所有操作。
  • 它涉及线程的创建和销毁,这并不像大多数人想象的那么慢,但是如果您大量终止 Web Worker,则可能会导致问题。

如果这些都不是问题,那么它可能是最简单的选择。

就我而言,我有很多状态。我的工作人员正在渲染图像的一部分,当用户平移到不同的区域时,我希望它停止正在执行的操作并开始渲染新区域。但是渲染图像所需的数据非常庞大。

在您的情况下,您拥有不想使用的(可能很大)列表的状态。

选项 2:屈服

第二种选择基本上是进行协作多任务处理。你像往常一样运行你的计算,但你时不时地暂停(yield)并说“我应该停止吗?”,就像这样(这是一些无意义的计算,而不是排序)。

let requestId = 0;

onmessage = event => {
  ++requestId;
  sortAndSendData(requestId, event.data);
}

function sortAndSendData(thisRequestId, data) {
  let isSorted = false;
  let total = 0;

  while (data !== 0) {
    // Do a little bit of computation.
    total += data;
    --data;

    // Check if we are still the current request ID.
    if (thisRequestId !== requestId) {
      // Data was changed. Cancel this sort.
      return;
    }
  }

  postMessage(total);
}

但这不会起作用,因为sortAndSendData()运行到完成并阻止 Web Worker 的事件循环。我们需要某种方式来让thisRequestId !== requestId. 不幸的是,Javascript 并没有一个yield方法。它确实有async/await所以我们可以试试这个:

let requestId = 0;

onmessage = event => {
  console.log("Got event", event);
  ++requestId;
  sortAndSendData(requestId, event.data);
}

async function sortAndSendData(thisRequestId, data) {
  let isSorted = false;
  let total = 0;

  while (data !== 0) {
    // Do a little bit of computation.
    total += data;
    --data;

    await Promise.resolve();

    // Check if we are still the current request ID.
    if (thisRequestId !== requestId) {
      console.log("Cancelled!");
      // Data was changed. Cancel this sort.
      return;
    }
  }

  postMessage(total);
}

不幸的是它不起作用。我认为这是因为async/await使用“微任务”急切地执行事情,如果可能的话,这些“微任务”会在待处理的“宏任务”(我们的网络工作者消息)之前执行。

我们需要强制我们await成为一个宏任务,你可以使用setTimeout(0)

let requestId = 0;

onmessage = event => {
  console.log("Got event", event);
  ++requestId;
  sortAndSendData(requestId, event.data);
}

function yieldToMacrotasks() {
  return new Promise((resolve) => setTimeout(resolve));
}

async function sortAndSendData(thisRequestId, data) {
  let isSorted = false;
  let total = 0;

  while (data !== 0) {
    // Do a little bit of computation.
    total += data;
    --data;

    await yieldToMacrotasks();

    // Check if we are still the current request ID.
    if (thisRequestId !== requestId) {
      console.log("Cancelled!");
      // Data was changed. Cancel this sort.
      return;
    }
  }

  postMessage(total);
}

这有效!然而,它非常缓慢。await yieldToMacrotasks()在我的带有 Chrome 的机器上大约需要 4 毫秒!这是因为浏览器将最小超时设置为setTimeout(0)1 或 4 毫秒(实际的最小值似乎很复杂)。

幸运的是,另一位用户向我指出了一种更快的方法基本上在另一个上发送消息MessageChannel也会产生事件循环,但不受最小延迟setTimeout(0)的影响。这段代码有效,每个循环只需要大约 0.04 毫秒,应该没问题。

let currentTask = {
  cancelled: false,
}

onmessage = event => {
  currentTask.cancelled = true;
  currentTask = {
    cancelled: false,
  };
  performComputation(currentTask, event.data);
}

async function performComputation(task, data) {
  let total = 0;

  let promiseResolver;

  const channel = new MessageChannel();
  channel.port2.onmessage = event => {
    promiseResolver();
  };

  while (data !== 0) {
    // Do a little bit of computation.
    total += data;
    --data;

    // Yield to the event loop.
    const promise = new Promise(resolve => {
      promiseResolver = resolve;
    });
    channel.port1.postMessage(null);
    await promise;

    // Check if this task has been superceded by another one.
    if (task.cancelled) {
      return;
    }
  }

  // Return the result.
  postMessage(total);
}

我对此并不完全满意 - 它依赖于postMessage()以 FIFO 顺序处理的事件,我怀疑这是有保证的。我怀疑您可以重写代码以使其工作,即使那不是真的。