为了账号安全,请及时绑定邮箱和手机立即绑定

数据流中的动态目标问题

数据流中的动态目标问题

繁星点点滴滴 2022-10-12 15:53:18
我有一个 Dataflow 作业,它从 pubsub 读取数据并根据时间和文件名将内容写入 GCS,其中文件夹路径基于 YYYY/MM/DD。这允许根据日期在文件夹中生成文件,并使用 apache beamFileIO和Dynamic Destinations.大约两周前,我注意到未确认消息的异常堆积。重新启动 df 作业后,错误消失了,新文件正在 GCS 中写入。几天后,写入再次停止,除了这一次,有错误声称处理被卡住了。经过一些可靠的 SO 研究后,我发现这可能是由于 Beam 2.90 之前的死锁问题造成的,因为它使用 Conscrypt 库作为默认安全提供程序。所以,我从 Beam 2.8 升级到 Beam 2.11。再一次,它起作用了,直到它没有。我更仔细地查看了这个错误,并注意到它有一个 SimpleDateFormat 对象的问题,它不是线程安全的。因此,我转而使用线程安全的 Java.time 和 DateTimeFormatter。它一直有效,直到它没有。但是,这一次,错误略有不同,并没有指向我的代码中的任何内容:错误如下所示。Processing stuck in step FileIO.Write/WriteFiles/WriteShardedBundlesToTempFiles/WriteShardsIntoTempFiles for at least 05m00s without outputting or completing in state process  at sun.misc.Unsafe.park(Native Method)  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)  at org.apache.beam.vendor.guava.v20_0.com.google.common.util.concurrent.AbstractFuture.get(AbstractFuture.java:469)  at org.apache.beam.vendor.guava.v20_0.com.google.common.util.concurrent.AbstractFuture$TrustedFuture.get(AbstractFuture.java:76)  at org.apache.beam.runners.dataflow.worker.MetricTrackingWindmillServerStub.getStateData(MetricTrackingWindmillServerStub.java:202)  at org.apache.beam.runners.dataflow.worker.WindmillStateReader.startBatchAndBlock(WindmillStateReader.java:409)  at org.apache.beam.runners.dataflow.worker.WindmillStateReader$WrappedFuture.get(WindmillStateReader.java:311)  at org.apache.beam.runners.dataflow.worker.WindmillStateReader$BagPagingIterable$1.computeNext(WindmillStateReader.java:700)  at org.apache.beam.vendor.guava.v20_0.com.google.common.collect.AbstractIterator.tryToComputeNext(AbstractIterator.java:145)此错误在作业部署后大约 5 小时开始发生,并且随着时间的推移而增加。写作在 24 小时内显着减慢。我有 60 名工人,我怀疑每次出现错误时都会有一名工人失败,这最终会杀死工作。
查看完整描述

2 回答

?
慕田峪4524236

TA贡献1875条经验 获得超5个赞

错误“处理卡住...”表示某些特定操作花费的时间超过 5m,而不是作业永久卡住。但是,由于步骤 FileIO.Write/WriteFiles/WriteShardedBundlesToTempFiles/WriteShardsIntoTempFiles 是卡住并且作业被取消/终止的步骤,所以我会在作业写入临时文件时考虑一个问题。

我发现了与用于写入临时文件的第二粒度时间戳 (yyyy-MM-dd_HH-mm-ss) 相关的BEAM-7689问题。发生这种情况是因为多个并发作业可以共享同一个临时目录,这可能导致其中一个作业在其他作业完成之前将其删除。

根据之前的链接,为缓解此问题,请升级到 SDK 2.14。并让我们知道错误是否消失。


查看完整回答
反对 回复 2022-10-12
?
饮歌长啸

TA贡献1951条经验 获得超3个赞

自从发布这个问题以来,我已经优化了数据流作业以避开瓶颈并增加并行化。就像 rsantiago 解释的那样,处理卡住不是错误,而只是数据流传达的一种方式,即一个步骤比其他步骤花费的时间要长得多,这本质上是一个瓶颈,无法用给定的资源清除。我所做的更改似乎已经解决了这些问题。新代码如下:


public void streamData() {


        try {

            Pipeline pipeline = Pipeline.create(options);


            pipeline.apply("Read PubSub Events", PubsubIO.readMessagesWithAttributes().fromSubscription(options.getInputSubscription()))

            .apply(options.getWindowDuration() + " Window",

                    Window.<PubsubMessage>into(FixedWindows.of(parseDuration(options.getWindowDuration())))

                          .triggering(AfterWatermark.pastEndOfWindow()) 

                          .discardingFiredPanes()

                          .withAllowedLateness(parseDuration("24h")))

            .apply(FileIO.<String,PubsubMessage>writeDynamic()

                    .by(new datePartition(options.getOutputFilenamePrefix()))

                    .via(Contextful.fn(

                            (SerializableFunction<PubsubMessage, String>) inputMsg -> new String(inputMsg.getPayload(), StandardCharsets.UTF_8)),

                            TextIO.sink())

                    .withDestinationCoder(StringUtf8Coder.of())

                    .to(options.getOutputDirectory())

                    .withNaming(type -> new CrowdStrikeFileNaming(type))

                    .withNumShards(options.getNumShards())

                    .withTempDirectory(options.getTempLocation()));


            pipeline.run();

        }


        catch(Exception e) {


            LOG.error("Unable to deploy pipeline");

            LOG.error(e.toString(), e);

        }


    }

最大的变化涉及删除 extractMsg() 函数并将分区更改为仅使用元数据。这两个步骤都强制对消息进行反序列化/重新序列化并严重影响性能。


此外,由于我的数据集是无限的,我必须设置一个非零数量的分片。我想简化我的文件命名策略,所以我将它设置为 1 却不知道它对性能的影响有多大。从那时起,我为我的工作找到了工人/碎片/机器类型的良好平衡(不幸的是,主要基于猜测和检查)。


尽管在足够大的数据负载下仍然可能观察到瓶颈,但尽管负载很重(每天 3-5 tb),管道仍然表现良好。这些更改还显着改善了自动缩放,但我不知道为什么。数据流作业现在对峰值和谷值的反应要快得多。


查看完整回答
反对 回复 2022-10-12
  • 2 回答
  • 0 关注
  • 88 浏览

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信