为了账号安全,请及时绑定邮箱和手机立即绑定

PySpark:如何将 Python UDF 应用于 PySpark DataFrame 列?

PySpark:如何将 Python UDF 应用于 PySpark DataFrame 列?

红糖糍粑 2022-07-19 16:41:28
我有一个带有两组纬度、经度坐标的 PySpark DataFrame。我正在尝试计算给定行的每组坐标之间的 Haversine 距离。我正在使用haversine()我在网上找到的以下内容。问题是它不能应用于列,或者至少我不知道这样做的语法。有人可以分享语法或指出更好的解决方案吗?from math import radians, cos, sin, asin, sqrtdef haversine(lat1, lon1, lat2, lon2):    """    Calculate the great circle distance between two points     on the earth (specified in decimal degrees)    """    # convert decimal degrees to radians     lon1, lat1, lon2, lat2 = map(radians, [lon1, lat1, lon2, lat2])    # haversine formula     dlon = lon2 - lon1     dlat = lat2 - lat1     a = sin(dlat/2)**2 + cos(lat1) * cos(lat2) * sin(dlon/2)**2    c = 2 * asin(sqrt(a))     # Radius of earth in miles is 3,963; 5280 ft in 1 mile    ft = 3963 * 5280 * c    return ft我知道haversine()上面的函数有效,因为我使用数据框中的一些纬度/经度坐标对其进行了测试,并得到了合理的结果:haversine(-85.8059, 38.250134,           -85.805122, 38.250098)284.1302325439314当我在 PySpark 数据框中将示例坐标替换为对应于纬度/经度的列名时,我收到错误消息。我尝试了以下代码,试图创建一个新列,其中包含计算的Haversine 距离(以英尺为单位):df.select('id', 'p1_longitude', 'p1_latitude', 'p2_lon', 'p2_lat').withColumn('haversine_dist',                            haversine(df['p1_latitude'],                                    df['p1_longitude'],                                    df['p2_lat'],                                    df['p2_lon'])).show()但我得到了错误:必须是实数,而不是 Column Traceback(最近调用最后一次):文件“”,第 8 行,haversine TypeError:必须是实数,而不是 Column这向我表明我必须以某种方式迭代地将我的 hasrsine 函数应用于我的 PySpark DataFrame 的每一行,但我不确定这个猜测是否正确,即使是这样,我也不知道该怎么做。顺便说一句,我的纬度/经度是浮点类型。
查看完整描述

1 回答

?
qq_遁去的一_1

TA贡献1725条经验 获得超7个赞

当您可以使用 Spark 内置函数时,不要使用 UDF,因为它们通常性能较差。


这是一个仅使用与您的函数相同的 Spark SQL 函数的解决方案:


from pyspark.sql.functions import col, radians, asin, sin, sqrt, cos


df.withColumn("dlon", radians(col("p2_lon")) - radians(col("p1_longitude"))) \

  .withColumn("dlat", radians(col("p2_lat")) - radians(col("p1_latitude"))) \

  .withColumn("haversine_dist", asin(sqrt(

                                         sin(col("dlat") / 2) ** 2 + cos(radians(col("p1_latitude")))

                                         * cos(radians(col("p2_lat"))) * sin(col("dlon") / 2) ** 2

                                         )

                                    ) * 2 * 3963 * 5280) \

  .drop("dlon", "dlat")\

  .show(truncate=False)

给出:


+-----------+------------+----------+---------+------------------+

|p1_latitude|p1_longitude|p2_lat    |p2_lon   |haversine_dist    |

+-----------+------------+----------+---------+------------------+

|-85.8059   |38.250134   |-85.805122|38.250098|284.13023254857814|

+-----------+------------+----------+---------+------------------+

您可以在此处找到可用的 Spark 内置函数。


查看完整回答
反对 回复 2022-07-19
  • 1 回答
  • 0 关注
  • 89 浏览
慕课专栏
更多

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信