为了账号安全,请及时绑定邮箱和手机立即绑定

按 Pyspark 数据框分组和过滤

按 Pyspark 数据框分组和过滤

MYYA 2022-06-14 10:05:00
我有一个 3 列的 PySpark 数据框架。有些行在 2 列中相似,但在第三列中不相似,请参见下面的示例。----------------------------------------first_name | last_name | requests_ID    |----------------------------------------Joe        | Smith     |[2,3]           |---------------------------------------- Joe        | Smith     |[2,3,5,6]       |---------------------------------------- Jim        | Bush      |[9,7]           |---------------------------------------- Jim        | Bush      |[21]            |---------------------------------------- Sarah      | Wood      |[2,3]           |----------------------------------------   我想根据 {first_name, last_name} 列对行进行分组,并且只有 {requests_ID} 数量最多的行。所以结果应该是:----------------------------------------first_name | last_name | requests_ID    |----------------------------------------Joe        | Smith     |[2,3,5,6]       |---------------------------------------- Jim        | Bush      |[9,7]           |---------------------------------------- Sarah      | Wood      |[2,3]           |---------------------------------------- 我尝试了以下不同的事情,但它为我提供了 group-by 中两行的嵌套数组,而不是最长的。gr_df = filtered_df.groupBy("first_name", "last_name").agg(F.collect_set("requests_ID").alias("requests_ID")) 这是我得到的结果:----------------------------------------first_name | last_name | requests_ID    |----------------------------------------Joe        | Smith     |[[9,7],[2,3,5,6]]|---------------------------------------- Jim        | Bush      |[[9,7],[21]]    |---------------------------------------- Sarah      | Wood      |[2,3]           |---------------------------------------- 
查看完整描述

2 回答

?
呼唤远方

TA贡献1856条经验 获得超11个赞

您可以使用size来确定数组列的长度和如下用途window:


导入并创建示例 DataFrame


import pyspark.sql.functions as f

from pyspark.sql.window import Window


df = spark.createDataFrame([('Joe','Smith',[2,3]),

('Joe','Smith',[2,3,5,6]),

('Jim','Bush',[9,7]),

('Jim','Bush',[21]),

('Sarah','Wood',[2,3])], ('first_name','last_name','requests_ID'))

定义窗口以requests_ID根据列的长度以降序获取列的行号。


在这里,f.size("requests_ID")将给出requests_ID列的长度并按desc()降序对其进行排序。


w_spec = Window().partitionBy("first_name", "last_name").orderBy(f.size("requests_ID").desc())

应用窗口函数并获取第一行。


df.withColumn("rn", f.row_number().over(w_spec)).where("rn ==1").drop("rn").show()

+----------+---------+------------+

|first_name|last_name| requests_ID|

+----------+---------+------------+

|       Jim|     Bush|      [9, 7]|

|     Sarah|     Wood|      [2, 3]|

|       Joe|    Smith|[2, 3, 5, 6]|

+----------+---------+------------+


查看完整回答
反对 回复 2022-06-14
?
哈士奇WWW

TA贡献1799条经验 获得超6个赞

要完成您当前的 df 看起来像这样,


----------------------------------------

first_name | last_name | requests_ID    |

----------------------------------------

Joe        | Smith     |[[9,7],[2,3,5,6]]|

---------------------------------------- 

Jim        | Bush      |[[9,7],[21]]    |

---------------------------------------- 

Sarah      | Wood      |[2,3]           |

---------------------------------------- 

尝试这个,


import pyspark.sql.functions as F

from pyspark.sql.types import IntegerType, ArrayType


def myfunc(x):

  temp = []

  for _ in x:

    temp.append(len(x))


  max_ind = temp.index(max(temp))


  return x[max_ind]


udf_extract = F.udf(myfunc, ArrayType(IntegerType()))


df = df.withColumn('new_requests_ID', udf_extract('requests_ID'))


#df.show()

或者,没有变量声明,


import pyspark.sql.functions as F


@F.udf

def myfunc(x):

  temp = []

  for _ in x:

    temp.append(len(x))


  max_ind = temp.index(max(temp))


  return x[max_ind]


df = df.withColumn('new_requests_ID', myfunc('requests_ID'))


#df.show()


查看完整回答
反对 回复 2022-06-14
  • 2 回答
  • 0 关注
  • 96 浏览
慕课专栏
更多

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信