为了账号安全,请及时绑定邮箱和手机立即绑定

SlidingWindows Python Apache Beam 复制数据

SlidingWindows Python Apache Beam 复制数据

慕尼黑的夜晚无繁华 2021-08-17 18:22:40
问题每次系统从带有滑动窗口的 pubsub 收到消息时,它都会被复制代码 | 'Parse dictionary' >> beam.Map(lambda elem: (elem['Serial'], int(elem['Value'])))     | 'window' >> beam.WindowInto(window.SlidingWindows(30, 15),accumulation_mode=AccumulationMode.DISCARDING) | 'Count' >> beam.CombinePerKey(beam.combiners.MeanCombineFn())输出如果我只从 pub/sub 发送一条消息并尝试在滑动窗口完成后使用代码打印我所拥有的:class print_row2(beam.DoFn):    def process(self, row=beam.DoFn.ElementParam, window=beam.DoFn.WindowParam,timestamp=beam.DoFn.TimestampParam):        print row, timestamp2str(float(window.start)), timestamp2str(float(window.end)),timestamp2str(float(timestamp))结果('77777', 120.0) 2018-11-16 08:21:15.000 2018-11-16 08:21:45.000 2018-11-16 08:21:45.000('77777', 120.0) 2018-11-16 08:21:30.000 2018-11-16 08:22:00.000 2018-11-16 08:22:00.000如果我在'window' >> beam.WindowInto(window.SlidingWindows(30, 15))只收到一次之前打印消息“图形模式”下的流程:  time: ----t+00---t+15---t+30----t+45----t+60------>             :      :      :       :       :  w1:        |=X===========|       :       :  w2:               |==============|       :  ...消息 X 在滑动窗口开始时只发送了一次,应该只接收一次,但正在接收两次我已经尝试了两个 AccumulationMode 值,也尝试了 trigger=AftyerWatermark 但我无法解决问题。可能有什么问题?额外的使用 FixedWindows 这是我的海豚的正确代码:| 'Window' >> beam.WindowInto(window.FixedWindows(1 * 30))| 'Speed Average' >> beam.GroupByKey()| "Calculating average" >> beam.CombineValues(beam.combiners.MeanCombineFn())要么| 'Window' >> beam.WindowInto(window.FixedWindows(1 * 30))| "Calculating average" >> beam.CombinePerKey(beam.combiners.MeanCombineFn())
查看完整描述

2 回答

?
阿晨1998

TA贡献2037条经验 获得超6个赞

我有完全相同的问题,但是在java中。我有一个持续时间为 10 秒和步长为 3 秒的窗口。当从我订阅的 mqtt 主题发出事件时,它看起来像我运行的 ParDo 函数,并向所有三个“构造”窗口发出第一个也是唯一的事件。


X 是我以随机时间戳发送的事件:2020-09-15T21:17:57.292Z


  time: ----t+00---t+15---t+30----t+45----t+60------>

             :      :      :       :       :

  w1:        |X============|       :       :

  w2:               |X=============|       :

  w3:                      |X==============|

  ...

甚至为它们分配了相同的时间戳!!我一定真的做错了什么。


我将 Scala 2.12 和 BEAM 2.23 与 Direct Runner 一起使用。


[提示]:我在 processElement 函数中使用状态!每个键 + 窗口保持状态的位置。也许那里有错误?我将尝试在没有状态的情况下对其进行测试。


更新:删除了状态字段,并将单个事件分配给一个窗口。


查看完整回答
反对 回复 2021-08-17
  • 2 回答
  • 0 关注
  • 138 浏览
慕课专栏
更多

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信