2 回答
TA贡献1875条经验 获得超5个赞
错误“处理卡住...”表示某些特定操作花费的时间超过 5m,而不是作业永久卡住。但是,由于步骤 FileIO.Write/WriteFiles/WriteShardedBundlesToTempFiles/WriteShardsIntoTempFiles 是卡住并且作业被取消/终止的步骤,所以我会在作业写入临时文件时考虑一个问题。
我发现了与用于写入临时文件的第二粒度时间戳 (yyyy-MM-dd_HH-mm-ss) 相关的BEAM-7689问题。发生这种情况是因为多个并发作业可以共享同一个临时目录,这可能导致其中一个作业在其他作业完成之前将其删除。
根据之前的链接,为缓解此问题,请升级到 SDK 2.14。并让我们知道错误是否消失。
TA贡献1951条经验 获得超3个赞
自从发布这个问题以来,我已经优化了数据流作业以避开瓶颈并增加并行化。就像 rsantiago 解释的那样,处理卡住不是错误,而只是数据流传达的一种方式,即一个步骤比其他步骤花费的时间要长得多,这本质上是一个瓶颈,无法用给定的资源清除。我所做的更改似乎已经解决了这些问题。新代码如下:
public void streamData() {
try {
Pipeline pipeline = Pipeline.create(options);
pipeline.apply("Read PubSub Events", PubsubIO.readMessagesWithAttributes().fromSubscription(options.getInputSubscription()))
.apply(options.getWindowDuration() + " Window",
Window.<PubsubMessage>into(FixedWindows.of(parseDuration(options.getWindowDuration())))
.triggering(AfterWatermark.pastEndOfWindow())
.discardingFiredPanes()
.withAllowedLateness(parseDuration("24h")))
.apply(FileIO.<String,PubsubMessage>writeDynamic()
.by(new datePartition(options.getOutputFilenamePrefix()))
.via(Contextful.fn(
(SerializableFunction<PubsubMessage, String>) inputMsg -> new String(inputMsg.getPayload(), StandardCharsets.UTF_8)),
TextIO.sink())
.withDestinationCoder(StringUtf8Coder.of())
.to(options.getOutputDirectory())
.withNaming(type -> new CrowdStrikeFileNaming(type))
.withNumShards(options.getNumShards())
.withTempDirectory(options.getTempLocation()));
pipeline.run();
}
catch(Exception e) {
LOG.error("Unable to deploy pipeline");
LOG.error(e.toString(), e);
}
}
最大的变化涉及删除 extractMsg() 函数并将分区更改为仅使用元数据。这两个步骤都强制对消息进行反序列化/重新序列化并严重影响性能。
此外,由于我的数据集是无限的,我必须设置一个非零数量的分片。我想简化我的文件命名策略,所以我将它设置为 1 却不知道它对性能的影响有多大。从那时起,我为我的工作找到了工人/碎片/机器类型的良好平衡(不幸的是,主要基于猜测和检查)。
尽管在足够大的数据负载下仍然可能观察到瓶颈,但尽管负载很重(每天 3-5 tb),管道仍然表现良好。这些更改还显着改善了自动缩放,但我不知道为什么。数据流作业现在对峰值和谷值的反应要快得多。
添加回答
举报