为了账号安全,请及时绑定邮箱和手机立即绑定

如何使用 Apache Beam 管理背压

如何使用 Apache Beam 管理背压

元芳怎么了 2023-06-14 16:35:51
我有非常基本的 apache beam 管道,它在 GCP Dataflow 上运行并从 PubSub 读取一些数据,对其进行转换并将其写入 Postgres 数据库。所有这些都是通过 Apache Beam 的标准读取器/写入器组件完成的。问题是当我的管道开始接收大量数据时,我的 Postgres 端由于等待 ShareLocks 而出现死锁错误。很明显,这种事情的发生是因为 Postgres 端溢出。我的管道试图一次写得太快和太多东西,所以为了避免这种情况,它应该放慢速度。因此,我们可以使用诸如背压之类的机制。我试图挖掘出有关 Apache Beam 背压配置的任何信息,不幸的是,官方文档似乎对此类问题只字未提。我对以下类型的异常感到不知所措:java.sql.BatchUpdateException: Batch entry <NUMBER><MY_STATEMENT> was aborted: ERROR: deadlock detected  Detail: Process 87768 waits for ShareLock on transaction 1939992; blocked by process 87769.Process 87769 waits for ShareLock on transaction 1939997; blocked by process 87768.  Hint: See server log for query details.  Where: while inserting index tuple (5997152,9) in relation "<MY_TABLE>"  Call getNextException to see other errors in the batch.我想知道是否有任何背压工具包或类似的东西可以帮助我在不编写自己的PostgresIO.Writer.非常感谢。
查看完整描述

1 回答

?
守着一只汪

TA贡献1872条经验 获得超3个赞

假设您使用JdbcIO写入 Postgres,您可以尝试增加批处理大小(请参阅 参考资料withBatchSize(long batchSize)),默认情况下为 1K 条记录,这可能是不够的。

此外,如果出现 SQL 异常,并且您想要重试,那么您需要确保使用正确的重试策略(参见 参考资料withRetryStrategy(RetryStrategy retryStrategy))。在这种情况下,FluentBackoff将被应用。


查看完整回答
反对 回复 2023-06-14
  • 1 回答
  • 0 关注
  • 95 浏览

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信