为了账号安全,请及时绑定邮箱和手机立即绑定

将 BlockingQueue 传递给 Spring KafkaListener

将 BlockingQueue 传递给 Spring KafkaListener

拉莫斯之舞 2021-09-26 17:33:52
我是 Java、Spring 和 Kafka 的新手。这是情况:我使用 @KafkaListener 注释制作了一个看起来像这样的 Kafka Consumer:public class Listener {private ExecutorService executorService;private List<Future> futuresThread1 = new ArrayList<>();public Listener() {    Properties appProps = new AppProperties().get();    this.executorService = Executors.newFixedThreadPool(Integer.parseInt(appProps.getProperty("listenerThreads")));}//TODO: how can I pass an approp into this annotation?@KafkaListener(id = "id0", topics = "bose.cdp.ingest.marge.boseaccount.normalized")public void listener(ConsumerRecord<?, ?> record, ArrayBlockingQueue<ConsumerRecord> arrayBlockingQueue) throws InterruptedException, ExecutionException    {        futuresThread1.add(executorService.submit(new Runnable() {                @Override public void run() {                    System.out.println(record);                    arrayBlockingQueue.add(record);                }        }));    }}我向侦听器添加了一个参数 ArrayBlockingQueue,我希望它可以将来自 Kafka 的消息添加到其中。我遇到的问题是我无法弄清楚我实际上是如何将 ArrayBlockingQueue 传递给侦听器的,因为 Spring 在幕后处理侦听器的实例化和运行。我需要这个阻塞队列,以便侦听器之外的另一个对象可以访问消息并使用它做一些工作。例如,在我的主要内容中:@SpringBootApplicationpublic class SourceAccountListenerApp {    public static void main(String[] args) {        Properties appProps = new AppProperties().get();        ArrayBlockingQueue<ConsumerRecord> arrayBlockingQueue = new ArrayBlockingQueue<>(           Integer.parseInt(appProps.getProperty("blockingQueueSize"))        );        //TODO: This starts my listener. How do I pass the queue to it?        SpringApplication.run(SourceAccountListenerApp.class, args);    }}
查看完整描述

1 回答

  • 1 回答
  • 0 关注
  • 138 浏览

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信