Spring整合JMS-基于activeMQ实现(二)

it2025-06-21  14

Spring整合JMS-基于activeMQ实现(二) 1、消息监听器      在Spring整合JMS的应用中我们在定义消息监听器的时候一共能够定义三种类型的消息监听器,各自是 MessageListenerSessionAwareMessageListenerMessageListenerAdapter 1.1 MessageListener      MessageListener是 最原始的消息监听器(javax.jms.MessageListener),它是JMS规范中定义的一个接口。定义了一个omMessage方法。仅仅接收一个message參数       public  class  ConsumerMessageListener  implements  MessageListener{      public  void  onMessage(Message message) {          //这里我们知道生产者发送的就是一个纯文本消息。所以这里能够直接进行强制转换,或者直接把onMessage方法的參数改成Message的子类TextMessage         TextMessage textMessage = (TextMessage)message;         System.  out .println(  "接收到一个纯文本消息"  );          try  {             System.  out .println(  "消息内容是:"  + textMessage.getText());         }  catch  (JMSException e) {             e.printStackTrace();         }     } } 1.2 SessionAwareMessageListener      sessionAwareMessageListener 是Spring为我们提供的,它不是标准的JMS消息监听器。MessageListener处理接收到的消息时候假设须要返回消息给对方。此时就须要又一次获取connection和session,SessionAwareMessageListener的设计就是为了 方便我们在接收到消息后发送一个回复的消息,onMessage方法中接收两个參数,一个Message。一个发送消息的Session。(看红色新增部分)       < beans  xmlns =  "http://www.springframework.org/schema/beans"  xmlns:aop =  "http://www.springframework.org/schema/aop"         xmlns:tx =  "http://www.springframework.org/schema/tx"  xmlns:xsi =  "http://www.w3.org/2001/XMLSchema-instance"         xmlns:context =  "http://www.springframework.org/schema/context"  xmlns:jms =  "http://www.springframework.org/schema/jms"         xsi:schemaLocation =  "     http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.1.xsd     http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.1.xsd     http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-3.1.xsd     http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms-3.1.xsd     http://www.springframework.org/schema/tx  http://www.springframework.org/schema/tx/spring-tx-3.1.xsd " >        < bean  id  = "connectionFactory"  class =  "org.springframework.jms.connection.CachingConnectionFactory"  >               < property  name  = "targetConnectionFactory" >                     < bean  class =  "org.apache.activemq.ActiveMQConnectionFactory"  >                           < property  name  = "brokerURL" >                                 < value  > tcp://localhost:61616  </ value  >                           </ property  >                     </ bean  >               </ property  >               < property  name  = "sessionCacheSize"  value =  "1"  />        </ bean  >        <!-- Spring jmsTemplate queue -->        < bean  id  = "jmsTemplate"  class =  "org.springframework.jms.core.JmsTemplate"  >               < property  name  = "connectionFactory"  ref =  "connectionFactory" ></ property  >               < property  name  = "defaultDestinationName"  value =  "subject" ></ property  >               < property  name  = "deliveryPersistent"  value =  "true" ></ property  >               < property  name  = "pubSubDomain"  value = "false" ></  property >  <!-- false p2p,true topic -->               < property  name  = "sessionAcknowledgeMode"  value =  "1" ></ property  >               < property  name  = "explicitQosEnabled"  value =  "true" ></ property  >               < property  name  = "timeToLive"  value = "604800000" ></  property >        </ bean  >        <!-- 配置Queue,当中value为Queue名称->start -->        < bean  id  =  "testQueue"  class  =  "org.apache.activemq.command.ActiveMQQueue"  >               < constructor-arg  index  =  "0"  value  = "${pur.test.add}"  />        </ bean  >              <bean id = "sessionAwareQueue" class = "org.apache.activemq.command.ActiveMQQueue" >               <constructor-arg index = "0" value= "queue.liupeng.sessionaware" />       </bean >   <!-- 配置Queue,当中value为Queue名称->end -->                     <!-- 注入AMQ的实现类属性(JmsTemplate和Destination) -->         < bean  id  =  "amqQueueSender"  class  =  "com.tuniu.scc.purchase.plan.manage.core.amq.AMQQueueSender"  >              < property  name  =  "jmsTemplate"  ref = "jmsTemplate"  ></ property  >              < property  name  =  "testQueue"  ref = "testQueue"  ></ property  >              <property name = "sessionAwareQueue" ref= "sessionAwareQueue"></property >         </ bean  >                <!-- 消息发送必用的发送类 -->         < bean  id  =  "multiThreadAMQSender"  class  = "com.tuniu.scc.purchase.plan.manage.core.amq.MultiThreadAMQSender"               init-method =  "init" >               < property  name  =  "jmsTemplate"  ref = "jmsTemplate"  ></ property  >               < property  name  =  "multiThreadAMQExecutor"  ref =  "multiThreadAMQExecutor"  ></ property  >        </ bean  >             <!-- 消息监听器->start -->               < bean  id  =  "consumerMessageListener"  class =  "com.tuniu.scc.purchase.plan.manage.core.amq.ConsumerMessageListener"  />            <!-- 消息监听容器 -->            < bean  id  =  "jmsContainer"  class =  "org.springframework.jms.listener.DefaultMessageListenerContainer"  >                < property  name  =  "connectionFactory"  ref =  "connectionFactory"  />                   < property  name  =  "destination"  ref =  "testQueue"  />  <!-- 消费者队列名称,改动 -->                < property  name  =  "messageListener"  ref =  "consumerMessageListener"  />            </ bean  >                     < bean  id  =  "consumerSessionAwareMessageListener"  class = "com.tuniu.scc.purchase.plan.manage.core.amq.ConsumerSessionAwareMessageListener"  >              <property name ="testQueue" ref="testQueue"/>  <!-- 接收消息后返回给testQueue队列 -->             </ bean  >              <  bean  id =  "sessionAwareListenerContainer"  class =  "org.springframework.jms.listener.DefaultMessageListenerContainer"  >                  < property  name  = "connectionFactory"  ref =  "connectionFactory"  />                  < property  name  = "destination"  ref = "sessionAwareQueue"  />                  < property  name  = "messageListener"  ref =  "consumerSessionAwareMessageListener"  />                </ bean  >     <!-- 消息监听器->end -->       </ beans >       发送消息:                    @Resource      private  AMQQueueSender  amqQueueSender ;      private  static  final  Logger  LOG  = LoggerFactory.getLogger(AMQController.  class );      @UvConfig (method =  "testQueue" , description =  "測试AMQ" )      @RequestMapping (value =  "/testQueue" , method = RequestMethod.  POST )      @TSPServiceInfo (name =  "PUR.NM. AMQController .testQueue"  , description =  "測试AMQ" )      public  void  testQueue(HttpServletRequest request, HttpServletResponse response) {          try  {              long  beginTime = System. currentTimeMillis();              LOG .info(  "发送開始" );              //amqQueueSender.sendMessage("test", StaticProperty.TEST_QUEUE);              amqQueueSender .sendMessage(  "test" , StaticProperty. TEST_SESSIONAWARE_QUEUE  );              LOG .info(  "发送结束,耗时:"  +(System.currentTimeMillis()-beginTime)+  "ms" );         }  catch  (InterruptedException e) {              LOG .error(  "測试失败" , e);         }     }       接收消息:            public  class  ConsumerSessionAwareMessageListener  implements  SessionAwareMessageListener<TextMessage>{      private  Destination  testQueue ;  //返回消息目的队列      @Override      public  void  onMessage(TextMessage message, Session session)  throws  JMSException {         System.  out .println(  "收到一条消息"  );         System.  out .println(  "消息内容是:"  +message.getText());                 MessageProducer producer = session.createProducer(  testQueue );         Message txtMessage = session.createTextMessage( "consumerSessionAwareMessageListener..."  );         producer.send(txtMessage);     }      public  Destination getTestQueue() {          return  testQueue ;     }      public  void  setTestQueue(Destination sessionAwareQueue) {          this . testQueue  = sessionAwareQueue;     } }       打印结果: 收到一条消息 消息内容是:test 接收到一个纯文本消息 消息内容是:consumerSessionAwareMessageListener... 1.3 MessageListenerAdapter      MessageListenerAdapter 实现了MessageListener接口和SessionAwareMessageListener接口,它的作用主要是将接收到的消息进行类型转换。然后通过反射的形式把它交给一个普通的Java类进行处理。      MessageListenerAdapter会把接收到的消息做例如以下转换:           TextMessage转换为String对象           BytesMessage转换为byte数组           MapMessage转换为Map对象           ObjectMessage转换为相应的Serializable对象      既然前面说到MessageListenerAdapter会把接收到的消息做类型转换再通过反射交给Java类处理,假设真正目标处理器是一个MessageListener或者是一个SessionAwareMessageListener,那么Spring将直接使用接收到的Message对象作为參数调用它们的onMessage方法。而不会利用反射去调用。以下定义的时候为它指定一个目标类                <!-- 消息监听适配器 -->           < bean  id  = "messageListenerAdapter"  class =  "org.springframework.jms.listener.adapter.MessageListenerAdapter"  >                   < property  name  = "delegate" >                      < bean  class =  "com.tuniu.scc.purchase.plan.manage.core.amq.ConsumerListener"  />                  </ property  >                  < property  name  = "defaultListenerMethod"  value =  "receiveMessage" />            </ bean  >            < bean  id  = "messageListenerAdapterContainer"  class =  "org.springframework.jms.listener.DefaultMessageListenerContainer"  >                < property  name  = "connectionFactory"  ref =  "connectionFactory"  />                < property  name  = "destination"  ref = "adapterQueue"  />                < property  name  = "messageListener"  ref =  "messageListenerAdapter"  />                </ bean  >         上面说到,目标处理器是一个普通Java类的时候,Spring将进行类型转换之后的对象通过反射去调用真正的方法,那么Spring是怎样知道该调用哪个方法的呢?这是通过MessageListenerAdapter的defaultListenerMethod属性来决定的。 当没有指定该属性的时候。会默认调用目标处理器的handleMessage方法       编写新队列、发送消息同上       接收消息:            public  class  ConsumerListener{      public  void  handleMessage(String message) {           System.  out .println(  "ConsumerListener通过handleMessage接收到一个纯文本消息。消息内容是:"  + message);       }               public  void  receiveMessage(String message) {           System. out .println( "ConsumerListener通过receiveMessage接收到一个纯文本消息。消息内容是:"  + message);       }   }       MessageListenerAdapter的另外一个主要功能就是能够自己主动的发送回复的消息 方法一:、public void sendMessage(Destination destination, final String message) {           System.out.println("---------------生产者发送消息-----------------");           System.out.println("---------------生产者发了一个消息:" + message);           jmsTemplate.send(destination, new MessageCreator() {               public Message createMessage(Session session) throws JMSException {                   TextMessage textMessage = session.createTextMessage(message);                   textMessage.setJMSReplyTo(responseDestination);  //(省略编写其相应的监听器代码)                 return textMessage;               }           });       }         方法二:            <!-- 消息监听适配器 -->           < bean  id  = "messageListenerAdapter"  class =  "org.springframework.jms.listener.adapter.MessageListenerAdapter"  >                   < property  name  = "delegate" >                      < bean  class =  "com.tuniu.scc.purchase.plan.manage.core.amq.ConsumerListener"  />                  </ property  >                  < property  name  = "defaultListenerMethod"  value =  "receiveMessage" />            </ bean  >            < bean  id  = "messageListenerAdapterContainer"  class =  "org.springframework.jms.listener.DefaultMessageListenerContainer"  >                < property  name  = "connectionFactory"  ref =  "connectionFactory"  />                < property  name  = "destination"  ref = "adapterQueue"  />                < property  name  = "messageListener"  ref =  "messageListenerAdapter"  />                          <property name="defaultResponseDestination" ref="defaultResponseQueue"/>                </ bean  >

转载于:https://www.cnblogs.com/bhlsheji/p/5103408.html

相关资源:spring整合JMS实现同步收发消息(基于ActiveMQ的实现)
最新回复(0)