3 Answers

Anyone trying to do this in PySpark 3.x can use pyspark.sql.PandasCogroupedOps.applyInPandas.
For example:
from pyspark.sql import SparkSession
import pandas as pd

spark = SparkSession.builder.master("local").getOrCreate()

df1 = spark.createDataFrame(
    [(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0), (20000102, 2, 4.0)],
    ("time", "id", "v1"))
df2 = spark.createDataFrame(
    [(20000101, 1, "x"), (20000101, 2, "y")],
    ("time", "id", "v2"))

# Each pair of co-grouped pandas DataFrames (one group per id) is merged as-of on time.
def asof_join(l, r):
    return pd.merge_asof(l, r, on="time", by="id")

df1.groupby("id").cogroup(df2.groupby("id")).applyInPandas(
    asof_join, schema="time int, id int, v1 double, v2 string"
).show()
Output:
+--------+---+---+---+
| time| id| v1| v2|
+--------+---+---+---+
|20000101| 1|1.0| x|
|20000102| 1|3.0| x|
|20000101| 2|2.0| y|
|20000102| 2|4.0| y|
+--------+---+---+---+

I doubt it is faster, but you can solve this in plain Spark by combining union with last over a window function.
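For context, here is one possible set of inputs, reconstructed so that the snippet below reproduces the output shown further down; the data values are hypothetical, and only the column names (Column1, Datetime, Key) come from the code itself:

from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local").getOrCreate()

# df1: records to enrich; df2: keyed timeline to match against (made-up data).
df1 = spark.createDataFrame(
    [("A", "2019-02-03"), ("B", "2019-03-14")],
    ("Column1", "Datetime"))
df2 = spark.createDataFrame(
    [("2019-01-01", 1), ("2019-01-15", 2), ("2019-02-10", 3), ("2019-03-01", 4)],
    ("Datetime", "Key"))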
from pyspark.sql import functions as f
from pyspark.sql.window import Window

# Give both frames the same schema so they can be unioned:
# df1 (Column1, Datetime) gets a null Key, df2 (Datetime, Key) a null Column1.
df1 = df1.withColumn('Key', f.lit(None))
df2 = df2.withColumn('Column1', f.lit(None))
df3 = df1.unionByName(df2)

# For each row, look back over all earlier rows (ordered by Datetime) and take
# the last non-null Key, i.e. the most recent key at or before that Datetime.
w = Window.orderBy('Datetime', 'Column1').rowsBetween(Window.unboundedPreceding, -1)
df3.withColumn('Key', f.last('Key', True).over(w)).filter(~f.isnull('Column1')).show()
This gives
+-------+----------+---+
|Column1| Datetime|Key|
+-------+----------+---+
| A|2019-02-03| 2|
| B|2019-03-14| 4|
+-------+----------+---+
This is an old question, but it may still be useful to some.

I came up with a quick (though probably not the most efficient) way to do this. I built a helper function:
def get_close_record(df, key_column, datetime_column, record_time):
    """
    Takes an ordered dataframe and returns the key of the closest
    record whose datetime is greater than or equal to the given time.
    """
    filtered_df = df[df[datetime_column] >= record_time][0:1]
    [key] = filtered_df[key_column].values.tolist()
    return key
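A quick hypothetical check of the helper on a small ordered frame (data made up for illustration):

import pandas as pd

b = pd.DataFrame({"A_key": [1, 2, 3],
                  "Datetime": pd.to_datetime(["2019-01-01", "2019-02-10", "2019-03-01"])})
get_close_record(b, "A_key", "Datetime", pd.Timestamp("2019-02-03"))  # returns 2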
Instead of joining B to A, I set up a pandas_udf of the above code and ran it over the columns of table B, then ran groupBy on B with B_key as the primary key, aggregating A_key with max.
The problem with this approach is that it requires B to be sorted.
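A minimal sketch of that aggregation step, assuming matched is table B with an A_key column already produced by the pandas_udf (the name matched is made up; A_key and B_key come from the description above):

from pyspark.sql import functions as F

deduped = matched.groupBy('B_key').agg(F.max('A_key').alias('A_key'))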
Better solution:
I developed the following helper function, which should work:
import pandas as pd
from pyspark.sql import functions as F

# Expose the join key as '_0' so it matches the column name produced from the
# series the pandas_udf receives, then broadcast the lookup frame to executors.
other_df['_0'] = other_df['Datetime']
bdf = sc.broadcast(other_df)

# merge_asof udf: for each Datetime, take the Key of the first broadcast row
# with '_0' >= Datetime (forward direction).
@F.pandas_udf('long')
def join_asof(v, other=bdf.value):
    f = pd.DataFrame(v)
    j = pd.merge_asof(f, other, on='_0', direction='forward')
    return j['Key']

joined = df.withColumn('Key', join_asof(F.col('Datetime')))
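For reference, a hypothetical setup under which the snippet above runs (all names besides Datetime and Key follow the snippet; the data is made up, and this would be executed before it):

import pandas as pd

sc = spark.sparkContext
other_df = pd.DataFrame({"Datetime": [20190101, 20190115, 20190210, 20190301],
                         "Key": [1, 2, 3, 4]}).sort_values("Datetime")
df = spark.createDataFrame([("A", 20190203), ("B", 20190314)],
                           ("Column1", "Datetime"))

Note that pd.merge_asof also expects the left-hand keys to be sorted, so each incoming batch of Datetime values must already be in order for the UDF not to raise.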