I am trying to create a UDF that takes another function as a parameter, but execution ends with an exception. The code I ran:

    import pandas as pd
    from pyspark import SparkConf, SparkContext, SQLContext
    from pyspark.sql.types import MapType, DataType, StringType
    from pyspark.sql.functions import udf, struct, lit
    import os

    # conf was not defined in the original post; a default SparkConf is assumed here
    conf = SparkConf()
    sc = SparkContext.getOrCreate(conf=conf)
    sqlContext = SQLContext(sc)

    df_to_test = sqlContext.createDataFrame(
        pd.DataFrame({
            'inn': ['111', '222', '333'],
            'field1': [1, 2, 3],
            'field2': ['a', 'b', 'c']
        }))

    def foo_fun(row, b) -> str:
        return 'a' + b()

    def bar_fun():
        return 'I am bar'

    foo_fun_udf = udf(foo_fun, StringType())

    df_to_test.withColumn(
        'foo',
        foo_fun_udf(struct([df_to_test[x] for x in df_to_test.columns]), bar_fun)
    ).show()

The exception:

    Invalid argument, not a string or column: <function bar_fun at 0x7f0e69ce6268> of type <class 'function'>. For column literals, use 'lit', 'array', 'struct' or 'create_map' function.

I tried wrapping bar_fun in a udf, without success. Is there a way to pass a function as a parameter?
1 Answer
墨色风雨
Contributed 1853 experience points, received more than 6 likes
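You are not far from the solution. Arguments passed to a UDF call must be columns or column literals, so a Python function cannot be handed over that way. Instead, build the UDF inside a function that receives bar_fun and captures it in a closure. Here is how I would do it:

    def foo_fun_udf(func):
        # func is captured by the inner function, so it never has to be a column
        def foo_fun(row) -> str:
            return 'a' + func()
        out_udf = udf(foo_fun, StringType())
        return out_udf

    # foo_fun_udf(bar_fun) returns a UDF that takes only the struct column
    df_to_test.withColumn(
        'foo',
        foo_fun_udf(bar_fun)(struct([df_to_test[x] for x in df_to_test.columns]))
    ).show()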
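The closure idea in this answer can be tried without a Spark session. The following is only a minimal sketch in plain Python: `make_foo` is a hypothetical name for the inner part of `foo_fun_udf`, and a plain tuple is assumed as a stand-in for the Spark row.

```python
from typing import Callable


def make_foo(func: Callable[[], str]) -> Callable[[object], str]:
    """Return a one-argument function with `func` captured in its closure."""
    def foo(row: object) -> str:
        # `func` comes from the enclosing scope, not from the call arguments
        return 'a' + func()
    return foo


def bar_fun() -> str:
    return 'I am bar'


# A plain tuple stands in for a Spark Row (assumption for illustration only)
foo = make_foo(bar_fun)
result = foo(('111', 1, 'a'))
print(result)  # prints "aI am bar"
```

With PySpark available, `udf(make_foo(bar_fun), StringType())` would produce the same kind of UDF that `foo_fun_udf(bar_fun)` returns above, since only the row is left as a call argument.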