为了账号安全,请及时绑定邮箱和手机立即绑定

有没有办法在 UDF 中添加新列(在 java spark 中)

有没有办法在 UDF 中添加新列(在 java spark 中)

繁星淼淼 2023-05-24 14:59:59
我有一个 spark 数据集的列(在 java 中),我希望该列的所有值成为新列的列名(新列可以用常量值填充)。For example I have:+------------+|    Column  | +------------+| a          | | b          || c          |+------------+And I want: +------+----+----+---+|Column| a  |  b | c |+------+----+----+---+| a    | 0  | 0  |0  || b    | 0  | 0  |0  || c    | 0  | 0  |0  |+------+----+----+---+我试过的是:public class test{    static SparkSession spark = SparkSession.builder().appName("Java")            .config("spark.master", "local").getOrCreate();    static Dataset<Row> dataset = spark.emptyDataFrame();    public Dataset<Row> test(Dataset<Row> ds, SparkSession spark) {        SQLContext sqlContext = new SQLContext(spark);        sqlContext.udf().register("add", add, DataTypes.createArrayType(DataTypes.StringType));        ds = ds.withColumn("substrings", functions.callUDF("add", ds.col("Column")));        return ds;    }    private static UDF1 addSubstrings = new UDF1<String, String[]>() {        public String[] call(String str) throws Exception {            dataset = dataset.withColumn(str, functions.lit(0));            String[] a = {"placeholder"};            return a;        }    };}我的问题是,有时我得到正确的结果,有时却没有(未添加列)。我真的不明白为什么。我正在寻找一种将数据集传递给 UDF 的方法,但我不知道该怎么做。目前我正在通过使用列的 collectAsList() 来解决它,然后迭代 Arraylist 从而添加新列。但这真的很低效,因为我的数据太多了。
查看完整描述

3 回答

?
动漫人物

TA贡献1815条经验 获得超10个赞

对于这个用例,您可以使用pivot

ds

 .withColumn("pivot_column", $"first_column")

 .groupBy($"first_column")

 .pivot("pivot_column")

 .count

如果你想要更好的性能,你可能想在 pivot 中提供可能的值,比如pivot("pivot_column", Seq("a", "b", "c"))


我用于count聚合,但你可以进行任何你想要的聚合。


From

+------------+

|first_column| 

+------------+

| a          | 

| b          |

| c          |

+------------+


To


+------------+---+---+---+

|first_column| a | b | c |

+------------+---+---+---+

| a          | 1 | 0 | 0 |

| b          | 0 | 1 | 0 |

| c          | 0 | 0 | 1 |

+------------+---+---+---+


查看完整回答
反对 回复 2023-05-24
?
慕桂英546537

TA贡献1848条经验 获得超10个赞

如果的值Column是最小的/更少,请尝试下面的代码。


df.show

+------+

|Column|

+------+

|     A|

|     B|

|     C|

+------+


// If you have multiple columns are exist, select only required column

val names = df.select($"Column").as[String].collect 

val df1 = names.foldLeft(df)((df,n) => df.withColumn(n, lit(0)))

df1.show()

+------+---+---+---+

|Column|  A|  B|  C|

+------+---+---+---+

|     A|  0|  0|  0|

|     B|  0|  0|  0|

|     C|  0|  0|  0|

+------+---+---+---+


查看完整回答
反对 回复 2023-05-24
?
蓝山帝景

TA贡献1843条经验 获得超7个赞

我认为 Spark 的本质(更准确地说,它的并行性)不允许您使用 UDF 实现您的目标。

执行查询时,Spark 将数据分发给执行器,每个执行器都有自己的行块。每个行块都有自己的列的可能值列表Column。因此,每个执行者都会尝试添加自己的列列表,这与其他执行者所做的不同。因此,当驱动程序尝试合并来自不同执行程序的结果集时,它会失败(或者执行程序可能会失败)。

collectAsList确实解决了你的问题,虽然效率很低。

此外,您可以猜测列数并发明一些函数(适合您的实际数据)来将列获得的值映射Column到这些数字 - 这样您就可以使每个执行程序的列集保持相等。该解决方案不是很通用,但可以解决某些情况。即,您会得到像这样的列:<c01, c02, c03, ..., cNN>


查看完整回答
反对 回复 2023-05-24
  • 3 回答
  • 0 关注
  • 169 浏览

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信