2 Answers
Contributor: 1797 experience posts, 6+ upvotes
You can achieve this without a self-join (joins trigger a shuffle, which is an expensive operation on big data) by using the Spark 2.4 higher-order functions filter, transform, and aggregate.
from pyspark.sql import functions as F
from pyspark.sql.window import Window

df = spark.createDataFrame(data)  # `data` is the sample data from the question
x = 2  # look back x days
# Range window per customer covering the previous x days (86400 seconds per day)
win = Window().partitionBy('customerid').orderBy(F.col("date").cast("long")).rangeBetween(-(86400 * x), Window.currentRow)
df.withColumn("productids", F.array_distinct(F.split("productids", ";")))\
  .withColumn("flat_col", F.flatten(F.collect_list("productids").over(win)))\
  .withColumn("occurrences", F.expr("""filter(transform(productids, x ->
      IF(aggregate(flat_col, 0, (acc, t) -> acc + IF(t = x, 1, 0)) > 1, x, null)), y -> y is not null)"""))\
  .drop("flat_col").orderBy("date").show()
+-------------------+----------+----------+-----------+
|               date|customerid|productids|occurrences|
+-------------------+----------+----------+-----------+
|2014-01-01 00:00:00|         1|    [A, B]|         []|
|2014-01-02 00:00:00|         2|    [D, E]|         []|
|2014-01-03 00:00:00|         2|    [H, X]|         []|
|2014-01-04 00:00:00|         3| [P, Q, G]|         []|
|2014-01-05 00:00:00|         4| [S, T, U]|         []|
|2014-01-06 00:00:00|         3|    [C, G]|        [G]|
+-------------------+----------+----------+-----------+
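If the nested expression looks opaque, here is a minimal standalone sketch of the same filter/transform/aggregate chain on hard-coded array literals (the literals below are illustrative only, mimicking the 2014-01-06 row for customer 3, whose 2-day window also contains the 2014-01-04 products): aggregate counts how often each product of the current row appears in the flattened window, transform maps repeated products to themselves and everything else to null, and filter drops the nulls.
# Minimal sketch of the filter/transform/aggregate chain on literal arrays
# (illustrative values: the current row's products and the flattened window)
spark.sql("""
    SELECT filter(
             transform(array('C', 'G'),
                       x -> IF(aggregate(array('P', 'Q', 'G', 'C', 'G'), 0,
                                         (acc, t) -> acc + IF(t = x, 1, 0)) > 1, x, null)),
             y -> y is not null) AS occurrences
""").show()
# +-----------+
# |occurrences|
# +-----------+
# |        [G]|
# +-----------+
'G' appears twice in the flattened window, so it survives; 'C' appears only once and is mapped to null, then filtered out.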
Contributor: 1856 experience posts, 11+ upvotes
A self-join is the best trick ever for this.
from pyspark.sql.functions import concat_ws, collect_list
spark.createDataFrame(data).createOrReplaceTempView("df")  # `data` is the sample data from the question
spark.sql("SELECT date, customerid, explode(split(productids, ';')) productid FROM df").createOrReplaceTempView("altered")
# Self-join: same customer and same product on a different date, at most 2 days apart
df = spark.sql("""SELECT al.date, al.customerid, al.productid productids, altr.productid flat_col
    FROM altered al LEFT JOIN altered altr ON altr.customerid = al.customerid
    AND al.productid = altr.productid AND al.date != altr.date
    AND datediff(al.date, altr.date) BETWEEN -2 AND 2""")
df.groupBy("date", "customerid").agg(concat_ws(",", collect_list("productids")).alias("productids"),
    concat_ws(",", collect_list("flat_col")).alias("flat_col")).show()
Spark output
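One thing worth double-checking in the join condition: datediff(end, start) returns end minus start in days, so BETWEEN -2 AND 2 keeps visit pairs at most two days apart in either direction. A quick standalone sanity check (dates picked arbitrarily for illustration):
# datediff(end, start) = end - start in days; 2014-01-06 minus 2014-01-04 is 2
spark.sql("SELECT datediff('2014-01-06', '2014-01-04') AS diff").show()
# +----+
# |diff|
# +----+
# |   2|
# +----+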