
Spring Integration - publisher confirms timeout?


莫回无 2022-06-23 15:46:22
This is my current setup: queue1 and queue2 are merged into channel1 by these integration flows:

@Bean
public IntegrationFlow q1f() {
    return IntegrationFlows
            .from(queue1InboundAdapter())
            ...
            .channel(amqpInputChannel())
            .get();
}

@Bean
public IntegrationFlow q2f() {
    return IntegrationFlows
            .from(queue2InboundAdapter())
            ...
            .channel(amqpInputChannel())
            .get();
}

Then everything is aggregated, and the source messages are acked after the aggregated message has been confirmed by RabbitMQ:

@Bean
public IntegrationFlow aggregatingFlow() {
    return IntegrationFlows
            .from(amqpInputChannel())
            .aggregate(...
                    .expireGroupsUponCompletion(true)
                    .sendPartialResultOnExpiry(true)
                    .groupTimeout(TimeUnit.SECONDS.toMillis(10))
                    .releaseStrategy(new TimeoutCountSequenceSizeReleaseStrategy(200, TimeUnit.SECONDS.toMillis(10)))
            )
            .handle(amqpOutboundEndpoint())
            .get();
}

@Bean
public AmqpOutboundEndpoint amqpOutboundEndpoint() {
    AmqpOutboundEndpoint outboundEndpoint = new AmqpOutboundEndpoint(ackTemplate());
    outboundEndpoint.setConfirmAckChannel(manualAckChannel());
    outboundEndpoint.setConfirmCorrelationExpressionString("#root");
    outboundEndpoint.setExchangeName(RABBIT_PREFIX + "ix.archiveupdate");
    outboundEndpoint.setRoutingKeyExpression(routingKeyExpression()); // forward using partition id as routing key
    return outboundEndpoint;
}

ackTemplate() is built on a connection factory configured with springFactory.setPublisherConfirms(true);.

The problem I am seeing is that, about once every 10 days, some messages get stuck in the unacknowledged state in RabbitMQ. My guess is that the publish is somehow waiting for RabbitMQ to send the publisher confirm, never gets it, and times out? In that case I never ACK the message in queue1. Is that possible?

So, to walk through the workflow once more:

[two queues -> direct channel -> aggregator (which retains the channel and tag values) -> publish to RabbitMQ -> RabbitMQ returns an ACK via publisher confirms -> Spring acks all messages on the channel + the values it kept in memory for the aggregated message]
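
To make the workflow above concrete, here is a minimal stand-alone sketch (plain Java, not Spring code) of one way to notice an aggregate whose confirm never arrives. The class `PendingAggregates` and its methods are hypothetical names introduced only for illustration. It assumes the application records, for each aggregated publish, the delivery tags of the source messages, and that the caller decides what to do with overdue tags (for example nack and requeue them).

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical model: tracks aggregated publishes that still await a publisher confirm. */
class PendingAggregates {

    /** Delivery tags covered by one aggregate, plus the time it was published. */
    private static final class Pending {
        final List<Long> deliveryTags;
        final long sentAtMillis;

        Pending(List<Long> deliveryTags, long sentAtMillis) {
            this.deliveryTags = new ArrayList<>(deliveryTags);
            this.sentAtMillis = sentAtMillis;
        }
    }

    private final Map<String, Pending> pending = new ConcurrentHashMap<>();
    private final long timeoutMillis;

    PendingAggregates(long timeoutMillis) {
        this.timeoutMillis = timeoutMillis;
    }

    /** Record an aggregate right before it is published. */
    void published(String correlationId, List<Long> deliveryTags, long nowMillis) {
        pending.put(correlationId, new Pending(deliveryTags, nowMillis));
    }

    /** A confirm arrived: returns the source tags to ack, or an empty list if unknown or already expired. */
    List<Long> confirmed(String correlationId) {
        Pending entry = pending.remove(correlationId);
        return entry == null ? new ArrayList<>() : entry.deliveryTags;
    }

    /** Periodic sweep: removes overdue aggregates and returns their source tags. */
    List<Long> expire(long nowMillis) {
        List<Long> overdue = new ArrayList<>();
        Iterator<Map.Entry<String, Pending>> it = pending.entrySet().iterator();
        while (it.hasNext()) {
            Pending entry = it.next().getValue();
            if (nowMillis - entry.sentAtMillis >= timeoutMillis) {
                overdue.addAll(entry.deliveryTags);
                it.remove();
            }
        }
        return overdue;
    }
}
```

Passing the clock value in as a parameter keeps the sweep deterministic; a real application would likely call `expire(System.currentTimeMillis())` from a scheduled task.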

2 Answers

FFIVE

Contributed 1797 experience points, received more than 6 likes

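In Spring AMQP version 2.1 (Spring Integration 5.1), we added a Future<?> and the returned message to CorrelationData to assist with this kind of thing. If you are using an older version, you can subclass CorrelationData (and you would have to handle setting the future and the returned message in your code).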
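
For the older-version case, the shape of such a correlation object might look like the following stand-alone sketch. It is a plain-Java model, not the Spring AMQP class: `TrackedCorrelation` and its methods are hypothetical, and in a real project the class would extend CorrelationData, with the application's own confirm and return callbacks calling `confirmed(...)` and `returned(...)`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical stand-in for a CorrelationData subclass that carries its own future. */
class TrackedCorrelation {

    private final String id;
    private final CompletableFuture<Boolean> future = new CompletableFuture<>();
    private volatile String returnedMessage; // stays null unless the broker returned the message

    TrackedCorrelation(String id) {
        this.id = id;
    }

    String getId() {
        return id;
    }

    /** To be called from the application's confirm callback: true for ack, false for nack. */
    void confirmed(boolean ack) {
        future.complete(ack);
    }

    /** To be called from the application's return callback. */
    void returned(String body) {
        this.returnedMessage = body;
    }

    /** Waits up to the timeout for the confirm and describes the outcome. */
    String await(long timeout, TimeUnit unit) {
        try {
            boolean ack = future.get(timeout, unit);
            if (!ack) {
                return "nack for " + id;
            }
            return returnedMessage == null ? "ack for " + id : "returned: " + id;
        }
        catch (TimeoutException e) {
            return "timed out: " + id;
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted: " + id;
        }
        catch (ExecutionException e) {
            return "failed: " + id;
        }
    }
}
```

The timed `await` is the part that matters for the question: a confirm that never arrives shows up as a timeout instead of silently leaving the source messages unacknowledged.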


Combined with a scheduled task, the future can be used to detect missing acks...


@SpringBootApplication
@EnableScheduling
public class Igh2755Application {

    public static void main(String[] args) {
        SpringApplication.run(Igh2755Application.class, args);
    }

    private final BlockingQueue<CorrelationData> futures = new LinkedBlockingQueue<>();

    @Bean
    public ApplicationRunner runner(RabbitTemplate template) {
        return args -> {
            SuccessCallback<? super Confirm> successCallback = confirm -> {
                System.out.println((confirm.isAck() ? "A" : "Na") + "ck received");
            };
            FailureCallback failureCallback = throwable -> {
                System.out.println(throwable.getMessage());
            };

            // Good - ack
            CorrelationData correlationData = new CorrelationData("good");
            correlationData.getFuture().addCallback(successCallback, failureCallback);
            this.futures.put(correlationData);
            template.convertAndSend("", "foo", "data", correlationData);

            // Missing exchange nack, no return
            correlationData = new CorrelationData("missing exchange");
            correlationData.getFuture().addCallback(successCallback, failureCallback);
            this.futures.put(correlationData);
            template.convertAndSend("missing exchange", "foo", "data", correlationData);

            // Missing queue ack, with return
            correlationData = new CorrelationData("missing queue");
            correlationData.getFuture().addCallback(successCallback, failureCallback);
            this.futures.put(correlationData);
            template.convertAndSend("", "missing queue", "data", correlationData);
        };
    }

    @Scheduled(fixedDelay = 5_000)
    public void checkForMissingAcks() {
        System.out.println("Checking pending acks");
        CorrelationData correlationData = this.futures.poll();
        while (correlationData != null) {
            try {
                if (correlationData.getFuture().get(10, TimeUnit.SECONDS).isAck()) {
                    if (correlationData.getReturnedMessage() == null) {
                        System.out.println("Ack received OK for " + correlationData.getId());
                    }
                    else {
                        System.out.println("Message returned for " + correlationData.getId());
                    }
                }
                else {
                    System.out.println("Nack received for " + correlationData.getId());
                }
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                System.out.println("Interrupted");
            }
            catch (ExecutionException e) {
                System.out.println("Failed to get an ack " + e.getCause().getMessage());
            }
            catch (TimeoutException e) {
                System.out.println("Timed out waiting for ack for " + correlationData.getId());
            }
            correlationData = this.futures.poll();
        }
        System.out.println("No pending acks, exiting");
    }

}

Checking pending acks
Ack received OK for good
Nack received for missing exchange
Message returned for missing queue
No pending acks, exiting

With Spring Integration, the confirmCorrelationExpression can be used to create the CorrelationData instance.


EDIT


With Spring Integration...


@SpringBootApplication
@EnableScheduling
public class Igh2755Application {

    public static void main(String[] args) {
        SpringApplication.run(Igh2755Application.class, args);
    }

    private final BlockingQueue<CorrelationData> futures = new LinkedBlockingQueue<>();

    public interface Gate {

        void send(@Header("exchange") String exchange, @Header("rk") String rk, String payload);

    }

    @Bean
    @DependsOn("flow")
    public ApplicationRunner runner(Gate gate) {
        return args -> {
            gate.send("", "foo", "good");
            gate.send("junque", "rk", "missing exchange");
            gate.send("", "junque", "missing queue");
        };
    }

    @Bean
    public IntegrationFlow flow(RabbitTemplate template) {
        return IntegrationFlows.from(Gate.class)
                    .handle(Amqp.outboundAdapter(template)
                            .confirmCorrelationExpression("@correlationCreator.create(#root)")
                            .exchangeNameExpression("headers.exchange")
                            .routingKeyExpression("headers.rk")
                            .returnChannel(returns())
                            .confirmAckChannel(acks())
                            .confirmNackChannel(acks()))
                    .get();
    }

    @Bean
    public MessageChannel acks() {
        return new DirectChannel();
    }

    @Bean
    public MessageChannel returns() {
        return new DirectChannel();
    }

    @Bean
    public IntegrationFlow ackFlow() {
        return IntegrationFlows.from("acks")
                /*
                 * Work around a bug because the correlation data is wrapped and so the
                 * wrong future is completed.
                 */
                .handle(m -> {
                    System.out.println(m);
                    if (m instanceof ErrorMessage) { // NACK
                        NackedAmqpMessageException nme = (NackedAmqpMessageException) m.getPayload();
                        CorrelationData correlationData = (CorrelationData) nme.getCorrelationData();
                        correlationData.getFuture().set(new Confirm(false, "Message was returned"));
                    }
                    else {
                        ((CorrelationData) m.getPayload()).getFuture().set(new Confirm(true, null));
                    }
                })
                .get();
    }

    @Bean
    public IntegrationFlow retFlow() {
        return IntegrationFlows.from("returns")
                .handle(System.out::println)
                .get();
    }

    @Bean
    public CorrelationCreator correlationCreator() {
        return new CorrelationCreator(this.futures);
    }

    public static class CorrelationCreator {

        private final BlockingQueue<CorrelationData> futures;

        public CorrelationCreator(BlockingQueue<CorrelationData> futures) {
            this.futures = futures;
        }

        public CorrelationData create(Message<String> message) {
            CorrelationData data = new CorrelationData(message.getPayload());
            this.futures.add(data);
            return data;
        }

    }

    @Scheduled(fixedDelay = 5_000)
    public void checkForMissingAcks() {
        System.out.println("Checking pending acks");
        CorrelationData correlationData = this.futures.poll();
        while (correlationData != null) {
            try {
                if (correlationData.getFuture().get(10, TimeUnit.SECONDS).isAck()) {
                    if (correlationData.getReturnedMessage() == null
                            && !correlationData.getId().equals("Message was returned")) {
                        System.out.println("Ack received OK for " + correlationData.getId());
                    }
                    else {
                        System.out.println("Message returned for " + correlationData.getId());
                    }
                }
                else {
                    System.out.println("Nack received for " + correlationData.getId());
                }
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                System.out.println("Interrupted");
            }
            catch (ExecutionException e) {
                System.out.println("Failed to get an ack " + e.getCause().getMessage());
            }
            catch (TimeoutException e) {
                System.out.println("Timed out waiting for ack for " + correlationData.getId());
            }
            correlationData = this.futures.poll();
        }
        System.out.println("No pending acks, exiting");
    }

}


紫衣仙女

Contributed 1839 experience points, received more than 15 likes

You can declare the connection factory as a bean:


@Bean
public ConnectionFactory createConnectionFactory() {
    CachingConnectionFactory connectionFactory = new CachingConnectionFactory("127.0.0.1", 5672);
    connectionFactory.setUsername("guest");
    connectionFactory.setPassword("guest");
    connectionFactory.setVirtualHost("/");
    connectionFactory.setPublisherReturns(true);
    connectionFactory.setPublisherConfirmType(ConfirmType.SIMPLE);
    return connectionFactory;
}

Then declare the RabbitTemplate as:


@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
    RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
    rabbitTemplate.setMandatory(true);
    rabbitTemplate.setConfirmCallback(callback);
    return rabbitTemplate;
}

where callback is an implementation of the ConfirmCallback interface.


When sending, you can wait for the confirmation:


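System.out.println("Sending message...");
// waitForConfirms is only available inside invoke(), which runs the send
// and the wait on the same dedicated channel
rabbitTemplate.invoke(operations -> {
    operations.convertAndSend(rabbitMQProperties.getEXCHANGENAME(),
            rabbitMQProperties.getQUEUENAME(), "hello from rabbit");
    return operations.waitForConfirms(1);
});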

The waitForConfirms timeout is in milliseconds. I set it to 1 for testing purposes.

