任务不可序列化错误

数据挖掘 大数据 阿帕奇火花 斯卡拉
2021-09-25 01:11:43
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.spark.sql.cassandra.CassandraSQLContext
object Test {
  val sparkConf = new SparkConf(true).set("spark.cassandra.connection.host", <Cassandra Server IP>)
  val sc = new SparkContext(sparkConf)
  val cassandraSQLContext = new CassandraSQLContext(sc)
  val numberAsString = cassandraSQLContext.sql("select * from testing.test").first().getAs[Int]("number").toString()
  val testRDD = sc.parallelize(List(0, 0))
  val newRDD = testRDD.map { x => numberAsString }
}

这是我在 Spark 中编写的代码。我期待它能够正常工作,因为我正在评估 numberAsString 值,然后在 map 函数中使用它,但它给了我任务不可序列化的错误。我正在本地模式下运行作业。

火花壳上的错误

火花壳上的错误

3个回答

查看Stack Overflow 上关于 Spark 中的序列化异常的另一个问题,它说匿名函数序列化其包含的类,如果该类包含SparkContext- 这是不可序列化的 - 则会引发错误。

也许这正在发生在你身上?

我将代码更改为:

import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.spark.sql.cassandra.CassandraSQLContext
import org.apache.spark.rdd.RDD
object Test2 {
  def calculate(numberAsString: String, testRDD: RDD[Int]): RDD[String] = {
    val newRDD = testRDD.map { x => numberAsString }
    newRDD
  }
}

object Test {
  val sparkConf = new SparkConf(true).set("spark.cassandra.connection.host", <Cassandra Server IP>)
  val sc = new SparkContext(sparkConf)
  val cassandraSQLContext = new CassandraSQLContext(sc)
  val numberAsString = cassandraSQLContext.sql("select * from hdfc.test").first().getAs[Int]("number").toString()
  val testRDD = sc.parallelize(List(0, 0))
  val newRDD = Test2.calculate(numberAsString, testRDD)
}

现在,当它将序列化包含类时,它将被序列化。

你应该广播你的变量。

import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.spark.sql.cassandra.CassandraSQLContext
object Test {
  val sparkConf = new SparkConf(true).set("spark.cassandra.connection.host", <Cassandra Server IP>)
  val sc = new SparkContext(sparkConf)
  val cassandraSQLContext = new CassandraSQLContext(sc)
  val numberAsString = cassandraSQLContext.sql("select * from testing.test").first().getAs[Int]("number").toString()
  val numberAsStringBC = sc.broadcast(numberAsString)
  val testRDD = sc.parallelize(List(0, 0))
  val newRDD = testRDD.map { x => numberAsStringBC.value }
}