我FlinkKafkaConsumer011订阅了一个主题。我希望apply在每个 kafka 消费者消息上处理 ( ),因此自定义在每个元素FooTrigger上返回TriggerResult.FIRE。以下代码有效,我只是对timeWindowAll(Time.minutes(1)). 看起来我做错了什么。// set up streaming execution environmentStreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);// create a Kafka consumerFlinkKafkaConsumer011<Foo> consumer = new FlinkKafkaConsumer011<>( "topic", new Foo.FooSchema(), props); // Properties object// create Kafka consumer data sourceDataStream<FooTuple> trades = env.addSource(consumer) .timeWindowAll(Time.minutes(1)) .trigger(new FooTrigger()) .evictor(new FooEvictor()) .apply(new CreateFoos());
1 回答
慕的地6264312
TA贡献1817条经验 获得超6个赞
如果您的目标是将函数应用于流中的每个事件,ProcessFunction
那么在 Flink 中使用a将是一种更自然的方法。或者在更简单的情况下,您可以使用地图或平面地图,或其丰富的变体,即 RichMapFunction 或 RichFlatMapFunction —— 这完全取决于您要尝试做什么。
使用 map 或 flatmap,您可以执行无状态的一对一或一对多转换,它们的丰富变体可以使用键控状态,而 ProcessFunction 可以使用状态和计时器(前提是流已被键控)。
timeWindowAll 适用于流未按键分区的情况,并且您希望按持续时间定义的批处理进行非并行处理(对于键控并行窗口,请改用 timeWindow)。如果您只想在数据到达时对其进行处理,那么窗口化会增加不必要的复杂性。
添加回答
举报
0/150
提交
取消