问题每次系统从带有滑动窗口的 pubsub 收到消息时,它都会被复制代码 | 'Parse dictionary' >> beam.Map(lambda elem: (elem['Serial'], int(elem['Value']))) | 'window' >> beam.WindowInto(window.SlidingWindows(30, 15),accumulation_mode=AccumulationMode.DISCARDING) | 'Count' >> beam.CombinePerKey(beam.combiners.MeanCombineFn())输出如果我只从 pub/sub 发送一条消息并尝试在滑动窗口完成后使用代码打印我所拥有的:class print_row2(beam.DoFn): def process(self, row=beam.DoFn.ElementParam, window=beam.DoFn.WindowParam,timestamp=beam.DoFn.TimestampParam): print row, timestamp2str(float(window.start)), timestamp2str(float(window.end)),timestamp2str(float(timestamp))结果('77777', 120.0) 2018-11-16 08:21:15.000 2018-11-16 08:21:45.000 2018-11-16 08:21:45.000('77777', 120.0) 2018-11-16 08:21:30.000 2018-11-16 08:22:00.000 2018-11-16 08:22:00.000如果我在'window' >> beam.WindowInto(window.SlidingWindows(30, 15))只收到一次之前打印消息“图形模式”下的流程: time: ----t+00---t+15---t+30----t+45----t+60------> : : : : : w1: |=X===========| : : w2: |==============| : ...消息 X 在滑动窗口开始时只发送了一次,应该只接收一次,但正在接收两次我已经尝试了两个 AccumulationMode 值,也尝试了 trigger=AftyerWatermark 但我无法解决问题。可能有什么问题?额外的使用 FixedWindows 这是我的海豚的正确代码:| 'Window' >> beam.WindowInto(window.FixedWindows(1 * 30))| 'Speed Average' >> beam.GroupByKey()| "Calculating average" >> beam.CombineValues(beam.combiners.MeanCombineFn())要么| 'Window' >> beam.WindowInto(window.FixedWindows(1 * 30))| "Calculating average" >> beam.CombinePerKey(beam.combiners.MeanCombineFn())
2 回答
阿晨1998
TA贡献2037条经验 获得超6个赞
我有完全相同的问题,但是在java中。我有一个持续时间为 10 秒和步长为 3 秒的窗口。当从我订阅的 mqtt 主题发出事件时,它看起来像我运行的 ParDo 函数,并向所有三个“构造”窗口发出第一个也是唯一的事件。
X 是我以随机时间戳发送的事件:2020-09-15T21:17:57.292Z
time: ----t+00---t+15---t+30----t+45----t+60------>
: : : : :
w1: |X============| : :
w2: |X=============| :
w3: |X==============|
...
甚至为它们分配了相同的时间戳!!我一定真的做错了什么。
我将 Scala 2.12 和 BEAM 2.23 与 Direct Runner 一起使用。
[提示]:我在 processElement 函数中使用状态!每个键 + 窗口保持状态的位置。也许那里有错误?我将尝试在没有状态的情况下对其进行测试。
更新:删除了状态字段,并将单个事件分配给一个窗口。
添加回答
举报
0/150
提交
取消