为了账号安全,请及时绑定邮箱和手机立即绑定

如何在 PySpark 中创建 merge_asof 功能?

如何在 PySpark 中创建 merge_asof 功能?

慕虎7371278 2022-04-27 13:47:15
表A有许多列和一个日期列,表B有一个日期时间和一个值。两个表中的数据都是零星生成的,没有固定的间隔。桌子A很小,桌子B很大。我需要在给定元素对应的条件下B加入AaA.datetimeB[B['datetime'] <= a]]['datetime'].max()有几种方法可以做到这一点,但我想要最有效的方法。选项1将小数据集广播为 Pandas DataFrame。设置一个 Spark UDF,为每一行创建一个 pandas DataFrame,使用merge_asof.选项 2使用 Spark SQL 的广播连接功能:在以下条件下设置 theta 连接B['datetime'] <= A['datetime']然后消除所有多余的行。选项 B 似乎很糟糕......但请让我知道第一种方法是否有效或者是否有另一种方法。编辑:这是示例输入和预期输出:A =+---------+----------+| Column1 | Datetime |+---------+----------+|    A    |2019-02-03||    B    |2019-03-14|+---------+----------+B =+---------+----------+|   Key   | Datetime |+---------+----------+|    0    |2019-01-01||    1    |2019-01-15||    2    |2019-02-01||    3    |2019-02-15||    4    |2019-03-01||    5    |2019-03-15|+---------+----------+custom_join(A,B) =+---------+----------+| Column1 |   Key    |+---------+----------+|    A    |     2    ||    B    |     4    |+---------+----------+
查看完整描述

3 回答

?
神不在的星期二

TA贡献1963条经验 获得超6个赞

任何尝试在 pyspark 3.x 中执行此操作的人都可以使用pyspark.sql.PandasCogroupedOps.applyInPandas

例如:

  from pyspark.sql import SparkSession, Row, DataFrame

  import pandas as pd

  spark = SparkSession.builder.master("local").getOrCreate()


  df1 = spark.createDataFrame(

      [(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0), (20000102, 2, 4.0)],

      ("time", "id", "v1"))

  df2 = spark.createDataFrame(

      [(20000101, 1, "x"), (20000101, 2, "y")],

      ("time", "id", "v2"))

  def asof_join(l, r):

      return pd.merge_asof(l, r, on="time", by="id")

  df1.groupby("id").cogroup(df2.groupby("id")).applyInPandas(

      asof_join, schema="time int, id int, v1 double, v2 string"

  ).show()



  >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>

  +--------+---+---+---+

  |    time| id| v1| v2|

  +--------+---+---+---+

  |20000101|  1|1.0|  x|

  |20000102|  1|3.0|  x|

  |20000101|  2|2.0|  y|

  |20000102|  2|4.0|  y|

  +--------+---+---+---+


查看完整回答
反对 回复 2022-04-27
?
慕尼黑5688855

TA贡献1848条经验 获得超2个赞

我怀疑它是否更快,但您可以通过使用union和last与window函数一起使用 Spark 来解决它。


from pyspark.sql import functions as f

from pyspark.sql.window import Window


df1 = df1.withColumn('Key', f.lit(None))

df2 = df2.withColumn('Column1', f.lit(None))


df3 = df1.unionByName(df2)


w = Window.orderBy('Datetime', 'Column1').rowsBetween(Window.unboundedPreceding, -1)

df3.withColumn('Key', f.last('Key', True).over(w)).filter(~f.isnull('Column1')).show()

这使


+-------+----------+---+

|Column1|  Datetime|Key|

+-------+----------+---+

|      A|2019-02-03|  2|

|      B|2019-03-14|  4|

+-------+----------+---+

这是一个老问题,但可能对某些人仍然有用。


查看完整回答
反对 回复 2022-04-27
?
收到一只叮咚

TA贡献1821条经验 获得超4个赞

想出了一个快速(但可能不是最有效)的方法来完成这个。我构建了一个辅助函数:


def get_close_record(df, key_column, datetime_column, record_time):

    """

    Takes in ordered dataframe and returns the closest 

    record that is higher than the datetime given.

    """

    filtered_df = df[df[datetime_column] >= record_time][0:1]

    [key] = filtered_df[key_column].values.tolist()

    return key

我没有加入B,A而是设置了pandas_udf上述代码并在表的列上运行它,然后B使用主键运行并由聚合。groupByBA_keyB_keymax


这种方法的问题是它需要在B.


更好的解决方案:


我开发了以下应该可以工作的辅助函数


other_df['_0'] = other_df['Datetime']

bdf = sc.broadcast(other_df)


#merge asof udf

@F.pandas_udf('long')

def join_asof(v, other=bdf.value):

    f = pd.DataFrame(v)

    j = pd.merge_asof(f, other, on='_0', direction = 'forward')

    return j['Key']


joined = df.withColumn('Key', join_asof(F.col('Datetime')))


查看完整回答
反对 回复 2022-04-27
  • 3 回答
  • 0 关注
  • 131 浏览
慕课专栏
更多

添加回答

举报

0/150
提交
取消
微信客服

购课补贴
联系客服咨询优惠详情

帮助反馈 APP下载

慕课网APP
您的移动学习伙伴

公众号

扫描二维码
关注慕课网微信公众号