为了账号安全,请及时绑定邮箱和手机立即绑定

pyspark 内部连接的替代方案,用于比较 pyspark 中的两个数据帧

pyspark 内部连接的替代方案,用于比较 pyspark 中的两个数据帧

白板的微信 2023-03-22 16:47:07
我在 pyspark 中有两个数据框。如下所示,df1 包含来自传感器的整个 long_lat。第二个数据帧 df2 是第一个数据帧的子集,其中 lat-long 值四舍五入为小数点后 2 位,然后删除重复项以保留唯一的 lat_long 数据点。df1:+-----------------+---------+-----+--------------------+----------+------------+|              UID|    label|value|            datetime|  latitude|   longitude|+-----------------+---------+-----+--------------------+----------+------------+|1B0545GD6546Y|evnt     | 3644|2020-06-08T23:32:...|40.1172005|-105.0823546||1B0545GD6FG67|evnt     | 3644|2020-06-08T23:32:...|40.1172201|-105.0821007||15GD6546YFG67|evnt     | 3644|2020-06-08T23:32:...|40.1172396|-105.0818468||1BGD6546YFG67|evnt     | 3644|2020-06-08T23:32:...|40.1172613|-105.0815929||1BGD6546YFG67|evnt     | 3644|2020-06-08T23:32:...|40.1172808|-105.0813368||1B054546YFG67|evnt     | 3644|2020-06-08T23:32:...|40.1173003|-105.0810742||1B056546YFG67|evnt     | 3644|2020-06-08T23:32:...| 40.117322|-105.0808073|df2:+-------+--------+----------------+--------------+                              |new_lat|new_long|        lat_long|    State_name|+-------+--------+----------------+--------------+|  40.13|  -105.1|[40.13, -105.1] |      Colorado||  40.15| -105.11|[40.15, -105.11]|      Colorado||  40.12| -105.07|[40.12, -105.07]|      Colorado||  40.13| -104.99|[40.13, -104.99]|      Colorado||  40.15| -105.09|[40.15, -105.09]|      Colorado||  40.15| -105.13|[40.15, -105.13]|      Colorado||  40.12| -104.94|[40.12, -104.94]|      Colorado|因此,df2 的行数比第一个少得多。在 df2 中,我应用了一个 udf 来计算州名称。现在我想在 df1 中填充州名称。由于 df2 的 lat_long 值四舍五入为小数点后 2 位,为了匹配我使用如下阈值,我在这里使用连接操作。threshold = 0.01df4 = df1.join(df2)\        .filter(df2.new_lat - threshold < df1.latitude)\        .filter(df1.latitude < df2.new_lat + threshold)有没有其他有效的方法来实现同样的目标?因为连接操作是做笛卡尔积,它需要时间和大量的任务。考虑一下,我的 df1 将有 10000 亿条记录。任何帮助将不胜感激。
查看完整描述

1 回答

?
慕标琳琳

TA贡献1830条经验 获得超9个赞

每当你连接一个大的 DataFrame 和一个小的 DataFrame 时,你应该总是尝试执行广播连接。


如果df2小到可以广播,那么df1.join(broadcast(df2))性能会更好。


该方法的第二个参数join()应该是连接条件。


def approx_equal(col1, col2, threshold):

    return abs(col1 - col2) < threshold


threshold = lit(0.01)


df4 = df1.join(broadcast(df2), approx_equal(df2.new_lat, df1.latitude, threshold) && approx_equal(df2.new_long, df1. longitude, threshold))

编辑:我将approx_equal函数添加到quinn,所以你的代码可以更简洁:


import quinn as Q


threshold = lit(0.01)


df4 = df1.join(broadcast(df2), Q.approx_equal(df2.new_lat, df1.latitude, threshold) && Q.approx_equal(df2.new_long, df1. longitude, threshold))



查看完整回答
反对 回复 2023-03-22
  • 1 回答
  • 0 关注
  • 85 浏览
慕课专栏
更多

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信