最近我写了一个演示程序来启动反应式编程,结合 Reactor 和 RabbitMQ。这是我的演示代码:public class FluxWithRabbitMQDemo {private static final String QUEUE = "demo_thong";private final reactor.rabbitmq.Sender sender;private final Receiver receiver;public FluxWithRabbitMQDemo() { this.sender = ReactorRabbitMq.createSender(); this.receiver = ReactorRabbitMq.createReceiver();}public void run(int count) { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.useNio(); SenderOptions senderOptions = new SenderOptions() .connectionFactory(connectionFactory) .resourceCreationScheduler(Schedulers.elastic()); reactor.rabbitmq.Sender sender = ReactorRabbitMq.createSender(senderOptions); Mono<AMQP.Queue.DeclareOk> queueDeclaration = sender.declareQueue(QueueSpecification.queue(QUEUE)); Flux<Delivery> messages = receiver.consumeAutoAck(QUEUE); queueDeclaration.thenMany(messages).subscribe(m->System.out.println("Get message "+ new String(m.getBody()))); Flux<OutboundMessageResult> dataStream = sender.sendWithPublishConfirms(Flux.range(1, count) .filter(m -> !m.equals(10)) .parallel() .runOn(Schedulers.parallel()) .doOnNext(i->System.out.println("Message " + i + " run on thread "+Thread.currentThread().getId())) .map(i -> new OutboundMessage("", QUEUE, ("Message " + i).getBytes()))); sender.declareQueue(QueueSpecification.queue(QUEUE)) .thenMany(dataStream) .doOnError(e -> System.out.println("Send failed"+ e)) .subscribe(m->{ if (m!= null){ System.out.println("Sent successfully message "+new String(m.getOutboundMessage().getBody())); } }); try { Thread.sleep(20000); } catch (InterruptedException e) { e.printStackTrace(); }}我希望在 Flux 发出一个项目后,Sender 必须将它发送到 RabbitMQ,并且在接收到 RabbitMQ 之后,Receiver 必须接收它。但一切都是按顺序发生的,这就是我得到的结果
添加回答
举报
0/150
提交
取消