为了账号安全,请及时绑定邮箱和手机立即绑定

应用于每个输入

应用于每个输入

杨__羊羊 2021-10-13 10:59:01
我FlinkKafkaConsumer011订阅了一个主题。我希望apply在每个 kafka 消费者消息上处理 ( ),因此自定义在每个元素FooTrigger上返回TriggerResult.FIRE。以下代码有效,我只是对timeWindowAll(Time.minutes(1)). 看起来我做错了什么。// set up streaming execution environmentStreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);// create a Kafka consumerFlinkKafkaConsumer011<Foo> consumer =  new FlinkKafkaConsumer011<>(    "topic",    new Foo.FooSchema(),    props);   // Properties object// create Kafka consumer data sourceDataStream<FooTuple> trades = env.addSource(consumer)    .timeWindowAll(Time.minutes(1))    .trigger(new FooTrigger())    .evictor(new FooEvictor())    .apply(new CreateFoos());
查看完整描述

1 回答

?
当年话下

TA贡献1890条经验 获得超9个赞

如果您的目标是将函数应用于流中的每个事件,ProcessFunction那么在 Flink 中使用a将是一种更自然的方法。或者在更简单的情况下,您可以使用地图或平面地图,或其丰富的变体,即 RichMapFunction 或 RichFlatMapFunction —— 这完全取决于您要尝试做什么。

使用 map 或 flatmap,您可以执行无状态的一对一或一对多转换,它们的丰富变体可以使用键控状态,而 ProcessFunction 可以使用状态和计时器(前提是流已被键控)。

timeWindowAll 适用于流未按键分区的情况,并且您希望按持续时间定义的批处理进行非并行处理(对于键控并行窗口,请改用 timeWindow)。如果您只想在数据到达时对其进行处理,那么窗口化会增加不必要的复杂性。


查看完整回答
反对 回复 2021-10-13
  • 1 回答
  • 0 关注
  • 71 浏览

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信