1 回答
TA贡献2011条经验 获得超2个赞
根据您希望如何聚合分组数据 - 您可以执行例如
先决条件(如果您尚未设置它们):
from pyspark.sql import functions as F
from pyspark.sql.functions import *
为:sum
difference = streamingDataF.withColumn("Difference", expr("Column_A - Column_B")).drop("Column_A").drop("Column_B").groupBy("Column_C").agg(F.sum(F.col("Difference")).alias("Difference"))
为:max
difference = streamingDataF.withColumn("Difference", expr("Column_A - Column_B")).drop("Column_A").drop("Column_B").groupBy("Column_C").agg(F.max(F.col("Difference")).alias("Difference"))
然后:
differenceStream = difference.writeStream\
.queryName("diff_aggr")\
.format("memory").outputMode("append")\
.start()
关键是 - 如果你这样做,你也需要通过聚合来减少。如果你想把你的值排序在一起,试试groupBydf.sort(...)
添加回答
举报