我尝试使用 kafka 生产者发送 java 字符串消息。字符串消息是从Java Spark JavaPairDStream中提取的。JavaPairDStream<String, String> processedJavaPairStream = inputStream.mapToPair (record-> new Tuple2<>(record.key(), record.value())).mapValues(message -> message.replace('>', '#'));String outTopics = "outputTopic";String broker = "localhost:9092";Properties properties = new Properties();properties.put("bootstrap.servers", broker);properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");Producer<String, String> producer = new KafkaProducer<String, String>(properties);processedJavaPairStream.foreachRDD(rdd -> rdd.foreach(tuple2 -> { ProducerRecord<String, String> message = new ProducerRecord<String, String>(outTopics, tuple2._1, tuple2._2); System.out.println(message.key() + " : " + message.value()); //(1) producer.send(message).get(); //(2)}));(1) 行正确打印消息字符串。但是当我使用 kafka 生产者发送这些消息(如(2)行)时,它会抛出如下异常,我无法理解这个异常。我确认 kafaka 生产者消息是<String,String>通过第 (1) 行输入的。但为什么第(2)行会抛出这个异常呢?我是否错过任何流程?
1 回答
动漫人物
TA贡献1815条经验 获得超10个赞
您需要为每个 RDD 创建生产者。
RDD 分布在多个执行器上,Producer 对象无法序列化以在它们之间共享
或者,查看结构化流的文档,您可以简单地执行此操作以写入主题;无需自己创建和发送记录
stream.writeStream().format("kafka")...
请注意,如果目标只是将一个主题映射到另一个主题,那么Kafka Streams API
比 Spark 更简单且开销更少
添加回答
举报
0/150
提交
取消