为了账号安全,请及时绑定邮箱和手机立即绑定

JMS 回调返回重复的 JMSMessageID

JMS 回调返回重复的 JMSMessageID

四季花海 2023-06-04 19:47:12
我目前正在尝试使用纯 java 将一堆简单消息发送到队列。public AtomicReference<Message> doSend(String message, String queue){    try (JMSContext context = connectionFactory.createContext()) {        TextMessage textMessage = context.createTextMessage(message);                    final AtomicReference<Message> msg = new AtomicReference<>();        msg.set(textMessage);        log.info("Sending message to queue {}", queue);        context.createProducer().send(createDestination(context, queue), textMessage);        log.info("Message sent to queue {}, messageId provided {}", queue, msg.get().getJMSMessageID());        return msg;    }    catch (Exception e) {        log.error("Failed to send message to queue",e);        throw new SipJmsException("Failed to send message to queue", e);    }}private Destination createDestination(JMSContext context, String queue){    log.debug("Creating destination queue {} connection",queue);    return context.createQueue(queue);}我连续发送 N 条消息,日志显示 JMSMessageId 始终生成相同。[main] Sending message to queue TEST_QUEUE[main] Message sent to queue TEST_QUEUE, messageId provided ID:414d5120444556494d53514d20202020551c3f5d81619824[main] Sending message to queue TEST_QUEUE[main] Message sent to queue TEST_QUEUE, messageId provided ID:414d5120444556494d53514d20202020551c3f5d83619824ETC。据我所知,JMSMessageId 应该是唯一的,它的冲突会导致问题。O'Reily 的书说:JMSMessageID 是一个唯一标识消息的字符串值。标识符的唯一性取决于供应商。JMSMessageID 对于需要对消息进行唯一索引的 JMS 消费者应用程序中的历史存储库很有用。与 JMSCorrelationID 结合使用,JMSMessageID 也可用于关联消息:String messageid = message.getJMSMessageID();那么,为什么 MessageId 不是唯一的呢?(应用程序运行之间甚至相同)。
查看完整描述

3 回答

?
慕斯王

TA贡献1864条经验 获得超2个赞

消息 ID 是唯一的,我用以下标记了不同的编号*:


414d5120444556494d53514d20202020551c3f5d81619824

                                         *

414d5120444556494d53514d20202020551c3f5d83619824


查看完整回答
反对 回复 2023-06-04
?
撒科打诨

TA贡献1934条经验 获得超2个赞

我创建了一个简单的 JMS 程序,它将 5 条消息放入一个队列,每次放入后,它都会输出 JMSMessageId。


示例输出:


2019/08/13 19:15:18.824 MQTestJMS11x5: testConn: successfully connected.

2019/08/13 19:15:18.845 MQTestJMS11x5: testConn: successfully opened TEST.Q1

2019/08/13 19:15:18.845 MQTestJMS11x5: sendMsg: Sending request to queue:///TEST.Q1

2019/08/13 19:15:18.845 MQTestJMS11x5: sendMsg: 

2019/08/13 19:15:18.887 MQTestJMS11x5: sendMsg: Sent message: MessageId=ID:414d51204d515754312020202020202028cd525d24201102

2019/08/13 19:15:18.887 MQTestJMS11x5: sendMsg: Sent message: MessageId=ID:414d51204d515754312020202020202028cd525d24201103

2019/08/13 19:15:18.888 MQTestJMS11x5: sendMsg: Sent message: MessageId=ID:414d51204d515754312020202020202028cd525d24201104

2019/08/13 19:15:18.889 MQTestJMS11x5: sendMsg: Sent message: MessageId=ID:414d51204d515754312020202020202028cd525d24201105

2019/08/13 19:15:18.889 MQTestJMS11x5: sendMsg: Sent message: MessageId=ID:414d51204d515754312020202020202028cd525d24201106

2019/08/13 19:15:18.892 MQTestJMS11x5: testConn: Closed session

2019/08/13 19:15:18.892 MQTestJMS11x5: testConn: Stopped connection

2019/08/13 19:15:18.893 MQTestJMS11x5: testConn: Closed connection

请注意,每个消息 ID 都是唯一的。


这是生成输出的 JMS 程序:


import java.text.SimpleDateFormat;

import java.util.Date;

import java.util.Hashtable;

import javax.jms.*;


import com.ibm.mq.jms.*;

import com.ibm.msg.client.wmq.WMQConstants;


/**

 * Program Name

 *  MQTestJMS11x5

 *

 * Description

 *  This java JMS class will connect to a remote queue manager and put 5 messages to a queue.

 *

 * Sample Command Line Parameters

 *  -m MQA1 -h 127.0.0.1 -p 1414 -c TEST.CHL -q TEST.Q1 -u UserID -x Password

 *

 * @author Roger Lacroix

 */

public class MQTestJMS11x5

{

   private static final SimpleDateFormat  LOGGER_TIMESTAMP = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss.SSS");


   private Hashtable<String,String> params;

   private MQQueueConnectionFactory mqQCF = null;



   /**

    * The constructor

    */

   public MQTestJMS11x5()

   {

      super();

      params = new Hashtable<String,String>();

   }


   /**

    * Make sure the required parameters are present.

    * @return true/false

    */

   private boolean allParamsPresent()

   {

      boolean b = params.containsKey("-h") && params.containsKey("-p") &&

                  params.containsKey("-c") && params.containsKey("-m") &&

                  params.containsKey("-q") &&

                  params.containsKey("-u") && params.containsKey("-x");

      if (b)

      {

         try

         {

            Integer.parseInt((String) params.get("-p"));

         }

         catch (NumberFormatException e)

         {

            b = false;

         }

      }


      return b;

   }


   /**

    * Extract the command-line parameters and initialize the MQ variables.

    * @param args

    * @throws IllegalArgumentException

    */

   private void init(String[] args) throws IllegalArgumentException

   {

      if (args.length > 0 && (args.length % 2) == 0)

      {

         for (int i = 0; i < args.length; i += 2)

         {

            params.put(args[i], args[i + 1]);

         }

      }

      else

      {

         throw new IllegalArgumentException();

      }


      if (allParamsPresent())

      {

         try

         {

            mqQCF = new MQQueueConnectionFactory();

            mqQCF.setQueueManager((String) params.get("-m"));

            mqQCF.setHostName((String) params.get("-h"));

            mqQCF.setChannel((String) params.get("-c"));

            mqQCF.setTransportType(WMQConstants.WMQ_CM_CLIENT);

            try

            {

               mqQCF.setPort(Integer.parseInt((String) params.get("-p")));

            }

            catch (NumberFormatException e)

            {

               mqQCF.setPort(1414);

            }

         }

         catch (JMSException e)

         {

            MQTestJMS11x5.logger("getLinkedException()=" + e.getLinkedException());

            MQTestJMS11x5.logger(e.getLocalizedMessage());

            e.printStackTrace();

            throw new IllegalArgumentException();

         }

         catch (Exception e)

         {

            MQTestJMS11x5.logger(e.getLocalizedMessage());

            e.printStackTrace();

            throw new IllegalArgumentException();

         }

      }

      else

      {

         throw new IllegalArgumentException();

      }

   }


   /**

    * Test the connection to the queue manager.

    * @throws MQException

    */

   private void testConn()

   {

      QueueConnection conn = null;

      QueueSession session = null;

      Queue myQ = null;


      try

      {

         conn = mqQCF.createQueueConnection((String) params.get("-u"), (String) params.get("-x"));

         conn.start();


         session = conn.createQueueSession(false, javax.jms.Session.AUTO_ACKNOWLEDGE);

         MQTestJMS11x5.logger("successfully connected.");


         myQ = session.createQueue((String) params.get("-q"));

         MQTestJMS11x5.logger("successfully opened "+ (String) params.get("-q"));


         MQDestination mqd = (MQDestination) myQ;

         mqd.setTargetClient(WMQConstants.WMQ_CLIENT_JMS_COMPLIANT);


         sendMsg( session, myQ);

      }

      catch (JMSException e)

      {

         MQTestJMS11x5.logger("getLinkedException()=" + e.getLinkedException());

         MQTestJMS11x5.logger(e.getLocalizedMessage());

         e.printStackTrace();

      }

      catch (Exception e)

      {

         MQTestJMS11x5.logger(e.getLocalizedMessage());

         e.printStackTrace();

      }

      finally

      {

         try

         {

            if (session != null)

            {

               session.close();

               MQTestJMS11x5.logger("Closed session");

            }

         }

         catch (Exception ex)

         {

            MQTestJMS11x5.logger("session.close() : " + ex.getLocalizedMessage());

         }


         try

         {

            if (conn != null)

            {

               conn.stop();

               MQTestJMS11x5.logger("Stopped connection");

            }

         }

         catch (Exception ex)

         {

            MQTestJMS11x5.logger("connection.stop() : " + ex.getLocalizedMessage());

         }


         try

         {

            if (conn != null)

            {

               conn.close();

               MQTestJMS11x5.logger("Closed connection");

            }

         }

         catch (Exception ex)

         {

            MQTestJMS11x5.logger("connection.close() : " + ex.getLocalizedMessage());

         }

      }

   }


   /**

    * Send a message to a queue.

    * @throws MQException

    */

   private void sendMsg(QueueSession session, Queue myQ) throws JMSException

   {

      QueueSender sender = null;

      TextMessage msg = null;


      try

      {

         MQTestJMS11x5.logger("Sending request to " + myQ.getQueueName());

         MQTestJMS11x5.logger("");


         sender = session.createSender(myQ);


         for (int i=0; i < 5; i++)

         {

            msg = session.createTextMessage();

            msg.setText("This is test message # " + (i+1));


            sender.send(msg);


            MQTestJMS11x5.logger("Sent message: MessageId="+msg.getJMSMessageID());

         }

      }

      finally

      {

         try

         {

            if (sender != null)

               sender.close();

         }

         catch (Exception ex)

         {

            MQTestJMS11x5.logger("sender.close() : " + ex.getLocalizedMessage());

         }

      }

   }


   /**

    * A simple logger method

    * @param data

    */

   public static void logger(String data)

   {

      String className = Thread.currentThread().getStackTrace()[2].getClassName();


      // Remove the package info.

      if ( (className != null) && (className.lastIndexOf('.') != -1) )

         className = className.substring(className.lastIndexOf('.')+1);


      System.out.println(LOGGER_TIMESTAMP.format(new Date())+" "+className+": "+Thread.currentThread().getStackTrace()[2].getMethodName()+": "+data);

   }


   /**

    * mainline

    * @param args

    */

   public static void main(String[] args)

   {

      MQTestJMS11x5 write = new MQTestJMS11x5();


      try

      {

         write.init(args);

         write.testConn();

      }

      catch (IllegalArgumentException e)

      {

         MQTestJMS11x5.logger("Usage: java MQTestJMS11x5 -m QueueManagerName -h host -p port -c channel -q JMS_Queue_Name -u UserID -x Password");

         System.exit(1);

      }

      catch (Exception e)

      {

         MQTestJMS11x5.logger(e.getLocalizedMessage());

         System.exit(1);

      }


      System.exit(0);

   }

}



查看完整回答
反对 回复 2023-06-04
?
狐的传说

TA贡献1804条经验 获得超3个赞

final AtomicReference<Message> msg = new AtomicReference<>();

你为什么使用“最终”。将其删除并重试。


查看完整回答
反对 回复 2023-06-04
  • 3 回答
  • 0 关注
  • 172 浏览

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信