将 Koalas 数据帧转换为 Spark 数据帧是否有相关成本?

数据挖掘 Python 表现 pyspark 分配 数据工程
2022-03-09 15:34:26

我知道 pandas 使用存储在字典中的 numpy 数组“在幕后”工作。相比之下,考拉使用底层的 Spark 框架。这是否意味着在 Koalas 和 PySpark 数据帧之间来回切换不会产生额外的成本?

#convert to pyspark dataframe
df.to_spark()

#convert to kolas frame
koalas_df = ks.DataFrame(df)

编辑:我的意思是成本,它 ks.Dataframe(ks) 会产生额外的开销吗?例如,toPandas() 将 DataFrame 中的所有记录收集到驱动程序。因此,我们只能对一小部分数据执行 toPandas()。

由于我在 Koalas 和 Spark 之间切换,我想知道是否存在任何此类开销,或者 Koalas 是否在不收集驱动程序记录的情况下“解释”Spark 数据帧。目前我正在处理一小部分数据,但我对使用大量数据时的任何缺点感兴趣。

3个回答

正如您所说,由于考拉的目标是处理大数据,因此没有像将数据收集到单个分区这样的开销ks.DataFrame(df)

_InternalFrame但是,在创建用于创建内部管理 pandas 和 PySpark 之间元数据的默认列时会产生开销。

Koalas 在内部使用名为 的不可变框架_InternalFrame,因此/koalas/databricks/koalas/internal.py如果您想了解更多详细信息,可以参考。

internal.py如果给定的 Spark DataFrame 没有索引信息,这是创建默认索引的简短代码示例。

https://github.com/databricks/koalas/blob/a42af49c55c3b4cc39c62463c0bed186e7ff9f08/databricks/koalas/internal.py#L478-L491

        if index_map is None:
            assert not any(SPARK_INDEX_NAME_PATTERN.match(name) for name in spark_frame.columns), (
                "Index columns should not appear in columns of the Spark DataFrame. Avoid "
                "index column names [%s]." % SPARK_INDEX_NAME_PATTERN
            )


            # Create default index.
            spark_frame = _InternalFrame.attach_default_index(spark_frame)
            index_map = OrderedDict({SPARK_DEFAULT_INDEX_NAME: None})


        if NATURAL_ORDER_COLUMN_NAME not in spark_frame.columns:
            spark_frame = spark_frame.withColumn(
                NATURAL_ORDER_COLUMN_NAME, F.monotonically_increasing_id()
            )

我相信 Kolas 是 Python DF 的 Databricks DF 等价物和 Spark DF 的等价物(我认为 Kolas 非常非常新;几个月前才发布)。我不知道您所说的成本是什么意思,但您可以轻松地在 Spark DF 和 Pandas DF 之间切换。请参阅下面的示例。

# Convert Koala dataframe to Spark dataframe
df = kdf.to_spark(kdf)

# Create a Spark DataFrame from a Pandas DataFrame
df = spark.createDataFrame(pdf)

# Convert the Spark DataFrame to a Pandas DataFrame
df = df.select("*").toPandas(sdf)

如果您要问您将要为所使用的时间支付多少费用,那只是几美分,真的。就个人而言,我认为 Python 比其他语言更容易做一些事情。因此,如果您有一些非 Python DF,并且想要将其转换为 Python DF、进行合并或其他任何操作,只需进行转换并进行合并。

result = pd.concat([df1, df2], axis=1)

那有意义吗?

在这个聚会上有点晚了,但这里有一些代码和输出(在我的中等笔记本电脑上运行):

startTime = datetime.now()
spark_medium = koalas_medium.to_spark()
spark_medium.show()
print('Convert Medium Size DF to spark: (3 x 10,000): ' + str (datetime.now() - 
startTime))

startTime = datetime.now()
spark_large = koalas_large.to_spark()
spark_large.show()
print('Convert Large Size DF to spark: (23 x Millions of rows): ' + str 
(datetime.now() - startTime))

输出:

  • 将中型 DF 转换为火花:(3 x 10,000):0:00:00.689036

  • 将大尺寸 DF 转换为 spark:(23 x 百万行):0:00:00.078039

为什么转换中 DF 比较慢我不能告诉你,但是运行它几次结果总是大致相同的。希望这可以帮助某人。