我有一个自定义AllWindowFunction类,它有一个负责数据库插入的类成员。与数据库的连接是持久的,并在构建过程中打开。问题是,AllWindowFunction创建/打开连接的实例与应用事件调用的实例不同。解决此问题的方法是静态成员,但我想知道这是否是唯一的解决方法?示例代码:public class CustomWindowFunction implements AllWindowFunction<String, String, TimeWindow> { private static Connection database; CustomWindowFunction() { database = new Connection(); } @Override public void apply(TimeWindow timeWindow, Iterable<String> trades, Collector<String> out) { // process data database.save(data); out.collect(data.toString()); }}我找不到有关此机制的任何信息,我只知道构造函数中的对象 ID 与调用的对象 ID 不同apply。
2 回答
函数式编程
TA贡献1807条经验 获得超9个赞
这是因为每个函数都必须序列化才能分布在集群的节点上。您可以尝试使用RichAllWindowFunction,即所谓的“Rich”版本,您可以在其中使用open()方法,该方法将在每个并行运算符开始时调用。在这种方法中,您可以创建连接
public class CustomWindowFunction implements RichAllWindowFunction<String, String, TimeWindow> {
private Connection database;
@Override
public void open(Configuration parameters) {
database = new Connection();
}
@Override
public void apply(TimeWindow timeWindow, Iterable<String> trades, Collector<String> out) {
// process data
database.save(data);
out.collect(data.toString());
}
}
添加回答
举报
0/150
提交
取消