我想在 Spark 中实现 K-means 算法。我正在寻找一个起点,我发现了Berkeley 的 naive implementation。但是,这是分布式的吗?
我的意思是我看不到 mapreduce 操作。或者,当在 Spark 中提交时,框架实际上会在后台制作所需的技巧来分发算法?
我还发现Spark 显示 mapreduce 退出,我使用的是 Spark 1.6。
编辑:此代码产生运行时错误,请在此处检查。
我想在 Spark 中实现 K-means 算法。我正在寻找一个起点,我发现了Berkeley 的 naive implementation。但是,这是分布式的吗?
我的意思是我看不到 mapreduce 操作。或者,当在 Spark 中提交时,框架实际上会在后台制作所需的技巧来分发算法?
我还发现Spark 显示 mapreduce 退出,我使用的是 Spark 1.6。
编辑:此代码产生运行时错误,请在此处检查。
在您发布的那个链接中,您可以在最后查看 python 完整解决方案并通过它查看所有分发的内容。简而言之,有些部分是分布式的,比如从文件中读取数据,但非常重要的部分,比如距离计算不是。
跑下来,我们看到:
sc = SparkContext("local[6]", "PythonKMeans")
这将实例化上下文并创建一个本地集群,作业将提交到该集群
lines = sc.textFile(..)
这仍在设置中。尚未进行任何操作。您可以通过在代码中放置时间语句来验证这一点
data = lines.map(lambda x: (x.split("#")[0], parseVector(x.split("#")[1])))
此处的 lambda 将应用于行,因此此操作将并行拆分文件。请注意,实际行cache()
的末尾也有 a(请参阅cache ])。data
只是对内存中 spark 对象的引用。(我可能在这里错了,但我认为操作仍然没有发生)
计数 = data.count()
这会强制并行计算开始,并存储计数。最后,参考数据仍然有效,我们将使用它进行进一步的计算。我将在这里停止详细解释,但无论在哪里data
使用都是可能的并行计算。python 代码本身是单线程的,并与 Spark 集群接口。
一个有趣的行是:
tempDist = sum(np.sum((centroids[x] - y) ** 2) for (x, y) in newCentroids.iteritems())
centroids
是 python 内存中的一个对象,就像newCentroids
. 因此,此时,所有计算都在内存中完成(在客户端上,通常客户端很瘦,即功能有限,或者客户端是 SSH shell,因此计算机资源是共享的。理想情况下,您不应该做任何事情计算),所以没有使用并行化。您可以通过并行执行此计算来进一步优化此方法。理想情况下,您希望 python 程序永远不会直接处理单个点 和 价值观。
我不知道那个具体的实现,但我们在我的工作中使用了mllib k-means,取得了一定程度的成功。它是分布式的并在 Spark 上运行!