与纯 python 替代方案相比,Pyspark 代码性能不够

数据挖掘 Python 阿帕奇火花 pyspark
2022-02-24 12:04:03

我将下面粘贴在 python 中的现有代码转换为 pyspark。

Python代码:

import json
import csv


def main():
    # create a simple JSON array
    with open('paytm_tweets_data_1495614657.json') as str:

        tweetsList = []
        # change the JSON string into a JSON object
        jsonObject = json.load(str)

        #print(jsonObject)

        # # print the keys and values
        for i in range(len(jsonObject)):
            tweetsList.insert(i,jsonObject[i]["text"])

        #print(tweetsList)
    displaySentiment(tweetsList)



def displaySentiment(tweetsList):
    aDict = {}

    from sentiment import sentiment_score

    for i in range(len(tweetsList)):
        aDict[tweetsList[i]] = sentiment_score(tweetsList[i])
    print (aDict)


    with open('PaytmtweetSentiment.csv', 'w') as csv_file:
        writer = csv.DictWriter(csv_file, fieldnames = ["Tweets", "Sentiment Value"])
        writer.writeheader()
        writer = csv.writer(csv_file)
        for key, value in aDict.items():
            writer.writerow([key, value])


if __name__ == '__main__':
    main()

转换后的 Pyspark 代码:

import json
import csv
import os
from pyspark import SparkContext, SparkConf
from pyspark.python.pyspark.shell import spark

os.environ['PYSPARK_PYTHON'] = "/usr/local/bin/python3"


def main():
    path = "/Users/i322865/DeepInsights/bitbucket-code/ai-engine/twitter-sentiment-analysis/flipkart_tweets_data_1495601666.json"
    peopleDF = spark.read.json(path).rdd
    df = peopleDF.map(lambda row: row['text'])
    print(df.collect())
    displaySentiment(df.collect())



def displaySentiment(tweetsList):
    from sentiment import sentiment_score

    aDict = sentiment_score(tweetsList)

    #
    with open('paytmtweetSentiment.csv', 'w') as csv_file:
        writer = csv.DictWriter(csv_file, fieldnames = ["Tweets", "Sentiment Value"])
        writer.writeheader()
        writer = csv.writer(csv_file)
        for i in range(len(tweetsList)):
            writer.writerow([tweetsList[i], aDict[i]])
            print([tweetsList[i], aDict[i]])


if __name__ == '__main__':
    conf = SparkConf().setAppName("Test").setMaster("local")
    sc = SparkContext.getOrCreate(conf=conf)
    main()

我运行了这两个程序,但没有看到任何显着的性能改进。我错过了什么?请问你能发表一些想法吗?

1个回答

您已经使用“本地”模式配置实例化了一个 sparkContext 对象。这意味着您已为计算机上的单个多核 Java 虚拟机分配了资源。在此配置中,您无法获得比使用 python 更好的性能。因为 :

  • 在本地模式下,使用 Spark 的资源(cpu、内存......)比使用 python 的资源少(你不能拥有比计算机更多资源的 JVM)。
  • PySpark 代码在执行前被转换为 Scala 代码。

当您在多个节点上使用 Spark 时,Spark 的优势就会显现在这种配置中,spark master 是 yarn、mesos 或独立的。在这种情况下,一个 Spark 作业将被分成多个任务,每个节点将专用于不同的任务。

例如,如果您有 3 个节点和 6 个任务,则每个节点可以处理 2 个任务。如果你有 6 个节点和 6 个任务,每个节点将处理一个任务。使用简单的 python,认为您只有 1 个节点用于“6 个任务”。因此,对于大型任务(足够大的数据集),Spark 会比简单的 python 产生更好的延迟。