为了账号安全,请及时绑定邮箱和手机立即绑定

需要在 Spark 中优化对休息服务的调用

需要在 Spark 中优化对休息服务的调用

小怪兽爱吃肉 2022-06-14 10:48:20
我需要为 spark 中数据集的每一行调用一个休息服务。我生成了以下代码:import requestsdf= spark.read.parquet("file.parquet")for row in df.rdd.collect():  requests.post('rest.api/endpoint')我不确定这是否是最好的方法,性能方面。有没有更好的方法来实现它?
查看完整描述

1 回答

?
慕丝7291255

TA贡献1859条经验 获得超6个赞

通过在结果上运行它,.collect您将失去任何并行化,并且所有请求都将由驱动程序完成。


您可以创建一个为每一行调用 API 的 UDF:


from pyspark.sql.functions import udf

import requests


api = "https://swapi.co/api/people/"


@udf("string")

def swapiGetPersonName(id):

    response = requests.get(api + str(id))

    return response.json()["name"]


df = spark.range(1,10)

df.select("id", swapiGetPersonName("id").alias("name")).show()

但是,如果您有大量数据,这很容易使您的休息服务或执行程序超载。(您几乎会对您的服务进行拒绝服务攻击或用完套接字)。如果这是一个问题,您可以

  • 通过在时间加载数据的子集来批处理数据

  • 通过拆分它foreachPartition并在每个行中一个接一个地处理它来批处理它

  • 使用结构化流传输数据并限制您一次处理的行数

  • 使用支持批处理操作的API(或修改你的)(而不是每一行,上传整个分区/重要部分数据)

这些都是我的想法,但是除了对服务进行太多调用之外,不要忘记添加适当的异常处理:)


查看完整回答
反对 回复 2022-06-14
  • 1 回答
  • 0 关注
  • 74 浏览
慕课专栏
更多

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信