为了账号安全,请及时绑定邮箱和手机立即绑定

Java/Scala Kafka Producer 不向主题发送消息

Java/Scala Kafka Producer 不向主题发送消息

GCT1015 2021-09-12 14:29:13
我在将序列化的 XML 发送到我的 Kafka 主题时遇到问题。每当我运行我的代码时,我都没有收到任何异常或错误消息,但我仍然看不到 Kafka 主题中的任何消息。我的 Kafka-Producer 设置是:def WartungsdbKafkaConnector(args: Array[String]): Unit = {      val xmlFile = args(0)    val record = getRecord(xmlFile)    val kafkaProducer = getKafkaProducer    kafkaProducer.send(record)}protected def getRecord(xmlFile: String): ProducerRecord[String, String] = {    val lines = scala.io.Source.fromFile(xmlFile).mkString    val xml = scala.xml.XML.loadString(lines)    val paramPress = xml \ "PARAMETER" \ "PRESS"    val databaseId = allCatch.opt {paramPress.\@("NUMBER")}    val key = databaseId.get    val topic = args(1)    new ProducerRecord(topic, key, lines)}protected def getKafkaProducer: KafkaProducer[String, String] = {      val props = new Properties    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,     "ec-x.eu-west-1.compute.amazonaws.com:9092," +    "ec2-x.eu-west-1.compute.amazonaws.com:9092," +    "ec2-x.eu-west-1.compute.amazonaws.com:9092")    props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true")    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)    props.put(ProducerConfig.LINGER_MS_CONFIG, "100")    props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy")    props.put(ProducerConfig.RETRIES_CONFIG, "20")    props.put(ProducerConfig.ACKS_CONFIG, "all")    new KafkaProducer[String, String](props)}
查看完整描述

1 回答

?
紫衣仙女

TA贡献1839条经验 获得超15个赞

您不会刷新、等待或关闭生产者,因此应用程序只是停止而不发送数据。


生产者在可配置的时间内批量处理数据和消息,以减少实际到达代理的发送请求数量。


尝试


kafkaProducer.send(record)  // optionally call get() on this to capture the result and potential errors 

kafkaProducer.flush() 

kafkaProducer.close()

最重要的是,永远不要忘记关闭生产者(或消费者)


查看完整回答
反对 回复 2021-09-12
  • 1 回答
  • 0 关注
  • 176 浏览

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信