我尝试使用 Kafka Java SDK 来实现消费者,但是我看到的大多数消费者示例都使用while(true)循环并在循环调用consume方法中获取消息。while (true) { final ConsumerRecords<Long, String> consumerRecords = consumer.poll(1000); if (consumerRecords.count()==0) { noRecordsCount++; if (noRecordsCount > giveUp) break; else continue; } consumerRecords.forEach(record -> { System.out.printf("Consumer Record:(%d, %s, %d, %d)\n", record.key(), record.value(), record.partition(), record.offset()); }); consumer.commitAsync(); }我想知道是否有任何优雅的方法可以在不使用while类似于 RabbitMQ 实现的循环的情况下处理此问题:Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println(" [x] Received '" + message + "'"); } }; channel.basicConsume(QUEUE_NAME, true, consumer);
2 回答
![?](http://img1.sycdn.imooc.com/545868190001d52602200220-100-100.jpg)
Qyouu
TA贡献1786条经验 获得超11个赞
您可以尝试使用Spring-kafkawhich 带有@KafkaListener注释并制作收听主题的方法,有关更多信息,请点击此处
因为在 apache-kafka 中没有优雅的方法来使方法作为主题的侦听器,因为消费者需要在特定时间间隔内轮询记录,需要循环中的代码
@KafkaListener(topics = "topicName", group = "foo")
public void listen(String message) {
System.out.println("Received Messasge in group foo: " + message);
}
添加回答
举报
0/150
提交
取消