为了账号安全,请及时绑定邮箱和手机立即绑定

Flink 维护配置状态

Flink 维护配置状态

皈依舞 2023-10-12 17:01:10
我有一个在 Flink 中维护配置的用例,但我真的不知道如何处理。假设我在某处存储了一些配置,并且我需要它来进行处理。在Flink作业初始化时,我想加载所有配置。这个配置也可以在Flink作业运行期间修改,所以我必须在内存中保存这个配置的状态,并在需要时更新它。配置的更新可以从 KafkaSource 访问。这就是我所拥有的:我有一个函数可以加载整个配置,将其保持在某种状态并将其与我的数据流关联:public class MyConfiguration extends RichFlatMapFunction<Row, Row>{    private transient MapState<String, MyConfObject> configuration;    @Override    public void open(MyConfiguration config) throws Exception{        MapStateDescriptor<String,MyConfObject> descriptor = new MapStateDescriptor<String,MyConfObject>(                "configuration",                BasicTypeInfo.STRING_TYPE_INFO,                ...        );        configuration = getRuntimeContext().getMapState(descriptor);        configuration.putAll(...);   // Load configuration from somewhere    }    @Override    public void flatMap(Row value, Collector<Row> out) throws Exception {        MyConfObject conf = configuration.get(...);        ...               // Associate conf with data        out.collect(value);    }}我的管道看起来像这样:DataStream<Row> dataStream = ...; // My data streamDataStream<Map<String, MyConfObject> streamConf =      env.addSource(new FlinkKafkaConsumer<Row>(..., ..., ...)) // The stream of configuration updates        .map(...); return dataStream    .assignTimestampsAndWatermarks(...)    .flatMap(new MyConfiguration())    ... //Do some processing    .map(m -> {        ObjectMapper objectMapper = new ObjectMapper();        String json = objectMapper.writeValueAsString(m);        return json.getBytes();    });我想要的是使用配置更新流streamConf来更新平面地图函数内的 State 变量MyConfiguration。我怎样才能做到这一点 ?
查看完整描述

1 回答

?
江户川乱折腾

TA贡献1851条经验 获得超5个赞

我建议您编写一个源代码,从 Kafka 读取配置信息,然后通过广播流将配置更改广播到映射函数。映射函数将以持久状态存储完整的当前配置,而广播流意味着映射函数的所有实例都将获得所有配置更改。



查看完整回答
反对 回复 2023-10-12
  • 1 回答
  • 0 关注
  • 63 浏览

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信