我需要为 spark 中数据集的每一行调用一个休息服务。我生成了以下代码:import requestsdf= spark.read.parquet("file.parquet")for row in df.rdd.collect(): requests.post('rest.api/endpoint')我不确定这是否是最好的方法,性能方面。有没有更好的方法来实现它?
1 回答
慕丝7291255
TA贡献1859条经验 获得超6个赞
通过在结果上运行它,.collect您将失去任何并行化,并且所有请求都将由驱动程序完成。
您可以创建一个为每一行调用 API 的 UDF:
from pyspark.sql.functions import udf
import requests
api = "https://swapi.co/api/people/"
@udf("string")
def swapiGetPersonName(id):
response = requests.get(api + str(id))
return response.json()["name"]
df = spark.range(1,10)
df.select("id", swapiGetPersonName("id").alias("name")).show()
但是,如果您有大量数据,这很容易使您的休息服务或执行程序超载。(您几乎会对您的服务进行拒绝服务攻击或用完套接字)。如果这是一个问题,您可以
通过在时间加载数据的子集来批处理数据
通过拆分它
foreachPartition
并在每个行中一个接一个地处理它来批处理它使用结构化流传输数据并限制您一次处理的行数
使用支持批处理操作的API(或修改你的)(而不是每一行,上传整个分区/重要部分数据)
这些都是我的想法,但是除了对服务进行太多调用之外,不要忘记添加适当的异常处理:)
添加回答
举报
0/150
提交
取消