How do I apply StandardScaler and OneHotEncoder together in Spark machine learning?

data-mining machine-learning linear-regression apache-spark scala
2022-03-16 05:20:32

I am trying to build a machine learning model, a linear regression, to predict the price of diamonds.

None of the examples I have found online include a scaling step with MinMaxScaler or StandardScaler, yet I personally consider scaling an important part of an ML workflow. The Spark guide demonstrates StandardScaler, but separately from one-hot encoding. On top of that, the result of OneHotEncoder in Scala/Spark differs from Python's: Spark produces something like (4,[3],[1.0]), whereas in Python you get an explicit row of 1s and 0s. So I am confused about where to apply StandardScaler: after StringIndexer but before OneHotEncoder, after OneHotEncoder, or at some other step?
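For reference, the two representations encode the same information: (4,[3],[1.0]) is sparse notation meaning "length-4 vector, 1.0 at index 3, zeros elsewhere". A quick spark-shell check with illustrative values:

import org.apache.spark.ml.linalg.Vectors

// (4,[3],[1.0]) = a length-4 vector with 1.0 at index 3 and 0.0 elsewhere
val sparse = Vectors.sparse(4, Array(3), Array(1.0))
println(sparse.toDense)                               // [0.0,0.0,0.0,1.0]
println(sparse == Vectors.dense(0.0, 0.0, 0.0, 1.0))  // true: same values, different storage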

Below is my code after some light data cleaning. It runs, but the predictions are far from good. I am using Scala DataFrames and a Pipeline:

import org.apache.spark.ml.feature.{VectorAssembler, StringIndexer, OneHotEncoder, StandardScaler}
import org.apache.spark.ml.regression.LinearRegression
import org.apache.spark.ml.linalg.Vectors

val cutIndexer = new StringIndexer().setInputCol("cut").setOutputCol("cutIndex")
val colorIndexer = new StringIndexer().setInputCol("color").setOutputCol("colorIndex")
val clarityIndexer = new StringIndexer().setInputCol("clarity").setOutputCol("clarityIndex")

val cutEncoder = new OneHotEncoder().setInputCol("cutIndex").setOutputCol("cutVec")
val colorEncoder = new OneHotEncoder().setInputCol("colorIndex").setOutputCol("colorVec")
val clarityEncoder = new OneHotEncoder().setInputCol("clarityIndex").setOutputCol("clarityVec")

val assembler = (new VectorAssembler()
                  .setInputCols(Array("carat", "cutVec", "colorVec", "clarityVec", "depth", "table", "x", "y", "z"))
                  .setOutputCol("features") )

// withMean(false) keeps the scaled vectors sparse; subtracting the mean would densify the one-hot columns
val scaler = new StandardScaler().setInputCol("features").setOutputCol("scaledFeatures").setWithStd(true).setWithMean(false)

val Array(training, test) = df_label.randomSplit(Array(0.75, 0.25))

import org.apache.spark.ml.Pipeline

// LinearRegression's default featuresCol is "features", which here is the unscaled
// assembler output; point it at the scaler's output instead
val lr = new LinearRegression().setFeaturesCol("scaledFeatures")

val pipeline = new Pipeline().setStages(Array(cutIndexer, colorIndexer, clarityIndexer, cutEncoder, colorEncoder, clarityEncoder, assembler, scaler, lr))

val model = pipeline.fit(training)

val results = model.transform(test)
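To quantify "far from good", one option is a sketch using Spark's RegressionEvaluator on the held-out set (this assumes the default prediction column and the label column from the data below):

import org.apache.spark.ml.evaluation.RegressionEvaluator

// RMSE on the test set; setMetricName("r2") would report R-squared instead
val evaluator = new RegressionEvaluator()
  .setLabelCol("label")
  .setPredictionCol("prediction")
  .setMetricName("rmse")

println(s"RMSE = ${evaluator.evaluate(results)}")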

A subset of the data:

+-----+-----+---------+-----+-------+-----+-----+----+----+----+
|label|carat|      cut|color|clarity|depth|table|   x|   y|   z|
+-----+-----+---------+-----+-------+-----+-----+----+----+----+
|  326| 0.23|    Ideal|    E|    SI2| 61.5| 55.0|3.95|3.98|2.43|
|  326| 0.21|  Premium|    E|    SI1| 59.8| 61.0|3.89|3.84|2.31|
|  327| 0.23|     Good|    E|    VS1| 56.9| 65.0|4.05|4.07|2.31|
+-----+-----+---------+-----+-------+-----+-----+----+----+----+

Thanks in advance!

1 Answer

OK, I worked out a solution myself. Instead of a separate OneHotEncoder per column, I applied the recently added OneHotEncoderEstimator, which encodes all the categorical index columns in one stage. I then ran a MinMaxScaler over the assembled feature vector from the VectorAssembler.
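For intuition: MinMaxScaler rescales each feature column to [0, 1] via (x - min) / (max - min), with min and max learned from the training data. Note that its output vectors are dense, so the one-hot sparsity is lost. A toy sketch with made-up values (assumes a SparkSession named spark):

import org.apache.spark.ml.feature.MinMaxScaler
import org.apache.spark.ml.linalg.Vectors

val toy = spark.createDataFrame(Seq(
  (0, Vectors.dense(1.0, 100.0)),
  (1, Vectors.dense(2.0, 200.0)),
  (2, Vectors.dense(3.0, 300.0))
)).toDF("id", "raw")

val mm = new MinMaxScaler().setInputCol("raw").setOutputCol("scaled")
mm.fit(toy).transform(toy).select("scaled").show(false)
// scaled: [0.0,0.0], [0.5,0.5], [1.0,1.0]

My full pipeline: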

import org.apache.spark.ml.feature.{StringIndexer, VectorAssembler}
import org.apache.spark.ml.regression.LinearRegression

val cutIndexer = new StringIndexer().setInputCol("cut").setOutputCol("cutIndex")
val colorIndexer = new StringIndexer().setInputCol("color").setOutputCol("colorIndex")
val clarityIndexer = new StringIndexer().setInputCol("clarity").setOutputCol("clarityIndex")

import org.apache.spark.ml.feature.OneHotEncoderEstimator

// One estimator encodes all three index columns in a single stage
val encoder = new OneHotEncoderEstimator()
  .setInputCols(Array("cutIndex", "colorIndex", "clarityIndex"))
  .setOutputCols(Array("cutIndexEnc", "colorIndexEnc", "clarityIndexEnc"))


val assembler = (new VectorAssembler()
                    .setInputCols(Array("carat", "cutIndexEnc", "colorIndexEnc", "clarityIndexEnc", "depth", "table", "x", "y", "z"))
                    .setOutputCol("features_assem") )

import org.apache.spark.ml.feature.MinMaxScaler

// The output column is named "features" so that LinearRegression's default
// featuresCol picks up the scaled vector rather than the raw assembler output
val scaler = new MinMaxScaler().setInputCol("features_assem").setOutputCol("features")

val Array(training, test) = df_label.randomSplit(Array(0.75, 0.25))

import org.apache.spark.ml.Pipeline

// The defaults featuresCol = "features" and labelCol = "label" now match the columns above
val lr = new LinearRegression()

val pipeline = new Pipeline().setStages(Array(cutIndexer, colorIndexer, clarityIndexer, encoder, assembler, scaler, lr))

val model = pipeline.fit(training)

val predictions = model.transform(test)
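One follow-up note: OneHotEncoderEstimator was introduced in Spark 2.3, and in Spark 3.0 it was renamed back to OneHotEncoder while keeping the multi-column API. On Spark 3.x the encoder stage above becomes (same column names assumed):

import org.apache.spark.ml.feature.OneHotEncoder

// Spark 3.x rename of OneHotEncoderEstimator; behavior is the same
val encoder = new OneHotEncoder()
  .setInputCols(Array("cutIndex", "colorIndex", "clarityIndex"))
  .setOutputCols(Array("cutIndexEnc", "colorIndexEnc", "clarityIndexEnc"))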