为了账号安全,请及时绑定邮箱和手机立即绑定

如何将 sklearn 管道转换为 pyspark 管道?

如何将 sklearn 管道转换为 pyspark 管道?

拉风的咖菲猫 2023-07-05 17:39:56
我们有一个机器学习分类器模型,使用 pandas 数据框和标准 sklearn 管道(StandardScaler、RandomForestClassifier、GridSearchCV 等)进行训练。我们正在开发 Databricks,并希望使用 Spark 提供的并行计算功能将该管道扩展到大型数据集。将 sklearn 管道转换为并行计算的最快方法是什么?(我们可以根据需要轻松地在 pandas 和 Spark DF 之间切换。)就上下文而言,我们的选择似乎是:使用MLLib重写管道(耗时)使用 sklearn-spark 桥接库对于选项 2,Spark-Sklearn 似乎已被弃用,但 Databricks建议我们使用 joblibspark。然而,这在 Databricks 上引发了一个例外:from sklearn import svm, datasetsfrom sklearn.model_selection import GridSearchCVfrom joblibspark import register_sparkfrom sklearn.utils import parallel_backendregister_spark() # register spark backendiris = datasets.load_iris()parameters = {'kernel':('linear', 'rbf'), 'C':[1, 10]}svr = svm.SVC(gamma='auto')clf = GridSearchCV(svr, parameters, cv=5)with parallel_backend('spark', n_jobs=3):    clf.fit(iris.data, iris.target)提高py4j.security.Py4JSecurityException: Method public int org.apache.spark.SparkContext.maxNumConcurrentTasks() is not whitelisted on class class org.apache.spark.SparkContext
查看完整描述

2 回答

?
MM们

TA贡献1886条经验 获得超2个赞

必要的要求是:

  • Python 3.6+

  • pyspark>=2.4

  • scikit-learn>=0.21

  • joblib>=0.14

我无法在运行 Python 3.7.5、Spark 3.0.0、scikit-learn 0.22.1 和 joblib 0.14.1 的社区 Databricks 集群中重现您的问题:

import sys

import sklearn

import joblib


spark.version

# '3.0.0'


sys.version

# '3.7.5 (default, Nov  7 2019, 10:50:52) \n[GCC 8.3.0]'


sklearn.__version__

# '0.22.1'


joblib.__version__

# '0.14.1'

通过上述设置,您的代码片段可以顺利运行,并确实生成一个分类器,clf如下所示:


GridSearchCV(cv=5, error_score=nan,

             estimator=SVC(C=1.0, break_ties=False, cache_size=200,

                           class_weight=None, coef0=0.0,

                           decision_function_shape='ovr', degree=3,

                           gamma='auto', kernel='rbf', max_iter=-1,

                           probability=False, random_state=None, shrinking=True,

                           tol=0.001, verbose=False),

             iid='deprecated', n_jobs=None,

             param_grid={'C': [1, 10], 'kernel': ('linear', 'rbf')},

             pre_dispatch='2*n_jobs', refit=True, return_train_score=False,

             scoring=None, verbose=0)

这里的替代示例也是如此:


from sklearn.utils import parallel_backend

from sklearn.model_selection import cross_val_score

from sklearn import datasets

from sklearn import svm

from joblibspark import register_spark


register_spark() # register spark backend


iris = datasets.load_iris()

clf = svm.SVC(kernel='linear', C=1)

with parallel_backend('spark', n_jobs=3):

  scores = cross_val_score(clf, iris.data, iris.target, cv=5)


print(scores)

给予


[0.96666667 1.         0.96666667 0.96666667 1.        ]


查看完整回答
反对 回复 2023-07-05
?
12345678_0001

TA贡献1802条经验 获得超5个赞

这个答案对于标准 Spark / Databricks 设置来说应该是正确的,因此考虑到我的问题的措辞/对其他读者的潜在有用性,我已经接受了它

发现我们案例中的问题后,贡献一个单独的“答案”:Databricks 支持人员建议我们案例中的问题是由于我们使用特殊类型的集群(在 AWS 上启用了凭证直通的高并发性)。grid.fit() 没有被列入此类集群的白名单,Databricks 建议他们需要向工程团队提出将其列入白名单。


查看完整回答
反对 回复 2023-07-05
  • 2 回答
  • 0 关注
  • 105 浏览
慕课专栏
更多

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信