为了账号安全,请及时绑定邮箱和手机立即绑定

Spark SQL:在数组值上使用 collect_set?

Spark SQL:在数组值上使用 collect_set?

慕桂英3389331 2022-06-15 10:32:00
我有一个聚合的 DataFrame,其中有一列使用collect_set. 我现在需要再次聚合此 DataFrame,并再次应用于collect_set该列的值。问题是我需要应用collect_Set集合的值——而且我看到的唯一方法是分解聚合的 DataFrame。有没有更好的办法?例子:初始数据框:country   | continent   | attributes-------------------------------------Canada    | America     | ABelgium   | Europe      | ZUSA       | America     | ACanada    | America     | BFrance    | Europe      | YFrance    | Europe      | X聚合数据帧(我收到作为输入的那个) - 聚合country:country   | continent   | attributes-------------------------------------Canada    | America     | A, BBelgium   | Europe      | ZUSA       | America     | AFrance    | Europe      | Y, X我想要的输出 - 聚合continent:continent   | attributes-------------------------------------America     | A, BEurope      | X, Y, Z
查看完整描述

1 回答

?
哈士奇WWW

TA贡献1799条经验 获得超6个赞

由于此时您只能拥有少量行,因此您只需按原样收集属性并将结果展平(Spark >= 2.4)


import org.apache.spark.sql.functions.{collect_set, flatten, array_distinct}


val byState = Seq(

  ("Canada", "America", Seq("A", "B")),

  ("Belgium", "Europe", Seq("Z")),

  ("USA", "America", Seq("A")),

  ("France", "Europe", Seq("Y", "X"))

).toDF("country", "continent", "attributes")


byState

  .groupBy("continent")

  .agg(array_distinct(flatten(collect_set($"attributes"))) as "attributes")

  .show

+---------+----------+

|continent|attributes|

+---------+----------+

|   Europe| [Y, X, Z]|

|  America|    [A, B]|

+---------+----------+

在一般情况下,事情更难处理,并且在许多情况下,如果您期望大型列表,每个组有许多重复项和许多值,则最佳解决方案*是从头开始重新计算结果,即


input.groupBy($"continent").agg(collect_set($"attributes") as "attributes")

一种可能的替代方法是使用Aggregator


import org.apache.spark.sql.expressions.Aggregator

import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder

import org.apache.spark.sql.{Encoder, Encoders}

import scala.collection.mutable.{Set => MSet}



class MergeSets[T, U](f: T => Seq[U])(implicit enc: Encoder[Seq[U]]) extends 

     Aggregator[T, MSet[U], Seq[U]] with Serializable {


  def zero = MSet.empty[U]


  def reduce(acc: MSet[U], x: T) = {

    for { v <- f(x) } acc.add(v)

    acc

  }


  def merge(acc1: MSet[U], acc2: MSet[U]) = {

    acc1 ++= acc2

  }


  def finish(acc: MSet[U]) = acc.toSeq

  def bufferEncoder: Encoder[MSet[U]] = Encoders.kryo[MSet[U]]

  def outputEncoder: Encoder[Seq[U]] = enc


}

并按如下方式应用


case class CountryAggregate(

  country: String, continent: String, attributes: Seq[String])


byState

  .as[CountryAggregate]

  .groupByKey(_.continent)

  .agg(new MergeSets[CountryAggregate, String](_.attributes).toColumn)

  .toDF("continent", "attributes")

  .show

+---------+----------+

|continent|attributes|

+---------+----------+

|   Europe| [X, Y, Z]|

|  America|    [B, A]|

+---------+----------+

但这显然不是 Java 友好的选择。


另请参阅如何在 groupBy 之后将值聚合到集合中?(类似,但没有唯一性约束)。


* 这是因为explode可能非常昂贵,尤其是在旧 Spark 版本中,与访问 SQL 集合的外部表示相同。


查看完整回答
反对 回复 2022-06-15
  • 1 回答
  • 0 关注
  • 80 浏览

添加回答

举报

0/150
提交
取消
微信客服

购课补贴
联系客服咨询优惠详情

帮助反馈 APP下载

慕课网APP
您的移动学习伙伴

公众号

扫描二维码
关注慕课网微信公众号