我已经使用beam SDK 在python 中编写了一个流式Google Dataflow 管道。有关于我如何在本地-runner运行它并设置标志以在 Dataflow 上运行它的文档。我现在正在尝试将其自动部署到 CI 管道(bitbucket 管道,但并不真正相关)。有关于如何“运行”管道的文档,但并没有真正“部署”它。我测试过的命令如下所示:python -m dataflow --runner "DataflowRunner" \ --jobName "<jobName>" \ --topic "<pub-sub-topic"" \ --project "<project>" \ --dataset "<dataset>" \ --worker_machine_type "n1-standard-2" \ --temp_location "gs://<bucket-name>/tmp/"这将运行作业,但由于它是流式传输,因此永远不会返回。它还在内部管理包装和推送到存储桶。我知道如果我终止该进程,它会继续运行,但是在 CI 服务器上设置它,我可以检测该进程是否实际成功,或者我只是在超时后将其终止,这很困难。这看起来很荒谬,就像我遗漏了一些明显的东西,但是我如何以一种我可以可靠地知道它从 CI 管道部署的方式在数据流上打包和运行这个模块?
2 回答

肥皂起泡泡
TA贡献1829条经验 获得超6个赞
所以是的,这很愚蠢。
基本上当你使用
with beam.Pipeline(options=options) as p:
语法,在引擎盖下它调用wait_until_finish。所以在我没有意识到的情况下调用了等待,导致它永远存在。重构以删除上下文管理器解决了这个问题。
添加回答
举报
0/150
提交
取消