使用 Apache Spark 进行机器学习。不断收到序列化错误

数据挖掘 阿帕奇火花 pyspark 情绪分析
2021-09-23 14:00:13

所以我正在使用 Spark 进行情感分析,并且我一直在使用它使用的序列化程序(我认为)来传递 python 对象。

PySpark worker failed with exception:
Traceback (most recent call last):
  File "/Users/abdul/Desktop/RSI/spark-1.0.1-bin-    hadoop1/python/pyspark/worker.py", line 77, in main
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/Users/abdul/Desktop/RSI/spark-1.0.1-bin-    hadoop1/python/pyspark/serializers.py", line 191, in dump_stream
    self.serializer.dump_stream(self._batched(iterator), stream)
  File "/Users/abdul/Desktop/RSI/spark-1.0.1-bin-    hadoop1/python/pyspark/serializers.py", line 123, in dump_stream
    for obj in iterator:
  File "/Users/abdul/Desktop/RSI/spark-1.0.1-bin-    hadoop1/python/pyspark/serializers.py", line 180, in _batched
    for item in iterator:
TypeError: __init__() takes exactly 3 arguments (2 given)

序列化程序的代码可在此处获得

我的代码在这里

1个回答

(Py)Spark 中最常见的序列化错误意味着您的分布式代码的某些部分(例如传递给 的函数map依赖不可序列化的数据考虑以下示例:

rdd = sc.parallelize(range(5))
rdd = rdd.map(lambda x: x + 1)
rdd.collect()

在这里,您已分发集合和 lambda 函数以发送给所有工作人员。Lambda 是完全自包含的,因此很容易将其二进制表示复制到其他节点而无需担心。

现在让我们让事情变得更有趣:

f = open("/etc/hosts")
rdd = sc.parallelize(range(100))
rdd = rdd.map(lambda x: f.read())
rdd.collect()
f.close()

繁荣!序列化模块中的奇怪错误!刚刚发生的事情是我们试图将f文件对象传递给工作人员。显然,文件对象是本地数据的句柄,因此不能发送到其他机器。


那么您的特定代码中发生了什么?如果没有实际数据和知道记录格式,我无法完全调试它,但我想问题出在这一行:

def vectorizer(text, vocab=vocab_dict):

在 Python 中,第一次调用函数时会初始化关键字参数。当您sc.parallelize(...).map(vectorizer)在其定义之后调用时,它在本地vocab_dict可用,但远程工作人员对此一无所知。因此,调用函数时使用的参数比预期的要少,这会导致错误。__init__() takes exactly 3 arguments (2 given)

另请注意,您遵循非常糟糕的 sc.parallelize(...)...collect()呼叫模式。首先,您将集合传播到整个集群,进行一些计算,然后提取结果。但是在这里来回发送数据毫无意义。相反,您可以只在本地进行这些计算,并且仅当您使用非常大的数据集时才运行 Spark 的并行进程(amazon_dataset我猜是 main )。