为了账号安全,请及时绑定邮箱和手机立即绑定

如何仅在第一个元素上在 CustomTrigger 中启动处理时间计时器?

如何仅在第一个元素上在 CustomTrigger 中启动处理时间计时器?

慕仙森 2022-05-25 15:40:25
我正在Trigger为我的应用程序使用带有自定义功能的 GlobalWindow。根据要求,在 Trigger 函数中,我只需要在窗口中的第一个元素上启动一个处理时间计时器。我尝试使用变量来实现它firstEventflag。像这样。.window(GlobalWindows.create()).trigger(new Trigger<ImpactEventObject, GlobalWindow>() {    Boolean firstEventflag = false;    @Override    public TriggerResult onElement(ImpactEventObject impactEventObject, long l, GlobalWindow globalWindow, TriggerContext triggerContext) throws Exception {        if (!firstEventflag) {            firstEventflag = true;            triggerContext.registerProcessingTimeTimer(                triggerContext.getCurrentProcessingTime() + 20000);        }        return TriggerResult.CONTINUE;    }    @Override    public TriggerResult onProcessingTime(long l, GlobalWindow globalWindow, TriggerContext triggerContext) throws Exception {    return TriggerResult.FIRE;}但这失败了,因为今天我发现firstEventflag每次创建新窗口时都没有初始化变量,它取决于正在处理窗口的子任务,这意味着不同的窗口可以共享同一个变量,firstEventflag从而使这个逻辑实际上毫无用处。鉴于此,我该如何解决我的问题?
查看完整描述

1 回答

?
函数式编程

TA贡献1807条经验 获得超9个赞

通过查看CountTrigger 这里的源代码想出了一种方法。


GlobalWindow我们可以用 a来保持元素的数量ReducingStateDescriptor。并在此计数为 1 时启动计时器,这意味着 - 仅在第一个元素上启动计时器。


public class CustomTrigger extends Trigger<GenericObject, GlobalWindow> {


private final ReducingStateDescriptor<Long> stateDesc = new ReducingStateDescriptor<>("count", new Sum(), LongSerializer.INSTANCE);


@Override

public TriggerResult onElement(ImpactEventObject impactEventObject, long l, GlobalWindow globalWindow, TriggerContext triggerContext) throws Exception {

    ReducingState<Long> count = triggerContext.getPartitionedState(stateDesc);

    count.add(1L);


    if (count.get() == 1) {

        triggerContext.registerProcessingTimeTimer(

            triggerContext.getCurrentProcessingTime() + 20000);

    }

    return TriggerResult.CONTINUE;

}


@Override

public TriggerResult onProcessingTime(long l, GlobalWindow globalWindow, TriggerContext triggerContext) throws Exception {

    return TriggerResult.FIRE;

}


@Override

public TriggerResult onEventTime(long l, GlobalWindow globalWindow, TriggerContext triggerContext) throws Exception {

    return null;

}


@Override

public void clear(GlobalWindow globalWindow, TriggerContext triggerContext) throws Exception {

    triggerContext.deleteProcessingTimeTimer(triggerContext.getCurrentProcessingTime());

}


private static class Sum implements ReduceFunction<Long> {

    private static final long serialVersionUID = 1L;

    @Override

    public Long reduce(Long value1, Long value2) throws Exception {

        return value1 + value2;

    }


}

}


查看完整回答
反对 回复 2022-05-25
  • 1 回答
  • 0 关注
  • 127 浏览

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信