为了账号安全,请及时绑定邮箱和手机立即绑定

如何在 Flink 程序中打印 kafka 主题数据?

如何在 Flink 程序中打印 kafka 主题数据?

函数式编程 2021-12-30 19:43:20
我通过这个指令创建了一个主题:C:\kafka_2.12-0.10.2.1>.\bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic test < C:\User11\Desktop\Data.csv然后我测试了该主题是否正确使用了该数据。之后想在Flink程序中打印topic,我的程序是这样的: try{    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();    Properties properties = new Properties();    DataStream<String> stream = env            .addSource(new FlinkKafkaConsumer09<String>("test", new SimpleStringSchema(),properties));           stream.print();    env.execute();    } catch (Exception e) {        e.printStackTrace();    }但是我得到了这个信息(因为信息太长我不得不写一些):[main] INFO org.apache.flink.streaming.api.environment.LocalStreamEnvironment - 在本地嵌入式 Flink mini 集群上运行作业 [main] INFO org.apache.flink.runtime.minicluster.MiniCluster - 启动 Flink Mini Cluster [main] INFO org.apache.flink.runtime.minicluster.MiniCluster - 启动指标注册表 [main] INFO org.apache.flink.runtime.metrics.MetricRegistryImpl - 没有配置指标报告器,不会公开/报告指标。[main] INFO org.apache.flink.runtime.minicluster.MiniCluster - 启动 RPC 服务 [flink-akka.actor.default-dispatcher-2] INFO akka.event.slf4j.Slf4jLogger - Slf4jLogger 启动 [main] INFO org.apache.flink.runtime.minicluster.MiniCluster - 启动高可用性服务 [main] INFO org.apache.flink.runtime.blob.BlobServer - 创建 BLOB 服务器存储目录 C:另外,我也看到了这个链接,但它没有解决我的问题: How to access/read kafka topic data from flink?你能告诉我这里有什么问题吗?
查看完整描述

1 回答

?
白衣非少年

TA贡献1155条经验 获得超0个赞

问题解决了。首先,我用这个命令填满了 Kafka 主题:


/home/kafka_2.11-2.0.0/bin/kafka-console-producer.sh --broker-list 10.32.0.2:9092,10.32.0.3:9092,10.32.0.4:9092 --topic flinkTopic < transactions2.csv

然后,使用此代码,我可以打印 Kafka 主题:


 final StreamExecutionEnvironment env = 

 StreamExecutionEnvironment.getExecutionEnvironment();

 Properties prop = new Properties();

 prop.setProperty("bootstrap.servers", 

 "10.32.0.2:9092,10.32.0.3:9092,10.32.0.4:9092");

 prop.setProperty("group.id", "test");

    FlinkKafkaConsumer<String> myConsumer= new FlinkKafkaConsumer<> 

  ("flinkTopic", new SimpleStringSchema(),prop);

    myConsumer.setStartFromEarliest();

    DataStream<String> stream = env.addSource(myConsumer);

    stream.print();

    env.execute("Flink Streaming Java API Skeleton");

我希望它对其他人有用。


查看完整回答
反对 回复 2021-12-30
  • 1 回答
  • 0 关注
  • 237 浏览

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信