我有一个用例,我需要将记录从 hive 移动到 kafka。我找不到可以直接将 kafka sink 添加到 flink 数据集的方法。因此,我使用了一种解决方法,我在 flink 数据集上调用地图转换,并在地图函数内部对给定记录使用 kafkaProducer.send() 命令。我面临的问题是我没有任何方法可以在每个工作节点上执行 kafkaProducer.flush(),因此用 kafka 写入的记录数总是比数据集中的记录数略少。有没有一种优雅的方法来处理这个问题?有什么办法可以在 flink 中将 kafka sink 添加到数据集?或者调用 kafkaProducer.flush() 作为终结器的方法?
添加回答
举报
0/150
提交
取消