2 回答
TA贡献1828条经验 获得超3个赞
您想要实现 org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
当您这样做时,您将实现两种方法:
@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
// called when it's time to save state
myState.clear();
// Update myState with current application state
}
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
// called when things start up, possibly recovering from an error
descriptor = new MapStateDescriptor<>("state", Types.STRING, Types.POJO(BroadcastedStateType.class));
myState = context.getKeyedStateStore().getMapState(descriptor);
if (context.isRestored()) {
// restore application state from myState
}
}
您可以在initializeState() 方法而不是open() 中初始化myState 变量。
TA贡献1900条经验 获得超5个赞
我不相信你实际上可以在initializeState()中初始化广播状态。修改广播状态的唯一方法是通过在 processBroadcastElement 方法中获得的读/写上下文。
但是你可以做的是在initializeState中使用context.isRestored()来确定KeyedBroadcastProcessFunction是否是第一次初始化,并设置一个瞬态局部变量来记录此信息。然后,第一次调用 processBroadcastElement 方法时,您可以使用此信息来决定在广播状态中存储什么。但您必须在广播流上发送一些内容才能启动此操作。
添加回答
举报