为了账号安全,请及时绑定邮箱和手机立即绑定

使用 Java 在 Spark DataFrame 中将数组转换为 DenseVector

使用 Java 在 Spark DataFrame 中将数组转换为 DenseVector

Smart猫小萌 2021-12-01 16:24:57
我正在运行 Spark 2.3。我想将features以下 DataFrame 中的列从ArrayType转换为DenseVector. 我在 Java 中使用 Spark。+---+--------------------+| id|            features|+---+--------------------+|  0|[4.191401, -1.793...|| 10|[-0.5674514, -1.3...|| 20|[0.735613, -0.026...|| 30|[-0.030161237, 0....|| 40|[-0.038345724, -0...|+---+--------------------+root |-- id: integer (nullable = false) |-- features: array (nullable = true) |    |-- element: float (containsNull = false)我写了以下内容,UDF但似乎不起作用:private static UDF1 toVector = new UDF1<Float[], Vector>() {    private static final long serialVersionUID = 1L;    @Override    public Vector call(Float[] t1) throws Exception {        double[] DoubleArray = new double[t1.length];        for (int i = 0 ; i < t1.length; i++)        {            DoubleArray[i] = (double) t1[i];        }       Vector vector = (org.apache.spark.mllib.linalg.Vector) Vectors.dense(DoubleArray);    return vector;    }}我希望将以下特征提取为向量,以便对其进行聚类。我也在注册 UDF,然后继续调用它,如下所示:spark.udf().register("toVector", (UserDefinedAggregateFunction) toVector);df3 = df3.withColumn("featuresnew", callUDF("toVector", df3.col("feautres")));df3.show();  在运行此代码段时,我面临以下错误:ReadProcessData$1 不能转换为 org.apache.spark.sql.expressions。用户定义的聚合函数
查看完整描述

1 回答

?
慕尼黑的夜晚无繁华

TA贡献1864条经验 获得超6个赞

问题在于您如何udf在 Spark中注册。您不应将UserDefinedAggregateFunctionwhich is not an udfbut an udafused 用于聚合。相反,您应该做的是:


spark.udf().register("toVector", toVector, new VectorUDT());

然后要使用注册的函数,请使用:


df3.withColumn("featuresnew", callUDF("toVector",df3.col("feautres")));

在udf本身应稍微调整如下:


UDF1 toVector = new UDF1<Seq<Float>, Vector>(){


  public Vector call(Seq<Float> t1) throws Exception {


    List<Float> L = scala.collection.JavaConversions.seqAsJavaList(t1);

    double[] DoubleArray = new double[t1.length()]; 

    for (int i = 0 ; i < L.size(); i++) { 

      DoubleArray[i]=L.get(i); 

    } 

    return Vectors.dense(DoubleArray); 

  } 

};

请注意,在Spark 2.3+ 中,您可以创建udf可直接调用的 Scala 样式。从这个答案:


UserDefinedFunction toVector = udf(

  (Seq<Float> array) -> /* udf code or method to call */, new VectorUDT()

);


df3.withColumn("featuresnew", toVector.apply(col("feautres")));


查看完整回答
反对 回复 2021-12-01
  • 1 回答
  • 0 关注
  • 342 浏览

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信