使用 Spark 或 Scikit Learn 进行模型集成

数据挖掘 pyspark 管道
2021-10-02 23:26:51

我正在使用Spark MLLib进行预测,我想知道是否可以创建您的自定义 Estimator。

这是我希望我的模型对 Spark api 执行的操作的可重现性


from sklearn.datasets import load_diabetes
import pandas as  pd
import pyspark 
from pyspark.ml.feature import VectorAssembler, SQLTransformer
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline
# Query diabetes data 
diab = load_diabetes()
df = pd.DataFrame(diab.data, columns=diab.feature_names)
df['is_male'] = df.sex > 0
df.drop('sex', inplace=True, axis=1)
df['label'] = diab.target
# Model made with Spark

spark = pyspark.sql.SparkSession.builder.master('local[10]').appName('A random spark context').getOrCreate()
def create_gender_model_male(): 
    return Pipeline(stages=[SQLTransformer(statement='SELECT * FROM __THIS__ WHERE is_male'),
                                VectorAssembler(inputCols=['age', 'bmi', 'bp', 's1'],outputCol='features'),
                                LogisticRegression(featuresCol='features', labelCol='label', maxIter=100,
                                                   elasticNetParam=1, regParam=0.)
                                ])
def create_gender_model_female(): 
    return Pipeline(stages=[SQLTransformer(statement='SELECT * FROM __THIS__ WHERE not is_male'),
                                VectorAssembler(inputCols=['age', 'bmi', 'bp', 's1'],outputCol='features'),
                                LogisticRegression(featuresCol='features', labelCol='label', maxIter=100,
                                                   elasticNetParam=1, regParam=0.)
                                ])
df = spark.createDataFrame(df)
class MixedModel():
    def __init__(self):
        self.models = {'male': create_gender_model_male(), 'female': create_gender_model_female()}
        self.fitted_models = {'male': None, 'female': None}
    def fit(self, df): 
        self.fitted_models['male'] = self.models['male'].fit(df)
        self.fitted_models['female'] = self.models['female'].fit(df)
    def predict(self, df): 
        return self.fitted_models['male'].transform(df).union(self.fitted_models['female'].transform(df))
mm = MixedModel()
mm.fit(df)
mm.transform(df)

在这里,例如,我对每种性别都有一个逻辑回归,但如果我愿意,我也希望能够对男性进行预测,并使用逻辑回归对女性进行预测。

在一个完美的世界中,会有一个函数:

ModelAggregation(('is_male is true, male, model_for_male), ('is_male is false', model_for_female)))

这将返回一个对象,例如我的模型聚合

2个回答

正如您在问题中所说,使用 MLLib 中提供的基线算法无法做到这一点。

您可以通过以下两种方式做到这一点:

  1. 创建一个函数来生成一个管道
  2. 创建一个 Meta Estimator,它将获取您的基础学习者和分叉列。

第一个是您在问题中所说的,第二个已由 Brian Spiring 解释。

正如您所说,制作自定义 Estimator 和 Transformer 将使其与所有 MLLib Tools 作为Tuning Tools很好地配合使用。

如果您想要一个非常精确的示例,可以使用我的库,它实现了使用 Spark 进行集成学习的元算法。

有一种方法可以在Stacking Classifier中非常简单地并行训练多个估计器,对于所有这些估计器,您都有一个示例,说明如何要求估计器作为参数(选择逻辑回归或决策树)。

你可以从中挑选想法!

编辑:StackOverflow 上的类似问题

您可以将性别作为参数传递来创建管道

def create_pipeline(gender): 

    select_statement = "SELECT * FROM __THIS__ WHERE {predicate}".format(predicate = "is_male" if gender == "male" else "not is_male") 

    pipeline = Pipeline(stages=[transformer = SQLTransformer(statement=select_statement),
                                VectorAssembler(inputCols=['age', 'bmi', 'bp', 's1'],outputCol='features'),
                                LogisticRegression(featuresCol='features', labelCol='label', maxIter=100,
                                                   elasticNetParam=1, regParam=0.)
                            ])
    return pipeline

然后存储所有模型

model_types = {'model_male': create_model(gender='male'),
               'model_not_male': create_model(gender='not_male')}

适合每个模型

for model_types in models:
    pipeline = models[model_types]
    model = pipeline.fit(df)
    model.transform(df)