考虑以下代码:@Test(singleThreaded = true)public class KafkaConsumerTest{ private KafkaTemplate<String, byte[]> template; private DefaultKafkaConsumerFactory<String, byte[]> consumerFactory; private static final KafkaEmbedded EMBEDDED_KAFKA; static { EMBEDDED_KAFKA = new KafkaEmbedded(1, true, "topic"); try { EMBEDDED_KAFKA.before(); } catch (final Exception e) { e.printStackTrace(); } } @BeforeMethod public void setUp() throws Exception { final Map<String, Object> senderProps = KafkaTestUtils.senderProps(EMBEDDED_KAFKA.getBrokersAsString()); senderProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); senderProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class); final ProducerFactory<String, byte[]> pf = new DefaultKafkaProducerFactory<>(senderProps); this.template = new KafkaTemplate<>(pf); this.template.setDefaultTopic("topic"); final Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("sender", "false", EMBEDDED_KAFKA); this.consumerFactory = new DefaultKafkaConsumerFactory<>(consumerProps); this.consumerFactory.setValueDeserializer(new ByteArrayDeserializer()); this.consumerFactory.setKeyDeserializer(new StringDeserializer()); }我正在尝试向 a 发送消息KafkaTemplate并使用Consumer.poll(). 我使用的测试框架是TestNG。发送作品,我已经验证使用我在网上找到的“常用”代码(在 上注册一个消息侦听器KafkaMessageListenerContainer)。只是,我从来没有在消费者那里收到过任何东西。我已经针对“真实”的 Kafka 安装尝试了相同的序列 (create Consumer, poll()),并且它有效。因此,我设置ConsumerFactory? 任何帮助将不胜感激!
添加回答
举报
0/150
提交
取消