Spark,最佳地将单个 RDD 拆分为两个

数据挖掘 阿帕奇火花 pyspark
2021-09-26 07:12:28

我有一个大型数据集,我需要根据特定参数将其分成几组。我希望尽可能高效地处理工作。我可以设想两种方法

选项 1 - 从原始 RDD 创建地图并过滤

def customMapper(record):
    if passesSomeTest(record):
        return (1,record)
    else:
        return (0,record)

mappedRdd = rddIn.map(lambda x: customMapper(x))
rdd0 = mappedRdd.filter(lambda x: x[0]==0).cache()
rdd1 = mappedRdd.filter(lambda x: x[1]==1).cache()

选项 2 - 直接过滤原始 RDD

def customFilter(record):
    return passesSomeTest(record)

rdd0 = rddIn.filter(lambda x: customFilter(x)==False).cache()
rdd1 = rddIn.filter(customFilter).cache()

第一种方法必须迭代原始数据集的所有记录3次,而第二种方法只需要这样做两次,在正常情况下,但是spark在幕后做了一些图形构建,所以我可以想象它们是以同样的方式有效地完成。我的问题是:a.)一种方法是否比另一种方法更有效,或者火花图构建是否使它们等效 b.)是否可以一次完成此拆分

1个回答

首先让我告诉你,我不是 Spark 专家;在过去的几个月里我一直在使用它,我相信我现在明白了,但我可能错了。

所以,回答你的问题:

a.) 它们是等价的,但不是你看到的那样;如果您想知道,Spark 不会优化图形,但customMapper在这两种情况下仍会执行两次;这是因为对于 spark, rdd1andrdd2是两个完全不同的 RDD,它会从叶子开始自下而上地构建变换图;所以选项 1 将转换为:

rdd0 = rddIn.map(lambda x: customMapper(x)).filter(lambda x: x[0]==0).cache()
rdd1 = rddIn.map(lambda x: customMapper(x)).filter(lambda x: x[0]==1).cache()

正如你所说,customMapper执行两次(而且,也rddIn将被读取两次,这意味着如果它来自数据库,它可能会更慢)。

b.)有一种方法,你只需要cache()在正确的地方移动:

mappedRdd = rddIn.map(lambda x: customMapper(x)).cache()
rdd0 = mappedRdd.filter(lambda x: x[0]==0)
rdd1 = mappedRdd.filter(lambda x: x[0]==1)

通过这样做,我们告诉 spark 它可以存储 ; 的部分结果mappedRdd然后它会将这些部分结果用于rdd1rdd2从火花的角度来看,这相当于:

mappedRdd = rddIn.map(lambda x: customMapper(x)).saveAsObjectFile('..')
# forget about everything
rdd0 = sc.objectFile('..').filter(lambda x: x[0]==0)
rdd1 = sc.objectFile('..').filter(lambda x: x[0]==1)