为了账号安全,请及时绑定邮箱和手机立即绑定

列 A 和 B 之间的流差由列 C 和 D 聚合

列 A 和 B 之间的流差由列 C 和 D 聚合

潇潇雨雨 2022-08-16 16:16:53
如何流式传输到表中:按列 C 和 D 聚合的列 A 和 B 之间的差值。+-------------+-------------------+--+-| Column_A|Column_B |Column_C|Column_D|+-------------+-------------------+--+-|52       |67       |boy     |car     ||44       |25       |girl    |bike    ||98       |85       |boy     |car     ||52       |41       |girl    |car     |+-------------+-------------------+--+-这是我的尝试,但它不起作用:difference = streamingDataF.withColumn("Difference", expr("Column_A - Column_B")).drop("Column_A").drop("Column_B").groupBy("Column_C")differenceStream = difference.writeStream\  .queryName("diff_aggr")\  .format("memory").outputMode("append")\  .start()我收到此错误:“GroupedData”对象没有属性“writeStream”
查看完整描述

1 回答

?
森林海

TA贡献2011条经验 获得超2个赞

根据您希望如何聚合分组数据 - 您可以执行例如


先决条件(如果您尚未设置它们):


from pyspark.sql import functions as F 

from pyspark.sql.functions import *

为:sum


difference = streamingDataF.withColumn("Difference", expr("Column_A - Column_B")).drop("Column_A").drop("Column_B").groupBy("Column_C").agg(F.sum(F.col("Difference")).alias("Difference"))

为:max


difference = streamingDataF.withColumn("Difference", expr("Column_A - Column_B")).drop("Column_A").drop("Column_B").groupBy("Column_C").agg(F.max(F.col("Difference")).alias("Difference"))

然后:


differenceStream = difference.writeStream\

  .queryName("diff_aggr")\

  .format("memory").outputMode("append")\

  .start()

关键是 - 如果你这样做,你也需要通过聚合来减少。如果你想把你的值排序在一起,试试groupBydf.sort(...)


查看完整回答
反对 回复 2022-08-16
  • 1 回答
  • 0 关注
  • 82 浏览
慕课专栏
更多

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信