MapReduce 与 MPI 问题

计算科学 mpi
2021-12-14 19:32:09

我正在使用 MPI 进行练习,以按照本指令中的类似步骤计算分布在几个不同文件中的单词的频率。

但是我在第 2 步遇到了一个问题。在我的实现中,我首先根据单词的哈希码将本地计数的单词计数对发送到相应的处理器中。同时,每个处理器可能需要接收从其他处理器发送的字数对。因此,一个处理器必须同时发送和接收。

例如:

在处理器 1 中,有字数对:

猫:3

狗:4

狐狸:2

在处理器 2 中,有字数对:

鹿:2

猫:2

狐狸:1

红色:3

等等。假设单词“cat”被计算发送到处理器 2,单词“deer”、“fox”、“red”应该被发送到处理器 1。所以每个处理器的伪代码将是:

for each word-count pair:
    compute the word hash code
    MPI_Issend it to corresponding processor according to the hash code

while (true):
    MPI_Recv word-count pair
    combine the counts of the same word in this processor

MPI_Waitall
Do some other things

注意第二个循环永远不会停止,所以我需要添加一个终止条件。我实现这一点的方法是从管理处理器发送一个等级为 0 的信号到每个工作处理器。所以第二个循环变成:

while (true):
    MPI_Recv word-count pair
    determine sender from the receive status
    if (sender != manager)
        combine the counts of the same word in this processor
    else
        break

但是这个算法被证明是不稳定的。虽然它在大多数运行中都会得到预期的结果,但它可能会陷入死锁。我对失败的猜测是,一些工作人员可能在收到来自其他工作处理器的所有字数对之前过早地收到了来自管理器的终止信号,因此来自这些工作人员的相应 MPI_Issend 将不得不无限期地等待,从而出现死锁。

我希望有人可以对我的实施提出一些改进建议。或者,如果有人对此步骤有更好的算法,那也将不胜感激!感谢您的关注!

更新:

为了防止经理在任何工作人员完成接收字数对之前发送终止信号,我总结了洗牌阶段涉及的所有处理器的总字数,比如然后,每次处理器收到一个字数对时,它也应该向管理器发送一个信号。因此,在管理器部分,添加了一个循环来接收这些信号,以便管理器仅在工作人员接收到所有字数对时才发送终止信号。这样一来,manager 信号就不会被 worker 发送的内容打断,因此 worker 中的 recv 循环可以正确终止。下面是最终修改后的伪代码:N

Manager Part:
for (i = 1 : N)
    MPI_Recv signal with tag "recvd"
MPI_Send termination signal to all processors involved in the shuffle stage


Worker Part:
for each word-count pair:
    compute the word hash code
    MPI_Issend it to corresponding processor according to the hash code

while (true):
    MPI_Recv word-count pair
    **MPI_Send signal with tag "recvd" to notify the manager that a wcp is received** 
    if (sender != manager)
        combine the counts of the same word in this processor
    else
        break

MPI_Waitall
Do some other things

尽管该算法成功运行,但我不确定我是否走在正确的轨道上。我是否过度复杂化了这个问题?

2个回答

正如 Wolfgang 所观察到的,您基本上是在转置一个连接矩阵:我知道第 i 行的每个进程都发送到的您感兴趣的是列:发送到中的描述。但是,使用 an 实现这一点是多余的:您只需要知道一列中条目的总和。iijjjAllgather

MPI 有这个例程!它被称为MPI_Reduce_scatter每个进程声明一个值数组,只要秩数;然后 MPI 对该数组的组件进行逐点缩减(这是您进行求和的地方),然后将其分散,向每个进程发送一个标量。

该例程有两个主要应用:稀疏矩阵向量乘积的设置阶段,以及二维分布密集矩阵向量乘积的执行。(在我的 HPC 书籍的索引中查找 reduce-scatter )听起来这正是您所需要的:因为您的哈希码,每个人都知道他们要发送给谁,而不是从谁那里接收。

小提示:在您的接听电话中,我将使用它MPI_ANY_SOURCE来防止任何死锁或并行性能问题。

您正在尝试做的事情很困难 - 即,在不知道有多少消息以及来自哪个处理器的情况下接收消息。问题是你永远不能确定你已经收到了所有的东西,或者是否还有更多的东西,但某些处理器发送东西的速度很慢。

解决这个问题的方法是,您首先需要弄清楚每个处理器将向谁发送内容。把它想象成一个矩阵:在行中,如果处理器想要向处理器发送一些东西,你将在列中放置一个 1 ,否则将放置一个零。处理器将知道期待来自所有处理器的消息,其中矩阵的列P×Ppqpqqpq

当然,每个处理器只知道自己的行。要知道列中的内容意味着转置此矩阵。例如,看一下执行此操作的compute_point_to_point_communication_pattern()函数(当然,无需构建矩阵,但从概念上讲,这就是它正在做的事情):https ://github.com/dealii/dealii/blob/master/source/base /mpi.cc#L91 它的文档在这里:https ://www.dealii.org/8.5.0/doxygen/deal.II/namespaceUtilities_1_1MPI.html#a89b9a3309dffffe1447758157a33dbb6

获得此信息后,您可以遍历所有发件人并等待他们想要发送给您的数据。(更好的模式是只查询任何传入的消息并对其进行处理,直到您知道每个已知的发件人都向您发送了他们的消息。)