为了账号安全,请及时绑定邮箱和手机立即绑定

具有复杂条件的Spark SQL窗口函数

具有复杂条件的Spark SQL窗口函数

收到一只叮咚 2019-09-03 15:57:44
这可能是最容易通过示例解释的。假设我有一个用户登录网站的DataFrame,例如:scala> df.show(5)+----------------+----------+|       user_name|login_date|+----------------+----------+|SirChillingtonIV|2012-01-04||Booooooo99900098|2012-01-04||Booooooo99900098|2012-01-06||  OprahWinfreyJr|2012-01-10||SirChillingtonIV|2012-01-11|+----------------+----------+only showing top 5 rows我想在此列添加一个列,指示他们何时成为网站上的活跃用户。但有一点需要注意:有一段时间用户被认为是活动的,在此期间之后,如果他们再次登录,他们的became_active日期会重置。假设这段时间是5天。然后从上表派生的所需表将是这样的:+----------------+----------+-------------+|       user_name|login_date|became_active|+----------------+----------+-------------+|SirChillingtonIV|2012-01-04|   2012-01-04||Booooooo99900098|2012-01-04|   2012-01-04||Booooooo99900098|2012-01-06|   2012-01-04||  OprahWinfreyJr|2012-01-10|   2012-01-10||SirChillingtonIV|2012-01-11|   2012-01-11|+----------------+----------+-------------+因此,特别是,SirChillingtonIV的became_active日期被重置,因为他们的第二次登录是在活动期过期之后,但是Booooooo99900098的became_active日期没有在他/她登录的第二次重置,因为它落在活动期间。我最初的想法是使用窗口函数lag,然后使用lagged值填充became_active列; 例如,大致类似于:import org.apache.spark.sql.expressions.Windowimport org.apache.spark.sql.functions._val window = Window.partitionBy("user_name").orderBy("login_date")val df2 = df.withColumn("tmp", lag("login_date", 1).over(window))然后,规则填写became_active日期会是这样,如果tmp是null(即,如果它是第一次登录),或者如果login_date - tmp >= 5再became_active = login_date; 否则,转到下一个最近的值tmp并应用相同的规则。这表明了一种递归方法,我无法想象实现的方法。我的问题:这是一种可行的方法,如果是这样的话,我怎么能“回头”看看早期的价值观,tmp直到我找到一个停止的地方?据我所知,我无法迭代Spark SQL的值Column。还有另一种方法来实现这个结果吗?
查看完整描述

2 回答

?
呼啦一阵风

TA贡献1802条经验 获得超6个赞

重构对方的回答 与工作Pyspark


在Pyspark你可以像下面。


create data frame


df = sqlContext.createDataFrame(

[

("SirChillingtonIV", "2012-01-04"), 

("Booooooo99900098", "2012-01-04"), 

("Booooooo99900098", "2012-01-06"), 

("OprahWinfreyJr", "2012-01-10"), 

("SirChillingtonIV", "2012-01-11"), 

("SirChillingtonIV", "2012-01-14"), 

("SirChillingtonIV", "2012-08-11")

], 

("user_name", "login_date"))

上面的代码创建了一个如下所示的数据框


+----------------+----------+

|       user_name|login_date|

+----------------+----------+

|SirChillingtonIV|2012-01-04|

|Booooooo99900098|2012-01-04|

|Booooooo99900098|2012-01-06|

|  OprahWinfreyJr|2012-01-10|

|SirChillingtonIV|2012-01-11|

|SirChillingtonIV|2012-01-14|

|SirChillingtonIV|2012-08-11|

+----------------+----------+

现在我们要先发现它们之间的区别login_date是多于5几天。


对于这个,如下所示。


必要的进口


from pyspark.sql import functions as f

from pyspark.sql import Window



# defining window partitions  

login_window = Window.partitionBy("user_name").orderBy("login_date")

session_window = Window.partitionBy("user_name", "session")


session_df = df.withColumn("session", f.sum((f.coalesce(f.datediff("login_date", f.lag("login_date", 1).over(login_window)), f.lit(0)) > 5).cast("int")).over(login_window))

当我们运行上面的代码行时,如果date_diff是,NULL则coalesce函数将替换NULL为0。


+----------------+----------+-------+

|       user_name|login_date|session|

+----------------+----------+-------+

|  OprahWinfreyJr|2012-01-10|      0|

|SirChillingtonIV|2012-01-04|      0|

|SirChillingtonIV|2012-01-11|      1|

|SirChillingtonIV|2012-01-14|      1|

|SirChillingtonIV|2012-08-11|      2|

|Booooooo99900098|2012-01-04|      0|

|Booooooo99900098|2012-01-06|      0|

+----------------+----------+-------+



# add became_active column by finding the `min login_date` for each window partitionBy `user_name` and `session` created in above step

final_df = session_df.withColumn("became_active", f.min("login_date").over(session_window)).drop("session")


+----------------+----------+-------------+

|       user_name|login_date|became_active|

+----------------+----------+-------------+

|  OprahWinfreyJr|2012-01-10|   2012-01-10|

|SirChillingtonIV|2012-01-04|   2012-01-04|

|SirChillingtonIV|2012-01-11|   2012-01-11|

|SirChillingtonIV|2012-01-14|   2012-01-11|

|SirChillingtonIV|2012-08-11|   2012-08-11|

|Booooooo99900098|2012-01-04|   2012-01-04|

|Booooooo99900098|2012-01-06|   2012-01-04|

+----------------+----------+-------------+


查看完整回答
反对 回复 2019-09-03
  • 2 回答
  • 0 关注
  • 755 浏览
慕课专栏
更多

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信