1 Answer
The problem is in how you register the udf in Spark. You should not use UserDefinedAggregateFunction, which is not a udf but a udaf, used for aggregation. Instead, what you should do is:
spark.udf().register("toVector", toVector, new VectorUDT());
Then, to use the registered function:
df3.withColumn("featuresnew", callUDF("toVector",df3.col("feautres")));
The udf itself should be adjusted slightly, as follows:
UDF1<Seq<Float>, Vector> toVector = new UDF1<Seq<Float>, Vector>() {
    public Vector call(Seq<Float> t1) throws Exception {
        // Convert the Scala Seq to a Java List, then widen each Float to double
        List<Float> floats = scala.collection.JavaConversions.seqAsJavaList(t1);
        double[] values = new double[floats.size()];
        for (int i = 0; i < floats.size(); i++) {
            values[i] = floats.get(i);
        }
        return Vectors.dense(values);
    }
};
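The loop inside the udf just widens each Float to a double before handing the array to Vectors.dense. As a plain-Java illustration of that conversion (no Spark required; the class and method names here are hypothetical, not part of the Spark API):

```java
import java.util.Arrays;
import java.util.List;

public class FloatSeqToDense {
    // Widen a List<Float> into the double[] shape that Vectors.dense(...) expects.
    static double[] toDoubleArray(List<Float> floats) {
        double[] result = new double[floats.size()];
        for (int i = 0; i < floats.size(); i++) {
            result[i] = floats.get(i);  // auto-unbox Float, then widen float -> double
        }
        return result;
    }

    public static void main(String[] args) {
        double[] d = toDoubleArray(Arrays.asList(1.5f, 2.0f, 3.25f));
        System.out.println(Arrays.toString(d));  // [1.5, 2.0, 3.25]
    }
}
```

The widening happens implicitly on assignment, so no explicit cast is needed; values like 1.5f convert exactly, though a float with no exact binary representation keeps its float-precision value when widened.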
Note that in Spark 2.3+ you can create Scala-style udfs that can be invoked directly. From this answer:
UserDefinedFunction toVector = udf(
    (Seq<Float> array) -> /* udf code or method to call */, new VectorUDT()
);

df3.withColumn("featuresnew", toVector.apply(col("feautres")));