我在将序列化的 XML 发送到我的 Kafka 主题时遇到问题。每当我运行我的代码时,我都没有收到任何异常或错误消息,但我仍然看不到 Kafka 主题中的任何消息。我的 Kafka-Producer 设置是:def WartungsdbKafkaConnector(args: Array[String]): Unit = { val xmlFile = args(0) val record = getRecord(xmlFile) val kafkaProducer = getKafkaProducer kafkaProducer.send(record)}protected def getRecord(xmlFile: String): ProducerRecord[String, String] = { val lines = scala.io.Source.fromFile(xmlFile).mkString val xml = scala.xml.XML.loadString(lines) val paramPress = xml \ "PARAMETER" \ "PRESS" val databaseId = allCatch.opt {paramPress.\@("NUMBER")} val key = databaseId.get val topic = args(1) new ProducerRecord(topic, key, lines)}protected def getKafkaProducer: KafkaProducer[String, String] = { val props = new Properties props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "ec-x.eu-west-1.compute.amazonaws.com:9092," + "ec2-x.eu-west-1.compute.amazonaws.com:9092," + "ec2-x.eu-west-1.compute.amazonaws.com:9092") props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true") props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName) props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName) props.put(ProducerConfig.LINGER_MS_CONFIG, "100") props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy") props.put(ProducerConfig.RETRIES_CONFIG, "20") props.put(ProducerConfig.ACKS_CONFIG, "all") new KafkaProducer[String, String](props)}
1 回答
紫衣仙女
TA贡献1839条经验 获得超15个赞
您不会刷新、等待或关闭生产者,因此应用程序只是停止而不发送数据。
生产者在可配置的时间内批量处理数据和消息,以减少实际到达代理的发送请求数量。
尝试
kafkaProducer.send(record) // optionally call get() on this to capture the result and potential errors
kafkaProducer.flush()
kafkaProducer.close()
最重要的是,永远不要忘记关闭生产者(或消费者)
添加回答
举报
0/150
提交
取消