为了账号安全,请及时绑定邮箱和手机立即绑定

使用 poll() 的消费者单元测试永远不会收到任何东西

使用 poll() 的消费者单元测试永远不会收到任何东西

三国纷争 2021-10-20 14:40:08
考虑以下代码:@Test(singleThreaded = true)public class KafkaConsumerTest{  private KafkaTemplate<String, byte[]> template;  private DefaultKafkaConsumerFactory<String, byte[]> consumerFactory;  private static final KafkaEmbedded EMBEDDED_KAFKA;  static {      EMBEDDED_KAFKA = new KafkaEmbedded(1, true, "topic");      try { EMBEDDED_KAFKA.before(); } catch (final Exception e) { e.printStackTrace(); }    }  @BeforeMethod  public void setUp() throws Exception {    final Map<String, Object> senderProps = KafkaTestUtils.senderProps(EMBEDDED_KAFKA.getBrokersAsString());    senderProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);    senderProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);    final ProducerFactory<String, byte[]> pf = new DefaultKafkaProducerFactory<>(senderProps);    this.template = new KafkaTemplate<>(pf);    this.template.setDefaultTopic("topic");    final Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("sender", "false", EMBEDDED_KAFKA);    this.consumerFactory = new DefaultKafkaConsumerFactory<>(consumerProps);    this.consumerFactory.setValueDeserializer(new ByteArrayDeserializer());    this.consumerFactory.setKeyDeserializer(new StringDeserializer());  }我正在尝试向 a 发送消息KafkaTemplate并使用Consumer.poll(). 我使用的测试框架是TestNG。发送作品,我已经验证使用我在网上找到的“常用”代码(在 上注册一个消息侦听器KafkaMessageListenerContainer)。只是,我从来没有在消费者那里收到过任何东西。我已经针对“真实”的 Kafka 安装尝试了相同的序列 (create Consumer, poll()),并且它有效。因此,我设置ConsumerFactory? 任何帮助将不胜感激!
查看完整描述

1 回答

  • 1 回答
  • 0 关注
  • 171 浏览

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信