3 回答
TA贡献1789条经验 获得超10个赞
你可以尝试这样的事情:
public class ConsumerDemoWithThread {
private Logger logger = LoggerFactory.getLogger(ConsumerDemoWithThread.class.getName());
private String bootstrapServers = "127.0.0.1:9092";
private String groupId = "my-first-application";
private String topic = "first-topic";
KafkaConsumer consumer = createConsumer(bootstrapServers, groupId, topic);
private void pollForRecords() {
ExecutorService executor = Executors.newSingleThreadExecutor();
executor.submit(() -> processRecords());
}
private KafkaConsumer createConsumer(String bootstrapServers, String groupId, String topic) {
Properties properties = new Properties();
properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
// create consumer
KafkaConsumer consumer = new KafkaConsumer<String, String>(properties);
// subscribe consumer to our topic(s)
consumer.subscribe(Arrays.asList(topic));
return consumer;
}
private void processRecords() {
try {
while (true) {
ConsumerRecords<String, String> records =
consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
logger.info("Key: " + record.key() + ", Value: " + record.value());
logger.info("Partition: " + record.partition() + ", Offset:" + record.offset());
}
}
} catch (WakeupException e) {
logger.info("Received shutdown signal!");
} finally {
consumer.close();
}
}
public static void main(String[] args) {
ConsumerDemoWithThread consumerDemoWithThread = new ConsumerDemoWithThread();
consumerDemoWithThread.pollForRecords();
}
}
TA贡献2021条经验 获得超8个赞
您可以使用@KafkaListener,
然而,它也会以无限循环进行轮询,因为这就是 Kafka 的设计方式——它不是一个队列,而是一个存储一段时间记录的事件总线。没有通知其消费者的机制。
轮询不同的线程并以优雅的方式退出循环。
TA贡献1880条经验 获得超4个赞
如果您希望能够在代码中同时执行多项操作,则需要后台线程。
为了更轻松地做到这一点,您可以使用更高级别的 Kafka 库,例如 Spring(已回答)、Vert.x或Smallrye
这是一个 Vert.x 示例,首先创建一个KafkaConsumer
,然后分配处理程序并订阅您的主题
consumer.handler(record -> {
System.out.println("Processing key=" + record.key() + ",value=" + record.value() +
",partition=" + record.partition() + ",offset=" + record.offset());
});
// subscribe to a single topic
consumer.subscribe("a-single-topic");
添加回答
举报