将 csv 文件内容导入 pyspark 数据帧

数据挖掘 pyspark
2021-10-05 02:18:47

如何将 .csv 文件导入 pyspark 数据帧?我什至尝试在 Pandas 中读取 csv 文件,然后使用 createDataFrame 将其转换为 spark 数据帧,但它仍然显示一些错误。有人可以指导我完成这个吗?另外,请告诉我如何导入 xlsx 文件?我正在尝试将 csv 内容导入 pandas 数据帧,然后将其转换为 spark 数据帧,但它显示错误:

"Py4JJavaError" An error occurred while calling o28.applySchemaToPythonRDD. : java.lang.RuntimeException: java.lang.RuntimeException: Unable to instantiate org.apache.hadoop.hive.ql.metadata.SessionHiveMetaStoreClient 

我的代码是:

from pyspark import SparkContext 
from pyspark.sql import SQLContext 
import pandas as pd 
sqlc=SQLContext(sc) 
df=pd.read_csv(r'D:\BestBuy\train.csv') 
sdf=sqlc.createDataFrame(df) 
4个回答

“如何将 .csv 文件导入 pyspark 数据帧?” ——有很多方法可以做到这一点;最简单的方法是使用 Databrick 的 spark-csv 模块启动 pyspark。您可以通过启动 pyspark 来做到这一点

pyspark --packages com.databricks:spark-csv_2.10:1.4.0

那么您可以按照以下步骤操作:

from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)

df = sqlContext.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load('cars.csv')

另一种方法是使用 rdd 读取文本文件

myrdd = sc.textFile("yourfile.csv").map(lambda line: line.split(","))

然后转换您的数据,使每个项目都采用正确的模式格式(即 Ints、Strings、Floats 等)。你会想要然后使用

>>> from pyspark.sql import Row
>>> Person = Row('name', 'age')
>>> person = rdd.map(lambda r: Person(*r))
>>> df2 = sqlContext.createDataFrame(person)
>>> df2.collect()
[Row(name=u'Alice', age=1)]
>>> from pyspark.sql.types import *
>>> schema = StructType([
...    StructField("name", StringType(), True),
...    StructField("age", IntegerType(), True)])
>>> df3 = sqlContext.createDataFrame(rdd, schema)
>>> df3.collect()
[Row(name=u'Alice', age=1)]

参考:http ://spark.apache.org/docs/1.6.1/api/python/pyspark.sql.html#pyspark.sql.Row

“另外,请告诉我如何导入 xlsx 文件?” --“大数据”中不使用Excel文件;Spark 旨在用于大型文件或数据库。如果您有一个大小为 50GB 的 Excel 文件,那么您做错了。Excel 甚至无法打开这么大的文件。根据我的经验,任何超过 20MB 和 Excel 的东西都会死掉。

以下对我来说效果很好:

from pyspark.sql.types import *
schema = StructType([StructField("name", StringType(), True),StructField("age", StringType(), True)]
pd_df = pd.read_csv("<inputcsvfile>")
sp_df = spark.createDataFrame(pd_df, schema=schema)

我在本地目录中有一个文件“temp.csv”。从那里,我使用本地实例执行以下操作:

>>> from pyspark import SQLContext
>>> from pyspark.sql import Row
>>> sql_c = SQLContext(sc)
>>> d0 = sc.textFile('./temp.csv')
>>> d0.collect()
[u'a,1,.2390', u'b,2,.4390', u'c,3,.2323']
>>> d1 = d0.map(lambda x: x.split(',')).map(lambda x: Row(label = x[0], number = int(x[1]), value = float(x[2])))
>>> d1.take(1)
[Row(label=u'a', number=1, value=0.239)]
>>> df = sql_c.createDataFrame(d1)
>>> df_cut = df[df.number>1]
>>> df_cut.select('label', 'value').collect()
[Row(label=u'b', value=0.439), Row(label=u'c', value=0.2323)]

所以 d0 是我们发送到 spark RDD 的原始文本文件。为了让您创建一个数据框,您希望将 csv 分开,并使每个条目都成为 Row 类型,就像我在创建 d1 时所做的那样。最后一步是从 RDD 中制作数据帧。

您可以使用DataBricks的spark-csv包,它会自动为您做很多事情,比如处理标题、使用转义字符、自动模式推断等。从 Spark 2.0 开始,有一个用于处理 CSV 的内置函数。