2 Answers
IIUC, you can use a window function to find max(IF(TP=1, Date, NULL)) for each id and then filter against that threshold:
from pyspark.sql import Window, functions as F

# Per-id window; the threshold is the latest Date among rows with TP=1
w1 = Window.partitionBy('id')

df_new = df.withColumn('Date', F.to_timestamp('Date', 'yyyy-MM-dd HH:mm')) \
    .withColumn('threshold_date', F.expr("max(IF(TP=1, Date, NULL))").over(w1)) \
    .filter('Date <= threshold_date + interval 2 days')
df_new.show()
+---+----+-------------------+-------------------+
| id|  TP|               Date|     threshold_date|
+---+----+-------------------+-------------------+
| A9|Null|2010-05-05 12:00:00|2010-05-06 13:00:00|
| A9| 1|2010-05-05 13:00:00|2010-05-06 13:00:00|
| A9| 1|2010-05-06 13:00:00|2010-05-06 13:00:00|
| A1|Null|2010-01-01 12:00:00|2010-01-02 01:00:00|
| A1|Null|2010-01-01 13:00:00|2010-01-02 01:00:00|
| A1| 1|2010-01-02 01:00:00|2010-01-02 01:00:00|
| A1|Null|2010-01-02 02:00:00|2010-01-02 01:00:00|
+---+----+-------------------+-------------------+
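For reference, here is a minimal sketch of input data that would reproduce the output above; the schema and values are assumptions inferred from the result, and the original input may well have contained extra rows that the filter dropped:
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Hypothetical input inferred from the output shown; TP is kept as a string
# ('Null'/'1'), which Spark implicitly casts in the TP=1 comparison
df = spark.createDataFrame(
    [('A1', 'Null', '2010-01-01 12:00'),
     ('A1', 'Null', '2010-01-01 13:00'),
     ('A1', '1',    '2010-01-02 01:00'),
     ('A1', 'Null', '2010-01-02 02:00'),
     ('A9', 'Null', '2010-05-05 12:00'),
     ('A9', '1',    '2010-05-05 13:00'),
     ('A9', '1',    '2010-05-06 13:00')],
    ['id', 'TP', 'Date'])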
You can simply filter the dataframe on TP = 1 and use collect()[0] to grab the maximum value of the Date column as a variable. Add 48 hours to that variable with a timedelta and then filter df:
from datetime import timedelta
from pyspark.sql.functions import col

# Assumes Date is already a timestamp column, so the collected value is a
# datetime that supports timedelta arithmetic; note that Row field access
# is case-sensitive, hence ["Date"] rather than ["date"]
date_var = df.filter(col("TP") == 1).orderBy("Date", ascending=False) \
    .collect()[0]["Date"] + timedelta(hours=48)
df.filter(col("Date") <= date_var).show()
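If sorting the whole filtered frame just to read one row feels heavy, an equivalent sketch (under the same assumption that Date is a timestamp column) aggregates the maximum directly:
from datetime import timedelta
from pyspark.sql import functions as F

# agg(max) avoids the full sort; first() returns a single Row, indexed by position
max_date = df.filter(F.col("TP") == 1).agg(F.max("Date")).first()[0]
df.filter(F.col("Date") <= max_date + timedelta(hours=48)).show()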