1 回答
TA贡献1963条经验 获得超6个赞
使用 PySpark 解决了这个问题:
我首先创建了另一个列,其中包含每个用户的所有到期日期数组:
joined_array = df.groupBy('user_id').agg(collect_set('expiration_date'))
然后将该数组加入到原始数据帧中:
joined_array = joined_array.toDF('user_idDROP', 'expiration_date_array')
df = df.join(joined_array, df.user_id == joined_array.user_idDROP, how = 'left').drop('user_idDROP')
然后创建一个函数来遍历数组,如果创建日期大于到期日期,则将计数加 1:
def check_expiration_count(created_at, expiration_array):
if not expiration_array:
return 0
else:
count = 0
for i in expiration_array:
if created_at > i:
count += 1
return count
check_expiration_count = udf(check_expiration_count, IntegerType())
然后应用该函数创建一个具有正确计数的新列:
df = df.withColumn('count_of_subs_ending_before_creation', check_expiration_count(df.created_at, df.expiration_array))
瓦拉。完毕。谢谢大家(没有人帮忙,但还是谢谢)。希望有人在 2022 年发现这很有用
添加回答
举报