为了账号安全,请及时绑定邮箱和手机立即绑定

自定义 AllWindowFunction 类成员

自定义 AllWindowFunction 类成员

慕尼黑8549860 2021-09-26 14:37:16
我有一个自定义AllWindowFunction类,它有一个负责数据库插入的类成员。与数据库的连接是持久的,并在构建过程中打开。问题是,AllWindowFunction创建/打开连接的实例与应用事件调用的实例不同。解决此问题的方法是静态成员,但我想知道这是否是唯一的解决方法?示例代码:public class CustomWindowFunction implements AllWindowFunction<String, String, TimeWindow> {    private static Connection database;    CustomWindowFunction() {        database = new Connection();    }    @Override    public void apply(TimeWindow timeWindow, Iterable<String> trades, Collector<String> out) {        // process data        database.save(data);        out.collect(data.toString());    }}我找不到有关此机制的任何信息,我只知道构造函数中的对象 ID 与调用的对象 ID 不同apply。
查看完整描述

2 回答

?
函数式编程

TA贡献1807条经验 获得超9个赞

这是因为每个函数都必须序列化才能分布在集群的节点上。您可以尝试使用RichAllWindowFunction,即所谓的“Rich”版本,您可以在其中使用open()方法,该方法将在每个并行运算符开始时调用。在这种方法中,您可以创建连接


public class CustomWindowFunction implements RichAllWindowFunction<String, String, TimeWindow> {


    private Connection database;


    @Override

    public void open(Configuration parameters) {

        database = new Connection();

    }


    @Override

    public void apply(TimeWindow timeWindow, Iterable<String> trades, Collector<String> out) {

        // process data

        database.save(data);

        out.collect(data.toString());

    }

}


查看完整回答
反对 回复 2021-09-26
  • 2 回答
  • 0 关注
  • 276 浏览

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信