在 PySpark 中逐行合并多个数据帧

数据挖掘 Python 阿帕奇火花 交叉验证 pyspark
2021-09-13 21:44:29

我有 10 个数据帧pyspark.sql.dataframe.DataFrame,从randomSplitas(td1, td2, td3, td4, td5, td6, td7, td8, td9, td10) = td.randomSplit([.1, .1, .1, .1, .1, .1, .1, .1, .1, .1], seed = 100)现在我想将 9 加入td一个数据帧,我应该怎么做?

我已经尝试过unionAll,但是这个函数只接受两个参数。

td1_2 = td1.unionAll(td2) 
# this is working fine

td1_2_3 = td1.unionAll(td2, td3) 
# error TypeError: unionAll() takes exactly 2 arguments (3 given)

有没有办法按行组合两个以上的数据帧?

这样做的目的是我在不使用 PySparkCrossValidator方法的情况下手动进行 10 折交叉验证,因此将 9 用于训练,1 用于测试数据,然后我将重复它以用于其他组合。

4个回答

窃取自:https ://stackoverflow.com/questions/33743978/spark-union-of-multiple-rdds

在链接联合之外,这是对 DataFrame 执行此操作的唯一方法。

from functools import reduce  # For Python 3.x
from pyspark.sql import DataFrame

def unionAll(*dfs):
    return reduce(DataFrame.unionAll, dfs)

unionAll(td2, td3, td4, td5, td6, td7, td8, td9, td10)

发生的情况是,它将您作为参数传递的所有对象并使用 unionAll 对它们进行归约(此归约来自 Python,而不是 Spark 归约,尽管它们的工作方式相似),最终将其归约为一个 DataFrame。

如果它们不是 DataFrames,它们是普通的 RDD,你可以将它们的列表传递给 SparkContext 的联合函数

编辑:出于您的目的,我提出了一种不同的方法,因为您必须针对不同的折叠重复整个联合 10 次以进行交叉验证,所以我将添加一行所属的折叠标签,并根据每个折叠过滤您的 DataFrame标签

有时,当要组合的数据帧没有相同的列顺序时,最好df2.select(df1.columns)确保两个 df 在联合之前具有相同的列顺序。

import functools 

def unionAll(dfs):
    return functools.reduce(lambda df1,df2: df1.union(df2.select(df1.columns)), dfs) 

例子:

df1 = spark.createDataFrame([[1,1],[2,2]],['a','b'])
# different column order. 
df2 = spark.createDataFrame([[3,333],[4,444]],['b','a']) 
df3 = spark.createDataFrame([555,5],[666,6]],['b','a']) 

unioned_df = unionAll([df1, df2, df3])
unioned_df.show() 

在此处输入图像描述

否则它将生成以下结果。

from functools import reduce  # For Python 3.x
from pyspark.sql import DataFrame

def unionAll(*dfs):
    return reduce(DataFrame.unionAll, dfs) 

unionAll(*[df1, df2, df3]).show()

在此处输入图像描述

使用递归怎么样?

def union_all(dfs):
    if len(dfs) > 1:
        return dfs[0].unionAll(union_all(dfs[1:]))
    else:
        return dfs[0]

td = union_all([td1, td2, td3, td4, td5, td6, td7, td8, td9, td10])
def unionAll(a,b):
    return a.unionByName(b)

sdf1_sdf2 = reduce(unionAll,[sdf1,sdf2])