为了账号安全,请及时绑定邮箱和手机立即绑定

使用 Reactor 和 RabbitMQ 进行反应式编程

使用 Reactor 和 RabbitMQ 进行反应式编程

摇曳的蔷薇 2021-08-04 15:08:31
最近我写了一个演示程序来启动反应式编程,结合 Reactor 和 RabbitMQ。这是我的演示代码:public class FluxWithRabbitMQDemo {private static final String QUEUE = "demo_thong";private final reactor.rabbitmq.Sender sender;private final Receiver receiver;public FluxWithRabbitMQDemo() {    this.sender = ReactorRabbitMq.createSender();    this.receiver = ReactorRabbitMq.createReceiver();}public void run(int count) {    ConnectionFactory connectionFactory = new ConnectionFactory();    connectionFactory.useNio();    SenderOptions senderOptions =  new SenderOptions()            .connectionFactory(connectionFactory)            .resourceCreationScheduler(Schedulers.elastic());    reactor.rabbitmq.Sender sender = ReactorRabbitMq.createSender(senderOptions);    Mono<AMQP.Queue.DeclareOk> queueDeclaration = sender.declareQueue(QueueSpecification.queue(QUEUE));    Flux<Delivery> messages = receiver.consumeAutoAck(QUEUE);    queueDeclaration.thenMany(messages).subscribe(m->System.out.println("Get message "+ new String(m.getBody())));    Flux<OutboundMessageResult> dataStream = sender.sendWithPublishConfirms(Flux.range(1, count)            .filter(m -> !m.equals(10))            .parallel()            .runOn(Schedulers.parallel())            .doOnNext(i->System.out.println("Message  " + i + " run on thread "+Thread.currentThread().getId()))            .map(i -> new OutboundMessage("", QUEUE, ("Message " + i).getBytes())));    sender.declareQueue(QueueSpecification.queue(QUEUE))            .thenMany(dataStream)            .doOnError(e -> System.out.println("Send failed"+ e))            .subscribe(m->{                if (m!= null){                    System.out.println("Sent successfully message "+new String(m.getOutboundMessage().getBody()));                }            });    try {        Thread.sleep(20000);    } catch (InterruptedException e) {        e.printStackTrace();    }}我希望在 Flux 发出一个项目后,Sender 必须将它发送到 RabbitMQ,并且在接收到 RabbitMQ 之后,Receiver 必须接收它。但一切都是按顺序发生的,这就是我得到的结果
查看完整描述

1 回答

?
慕盖茨4494581

TA贡献1850条经验 获得超11个赞

消息生成太快。要查看交错,请dataStream添加

.doOnNext(i->Thread.sleep(10))


查看完整回答
反对 回复 2021-08-04
  • 1 回答
  • 0 关注
  • 444 浏览

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信