1 回答
TA贡献1895条经验 获得超7个赞
这是我尝试进行一些修改,例如,我无法理解如何存在 62 秒。
from pyspark.sql.functions import *
from pyspark.sql import Window
w = Window.orderBy('time')
df.select('id', 'time') \
.withColumn('time', to_timestamp('time', 'yyyy-MM-dd HH:mm:ss.SSS')) \
.withColumn('time2', coalesce(lead('time', 1).over(w), expr('time + interval 10 seconds'))) \
.withColumn('seq', expr("sequence(time, time2 + interval 5 seconds, interval 5 seconds)")) \
.withColumn('time', explode('seq')) \
.select('id', 'time') \
.join(df, ['id', 'time'], 'left') \
.fillna(0).show(20, False)
+---+-----------------------+-----+
|id |time |Value|
+---+-----------------------+-----+
|id1|2020-02-22 04:57:36.843|1.4 |
|id1|2020-02-22 04:57:41.843|0.0 |
|id1|2020-02-22 04:57:46.843|0.0 |
|id1|2020-02-22 04:57:51.843|0.0 |
|id2|2020-02-22 04:57:50.85 |1.7 |
|id2|2020-02-22 04:57:55.85 |0.0 |
|id2|2020-02-22 04:58:00.85 |0.0 |
|id3|2020-02-22 04:57:59.133|1.2 |
|id3|2020-02-22 04:58:04.133|0.0 |
|id3|2020-02-22 04:58:09.133|0.0 |
|id3|2020-02-22 04:58:14.133|0.0 |
+---+-----------------------+-----+
添加回答
举报