超30min订单自动取消,RabbitMQ做延迟队列,下单成功把订单号推入RabbitMQ,超30min订单自动进入DLX死信队列,消费端监听死信队列得到超时订单,订单状态置为超时,这个已完成,目前卡在30min内支付(不超时)时如何取消相应的订单?即如何在RabbitMQ中删除指定的消息?消息确认后会清除,怎样确认指定的消息?不知道我的思路有问题还是怎样,目前卡在这了,请各位大神指点一下相关代码下单@Override@Transactional(isolation=Isolation.READ_COMMITTED,rollbackFor=Exception.class)publicResultorder(LongproductId,LonguserId){//redis中商品名StringproductNameKey="PRODUCT_NAME_"+productId;//redis中商品库存StringproductNumberKey="PRODUCT_"+productId;//redis中已抢购成功的用户IDStringuserIdKey="USER_"+productId;StringproductName=redisUtil.get(productNameKey,0);//缓存中有商品if(NumberUtils.toInt(redisUtil.get(productNumberKey,0))>0){//当前用户是否已抢购成功if(!redisUtil.sismember(userIdKey,userId+"")){//减缓存redisUtil.decr(productNumberKey);//减库存booleanflag=productMapper.reduceRepertory(productId)==1;if(flag){//生成订单Orderorder=newOrder();//订单号StringorderNo="ORDER_"+DATE_TIME_FORMATTER.format(LocalDateTime.now())+"_"+productId+"_"+userId;order.setOrderNo(orderNo);order.setOrderPrice(newBigDecimal(100));order.setOrderStatus(1);order.setUserId(userId);save(order);//建立订单商品关系ProductOrderproductOrder=newProductOrder();productOrder.setOrderId(order.getId());productOrder.setProductId(productId);productOrderMapper.insert(productOrder);//缓存userIdredisUtil.sadd(userIdKey,userId+"");//订单号orderNo加入MQ,30分钟未支付加入死信队列messageProducer.sendMessage(orderNo,30*60*1000);returnResult.success("下单成功,订单号"+order.getId());}else{Productproduct=productMapper.selectOne(newQueryWrapper().eq("id",productId));//减库存失败,重置缓存redisUtil.set(productNumberKey,JacksonUtil.toJson(product.getProductNumber()),0);log.error("用户{}下单失败,商品名:{}",userId,productName);returnResult.error("202","用户"+userId+"下单失败,商品名"+productName);}}else{//该用户已抢到商品log.error("您已抢到商品{},订单已生成,请去收银台支付",productName);returnResult.error("203","您已抢到商品"+productName+",订单已生成,请去收银台支付");}}else{//商品售罄log.error("商品{}已被抢光",productName);returnResult.error("201","商品"+productName+"已被抢光");}}支付,这里如何删除RabbitMQ中指定的消息(下单时把订单都投入RabbitMQ里了)@OverridepublicResultpay(StringorderNo,LonguserId){Orderorder=orderMapper.selectOne(newQueryWrapper().eq("order_no",orderNo).eq("user_id",userId));order.setOrderStatus(2);//已支付orderMapper.updateById(order);//MQ中删除支付成功的订单TODOreturnResult.success("订单"+orderNo+"支付成功");}发消息@Component@Slf4jpublicclassMessageProducer{@ResourceprivateRabbitTemplaterabbitTemplate;/***发送消息**@parammessage消息*@paramttl有效时间*/publicvoidsendMessage(Stringmessage,intttl){CorrelationDatacorrelationId=newCorrelationData(UUID.randomUUID().toString());rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_TTL,RabbitConfig.ROUTINGKEY_TTL,message,message1->{MessagePropertiesmessageProperties=message1.getMessageProperties();messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);//消息持久化messageProperties.setExpiration(ttl+"");//消息TTL,单位毫秒returnmessage1;},correlationId);log.info("发送消息:{},有效时间:{}秒,回调ID:{},当前时间:{}",message,ttl/1000,correlationId,LocalDateTime.now().toString());}}rabbit配置,声明TTL交换器及对应队列,声明AE备份交换器及对应队列绑定到TTL交换器上,保证消息路由失败不丢失,声明DLX死信交换器及队列绑定到TTL交换器上,实现延迟效果@Configuration@Slf4jpublicclassRabbitConfig{@Value("${spring.rabbitmq.addresses}")privateStringaddresses;@Value("${spring.rabbitmq.username}")privateStringusername;@Value("${spring.rabbitmq.password}")privateStringpassword;@Value("${spring.rabbitmq.virtual-host}")privateStringvirtualHost;@Value("${spring.rabbitmq.publisher-confirms}")privateBooleanpublisherConfirms;@Value("${spring.rabbitmq.publisher-returns}")privateBooleanpublisherReturns;/***备份交换器*/publicstaticfinalStringEXCHANGE_AE="EXCHANGE_AE";/***TTL交换器*/publicstaticfinalStringEXCHANGE_TTL="EXCHANGE_TTL";/***死信交换器*/publicstaticfinalStringEXCHANGE_DLX="EXCHANGE_DLX";/***备份队列*/publicstaticfinalStringQUEUE_AE="QUEUE_AE";/***TTL队列*/publicstaticfinalStringQUEUE_TTL="QUEUE_TTL";/***死信队列*/publicstaticfinalStringQUEUE_DLX="QUEUE_DLX";/***TTLRouting_Key*特殊字符“*”与“#”,用于做模糊匹配,其中“*”用于匹配一个单词,“#”用于匹配多个单词(可以是零个)*/publicstaticfinalStringROUTINGKEY_TTL="routing.ttl.#";/***死信Routing_Key*/publicstaticfinalStringROUTINGKEY_DLX="routing.dlx";@ResourceprivatePublishConfirmpublishConfirm;@ResourceprivatePublishReturnCallBackpublishReturnCallBack;@BeanpublicConnectionFactoryconnectionFactory(){CachingConnectionFactoryconnectionFactory=newCachingConnectionFactory();connectionFactory.setAddresses(addresses);connectionFactory.setVirtualHost(virtualHost);connectionFactory.setUsername(username);connectionFactory.setPassword(password);//消息发送到RabbitMQ交换器后接收ack回调connectionFactory.setPublisherConfirms(publisherConfirms);//消息发送到RabbitMQ交换器,但无相应队列与交换器绑定时的回调connectionFactory.setPublisherReturns(publisherReturns);returnconnectionFactory;}@Bean//@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)//必须是prototype类型publicRabbitTemplaterabbitTemplate(){RabbitTemplaterabbitTemplate=newRabbitTemplate(connectionFactory());rabbitTemplate.setConfirmCallback(publishConfirm);rabbitTemplate.setReturnCallback(publishReturnCallBack);returnrabbitTemplate;}/***备份队列常用FanoutExchange*/@BeanpublicFanoutExchangealternateExchange(){returnnewFanoutExchange(EXCHANGE_AE);}/***TTL队列,指定AE*/@BeanpublicTopicExchangettlExchange(){Mapargs=newHashMap();args.put("alternate-exchange",EXCHANGE_AE);returnnewTopicExchange(EXCHANGE_TTL,true,false,args);}/***死信交换器*/@BeanpublicDirectExchangedeathLetterExchange(){returnnewDirectExchange(EXCHANGE_DLX);}/***备份队列*/@BeanpublicQueuequeueAE(){//returnnewQueue(QUEUE_AE,true);//队列持久returnQueueBuilder.durable(QUEUE_AE).build();}/***TTL队列,指定DLX*/@BeanpublicQueuequeueTTL(){Mapargs=newHashMap();//x-dead-letter-exchange声明了队列里的死信转发到的DLX名称,args.put("x-dead-letter-exchange",EXCHANGE_DLX);//x-dead-letter-routing-key声明了这些死信在转发时携带的routing-key名称。args.put("x-dead-letter-routing-key",ROUTINGKEY_DLX);//returnnewQueue(QUEUE_TTL,true,false,false,args);returnQueueBuilder.durable(QUEUE_TTL).withArguments(args).build();}/***死信队列*/@BeanpublicQueuequeueDLX(){//returnnewQueue(QUEUE_DLX,true);//队列持久returnQueueBuilder.durable(QUEUE_DLX).build();}/***AE绑定队列*/@BeanpublicBindingbindingAE(){returnBindingBuilder.bind(queueAE()).to(alternateExchange());}/***TTL绑定队列*/@BeanpublicBindingbindingTTL(){returnBindingBuilder.bind(queueTTL()).to(ttlExchange()).with(ROUTINGKEY_TTL);}/***DLX绑定队列*/@BeanpublicBindingbindingDLX(){returnBindingBuilder.bind(queueDLX()).to(deathLetterExchange()).with(ROUTINGKEY_DLX);}监听死信队列,获取超时订单(未实现,目前只是demo)@RabbitHandler//声明接收方法@RabbitListener(queues={RabbitConfig.QUEUE_DLX})publicvoidprocessDLX(Messagemessage,Channelchannel){Stringpayload=newString(message.getBody());log.info("接收处理DLX队列当中的消息:{},当前时间:{}",payload,LocalDateTime.now().toString());try{//TODO通知MQ消息已被成功消费,可以ACK了channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);}catch(IOExceptione){//TODO如果报错了,那么我们可以进行容错处理,比如转移当前消息进入其它队列}}另外,百度上找到的一个项目RabbitTemplate有这样的配置(RabbitTemplate必须是prototype类型),其他人项目并没有这个配置,想知道到底怎样才算对,小白一个,望大神多多指点@Bean//@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)//必须是prototype类型publicRabbitTemplaterabbitTemplate(){RabbitTemplaterabbitTemplate=newRabbitTemplate(connectionFactory());rabbitTemplate.setConfirmCallback(publishConfirm);rabbitTemplate.setReturnCallback(publishReturnCallBack);returnrabbitTemplate;}
添加回答
举报
0/150
提交
取消