为了账号安全,请及时绑定邮箱和手机立即绑定

引起:java.io.NotSerializedException:org.apache.kafka

引起:java.io.NotSerializedException:org.apache.kafka

慕沐林林 2023-08-04 16:34:33
我尝试使用 kafka 生产者发送 java 字符串消息。字符串消息是从Java Spark JavaPairDStream中提取的。JavaPairDStream<String, String> processedJavaPairStream = inputStream.mapToPair                 (record-> new Tuple2<>(record.key(), record.value())).mapValues(message -> message.replace('>', '#'));String outTopics = "outputTopic";String broker = "localhost:9092";Properties properties = new Properties();properties.put("bootstrap.servers", broker);properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");Producer<String, String> producer = new KafkaProducer<String, String>(properties);processedJavaPairStream.foreachRDD(rdd -> rdd.foreach(tuple2 -> {    ProducerRecord<String, String> message = new ProducerRecord<String, String>(outTopics, tuple2._1, tuple2._2);    System.out.println(message.key() + " : " + message.value()); //(1)    producer.send(message).get(); //(2)}));(1) 行正确打印消息字符串。但是当我使用 kafka 生产者发送这些消息(如(2)行)时,它会抛出如下异常,我无法理解这个异常。我确认 kafaka 生产者消息是<String,String>通过第 (1) 行输入的。但为什么第(2)行会抛出这个异常呢?我是否错过任何流程?
查看完整描述

1 回答

?
动漫人物

TA贡献1815条经验 获得超10个赞

您需要为每个 RDD 创建生产者。

RDD 分布在多个执行器上,Producer 对象无法序列化以在它们之间共享


或者,查看结构化流的文档,您可以简单地执行此操作以写入主题;无需自己创建和发送记录

stream.writeStream().format("kafka")...

请注意,如果目标只是将一个主题映射到另一个主题,那么Kafka Streams API

比 Spark 更简单且开销更少


查看完整回答
反对 回复 2023-08-04
  • 1 回答
  • 0 关注
  • 141 浏览

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信