为了账号安全,请及时绑定邮箱和手机立即绑定

如何在 Flink 中增加 SinkFunction 的 numRecordsOutPut 指标?

如何在 Flink 中增加 SinkFunction 的 numRecordsOutPut 指标?

一只斗牛犬 2022-07-14 17:25:36
我正在使用 flink 来消耗 kafka 并写入 redis。这是我对redis的接收器功能:            .addSink(new RichSinkFunction<MobilePageEvent>() {                @Override                public void invoke(MobilePageEvent event, Context context) {                    JEDIS_CLUSTER.zadd(..);                }            })            .name("redis sink");虽然我可以从 redis 命令行获取数据,但指标显示 sink 函数的输出为零:我怎样才能增加这个指标?
查看完整描述

1 回答

?
qq_笑_17

TA贡献1818条经验 获得超7个赞

numRecordsIn 和 numRecordsOut 指标仅计算在 Flink 作业本身内流动的流记录,不包括与外部系统的通信。所以换句话说,源不报告任何记录进来,汇不报告任何记录出去。

在我看来,您有几个选择:

  1. 使用接收器上的 numRecordsIn 指标作为您想知道的近似值

  2. fork 或扩展 RedisSink 并添加您想要的指标

此处显示了添加计数器度量的模式。

在 redis sink 的情况下,您可以在 open() 方法中初始化一个 Counter,并在 invoke() 中递增它。但这似乎毫无意义,因为这只会反映 numRecordsIn 指标。如果您的 redis 接收器正在执行缓冲批量写入,那么等待增加指标直到数据实际发送到 redis 可能更有意义——在这种情况下,您可能更愿意使用 Meter 而不是 Counter。


查看完整回答
反对 回复 2022-07-14
  • 1 回答
  • 0 关注
  • 194 浏览

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信