为了账号安全,请及时绑定邮箱和手机立即绑定

在PySpark中的GroupedData上应用UDF(具有可运行的python示例)

在PySpark中的GroupedData上应用UDF(具有可运行的python示例)

开心每一天1111 2019-12-12 14:10:42
我有在python数据帧中本地运行的以下python代码:df_result = pd.DataFrame(df                          .groupby('A')                          .apply(lambda x: myFunction(zip(x.B, x.C), x.name))我想在PySpark中运行它,但是在处理pyspark.sql.group.GroupedData对象时遇到了麻烦。我尝试了以下方法:sparkDF .groupby('A') .agg(myFunction(zip('B', 'C'), 'A')) 哪个返回KeyError: 'A'我猜想是因为“ A”不再是一列,而且我找不到x.name的等效项。接着sparkDF .groupby('A') .map(lambda row: Row(myFunction(zip('B', 'C'), 'A')))  .toDF()但出现以下错误:AttributeError: 'GroupedData' object has no attribute 'map'任何建议将不胜感激!
查看完整描述

3 回答

?
慕森王

TA贡献1777条经验 获得超3个赞

我将超越答案。


因此,您可以使用@pandas_udf在pyspark中实现类似pandas.groupby()。apply的逻辑,这是矢量化方法,并且比简单的udf更快。


from pyspark.sql.functions import pandas_udf,PandasUDFType


df3 = spark.createDataFrame(

[("a", 1, 0), ("a", -1, 42), ("b", 3, -1), ("b", 10, -2)],

("key", "value1", "value2")

)


from pyspark.sql.types import *


schema = StructType([

    StructField("key", StringType()),

    StructField("avg_value1", DoubleType()),

    StructField("avg_value2", DoubleType()),

    StructField("sum_avg", DoubleType()),

    StructField("sub_avg", DoubleType())

])


@pandas_udf(schema, functionType=PandasUDFType.GROUPED_MAP)

def g(df):

    gr = df['key'].iloc[0]

    x = df.value1.mean()

    y = df.value2.mean()

    w = df.value1.mean() + df.value2.mean()

    z = df.value1.mean() - df.value2.mean()

    return pd.DataFrame([[gr]+[x]+[y]+[w]+[z]])


df3.groupby("key").apply(g).show()

您将获得以下结果:


+---+----------+----------+-------+-------+

|key|avg_value1|avg_value2|sum_avg|sub_avg|

+---+----------+----------+-------+-------+

|  b|       6.5|      -1.5|    5.0|    8.0|

|  a|       0.0|      21.0|   21.0|  -21.0|

+---+----------+----------+-------+-------+

因此,您可以在分组数据中的其他字段之间进行更多计算,并将它们以列表格式添加到数据框中。



查看完整回答
反对 回复 2019-12-13
  • 3 回答
  • 0 关注
  • 500 浏览
慕课专栏
更多

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信