Spring整合JMS-基于activeMQ实现(二)
1、消息监听器
在Spring整合JMS的应用中我们在定义消息监听器的时候一共能够定义三种类型的消息监听器,各自是
MessageListener、
SessionAwareMessageListener、
MessageListenerAdapter
1.1 MessageListener
MessageListener是
最原始的消息监听器(javax.jms.MessageListener),它是JMS规范中定义的一个接口。定义了一个omMessage方法。仅仅接收一个message參数
public
class
ConsumerMessageListener
implements
MessageListener{
public
void
onMessage(Message message) {
//这里我们知道生产者发送的就是一个纯文本消息。所以这里能够直接进行强制转换,或者直接把onMessage方法的參数改成Message的子类TextMessage
TextMessage textMessage = (TextMessage)message;
System.
out
.println(
"接收到一个纯文本消息"
);
try
{
System.
out
.println(
"消息内容是:"
+ textMessage.getText());
}
catch
(JMSException e) {
e.printStackTrace();
}
}
}
1.2 SessionAwareMessageListener
sessionAwareMessageListener
是Spring为我们提供的,它不是标准的JMS消息监听器。MessageListener处理接收到的消息时候假设须要返回消息给对方。此时就须要又一次获取connection和session,SessionAwareMessageListener的设计就是为了
方便我们在接收到消息后发送一个回复的消息,onMessage方法中接收两个參数,一个Message。一个发送消息的Session。(看红色新增部分)
<
beans
xmlns
=
"http://www.springframework.org/schema/beans"
xmlns:aop
=
"http://www.springframework.org/schema/aop"
xmlns:tx
=
"http://www.springframework.org/schema/tx"
xmlns:xsi
=
"http://www.w3.org/2001/XMLSchema-instance"
xmlns:context
=
"http://www.springframework.org/schema/context"
xmlns:jms
=
"http://www.springframework.org/schema/jms"
xsi:schemaLocation
=
"
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.1.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.1.xsd
http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-3.1.xsd
http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms-3.1.xsd
http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-3.1.xsd "
>
<
bean
id
=
"connectionFactory"
class
=
"org.springframework.jms.connection.CachingConnectionFactory"
>
<
property
name
=
"targetConnectionFactory"
>
<
bean
class
=
"org.apache.activemq.ActiveMQConnectionFactory"
>
<
property
name
=
"brokerURL"
>
<
value
>
tcp://localhost:61616
</
value
>
</
property
>
</
bean
>
</
property
>
<
property
name
=
"sessionCacheSize"
value
=
"1"
/>
</
bean
>
<!-- Spring jmsTemplate queue -->
<
bean
id
=
"jmsTemplate"
class
=
"org.springframework.jms.core.JmsTemplate"
>
<
property
name
=
"connectionFactory"
ref
=
"connectionFactory"
></
property
>
<
property
name
=
"defaultDestinationName"
value
=
"subject"
></
property
>
<
property
name
=
"deliveryPersistent"
value
=
"true"
></
property
>
<
property
name
=
"pubSubDomain"
value
=
"false"
></
property
>
<!-- false p2p,true topic -->
<
property
name
=
"sessionAcknowledgeMode"
value
=
"1"
></
property
>
<
property
name
=
"explicitQosEnabled"
value
=
"true"
></
property
>
<
property
name
=
"timeToLive"
value
=
"604800000"
></
property
>
</
bean
>
<!-- 配置Queue,当中value为Queue名称->start -->
<
bean
id
=
"testQueue"
class
=
"org.apache.activemq.command.ActiveMQQueue"
>
<
constructor-arg
index
=
"0"
value
=
"${pur.test.add}"
/>
</
bean
>
<bean id = "sessionAwareQueue" class = "org.apache.activemq.command.ActiveMQQueue" >
<constructor-arg index = "0" value= "queue.liupeng.sessionaware" />
</bean >
<!-- 配置Queue,当中value为Queue名称->end -->
<!-- 注入AMQ的实现类属性(JmsTemplate和Destination) -->
<
bean
id
=
"amqQueueSender"
class
=
"com.tuniu.scc.purchase.plan.manage.core.amq.AMQQueueSender"
>
<
property
name
=
"jmsTemplate"
ref
=
"jmsTemplate"
></
property
>
<
property
name
=
"testQueue"
ref
=
"testQueue"
></
property
>
<property name = "sessionAwareQueue" ref= "sessionAwareQueue"></property >
</
bean
>
<!-- 消息发送必用的发送类 -->
<
bean
id
=
"multiThreadAMQSender"
class
=
"com.tuniu.scc.purchase.plan.manage.core.amq.MultiThreadAMQSender"
init-method
=
"init"
>
<
property
name
=
"jmsTemplate"
ref
=
"jmsTemplate"
></
property
>
<
property
name
=
"multiThreadAMQExecutor"
ref
=
"multiThreadAMQExecutor"
></
property
>
</
bean
>
<!-- 消息监听器->start -->
<
bean
id
=
"consumerMessageListener"
class
=
"com.tuniu.scc.purchase.plan.manage.core.amq.ConsumerMessageListener"
/>
<!-- 消息监听容器 -->
<
bean
id
=
"jmsContainer"
class
=
"org.springframework.jms.listener.DefaultMessageListenerContainer"
>
<
property
name
=
"connectionFactory"
ref
=
"connectionFactory"
/>
<
property
name
=
"destination"
ref
=
"testQueue"
/>
<!-- 消费者队列名称,改动 -->
<
property
name
=
"messageListener"
ref
=
"consumerMessageListener"
/>
</
bean
>
<
bean
id
=
"consumerSessionAwareMessageListener"
class =
"com.tuniu.scc.purchase.plan.manage.core.amq.ConsumerSessionAwareMessageListener"
>
<property name ="testQueue" ref="testQueue"/>
<!-- 接收消息后返回给testQueue队列 -->
</
bean
>
<
bean
id
=
"sessionAwareListenerContainer"
class
=
"org.springframework.jms.listener.DefaultMessageListenerContainer"
>
<
property
name
=
"connectionFactory"
ref
=
"connectionFactory"
/>
<
property
name
=
"destination"
ref
=
"sessionAwareQueue"
/>
<
property
name
=
"messageListener"
ref
=
"consumerSessionAwareMessageListener"
/>
</
bean
>
<!-- 消息监听器->end -->
</
beans
>
发送消息:
@Resource
private
AMQQueueSender
amqQueueSender
;
private
static
final
Logger
LOG
= LoggerFactory.getLogger(AMQController.
class
);
@UvConfig
(method =
"testQueue"
, description =
"測试AMQ"
)
@RequestMapping
(value =
"/testQueue"
, method = RequestMethod.
POST
)
@TSPServiceInfo
(name =
"PUR.NM.
AMQController
.testQueue"
, description =
"測试AMQ"
)
public
void
testQueue(HttpServletRequest request, HttpServletResponse response) {
try
{
long
beginTime = System. currentTimeMillis();
LOG
.info(
"发送開始"
);
//amqQueueSender.sendMessage("test", StaticProperty.TEST_QUEUE);
amqQueueSender
.sendMessage(
"test"
, StaticProperty.
TEST_SESSIONAWARE_QUEUE
);
LOG
.info(
"发送结束,耗时:"
+(System.currentTimeMillis()-beginTime)+
"ms"
);
}
catch
(InterruptedException e) {
LOG
.error(
"測试失败"
, e);
}
}
接收消息:
public
class
ConsumerSessionAwareMessageListener
implements
SessionAwareMessageListener<TextMessage>{
private
Destination
testQueue
;
//返回消息目的队列
@Override
public
void
onMessage(TextMessage message, Session session)
throws
JMSException {
System.
out
.println(
"收到一条消息"
);
System.
out
.println(
"消息内容是:"
+message.getText());
MessageProducer producer = session.createProducer(
testQueue
);
Message txtMessage = session.createTextMessage(
"consumerSessionAwareMessageListener..."
);
producer.send(txtMessage);
}
public
Destination getTestQueue() {
return
testQueue
;
}
public
void
setTestQueue(Destination sessionAwareQueue) {
this
.
testQueue
= sessionAwareQueue;
}
}
打印结果:
收到一条消息
消息内容是:test
接收到一个纯文本消息
消息内容是:consumerSessionAwareMessageListener...
1.3 MessageListenerAdapter
MessageListenerAdapter
实现了MessageListener接口和SessionAwareMessageListener接口,它的作用主要是将接收到的消息进行类型转换。然后通过反射的形式把它交给一个普通的Java类进行处理。
MessageListenerAdapter会把接收到的消息做例如以下转换:
TextMessage转换为String对象
BytesMessage转换为byte数组
MapMessage转换为Map对象
ObjectMessage转换为相应的Serializable对象
既然前面说到MessageListenerAdapter会把接收到的消息做类型转换再通过反射交给Java类处理,假设真正目标处理器是一个MessageListener或者是一个SessionAwareMessageListener,那么Spring将直接使用接收到的Message对象作为參数调用它们的onMessage方法。而不会利用反射去调用。以下定义的时候为它指定一个目标类
<!-- 消息监听适配器 -->
<
bean
id
=
"messageListenerAdapter"
class
=
"org.springframework.jms.listener.adapter.MessageListenerAdapter"
>
<
property
name
=
"delegate"
>
<
bean
class
=
"com.tuniu.scc.purchase.plan.manage.core.amq.ConsumerListener"
/>
</
property
>
<
property
name
=
"defaultListenerMethod"
value
=
"receiveMessage"
/>
</
bean
>
<
bean
id
=
"messageListenerAdapterContainer"
class
=
"org.springframework.jms.listener.DefaultMessageListenerContainer"
>
<
property
name
=
"connectionFactory"
ref
=
"connectionFactory"
/>
<
property
name
=
"destination"
ref
=
"adapterQueue"
/>
<
property
name
=
"messageListener"
ref
=
"messageListenerAdapter"
/>
</
bean
>
上面说到,目标处理器是一个普通Java类的时候,Spring将进行类型转换之后的对象通过反射去调用真正的方法,那么Spring是怎样知道该调用哪个方法的呢?这是通过MessageListenerAdapter的defaultListenerMethod属性来决定的。
当没有指定该属性的时候。会默认调用目标处理器的handleMessage方法
编写新队列、发送消息同上
接收消息:
public
class
ConsumerListener{
public
void
handleMessage(String message) {
System.
out
.println(
"ConsumerListener通过handleMessage接收到一个纯文本消息。消息内容是:"
+ message);
}
public
void
receiveMessage(String message) {
System.
out
.println(
"ConsumerListener通过receiveMessage接收到一个纯文本消息。消息内容是:"
+ message);
}
}
MessageListenerAdapter的另外一个主要功能就是能够自己主动的发送回复的消息
方法一:、public void sendMessage(Destination destination,
final String message) { System.out.println(
"---------------生产者发送消息-----------------"); System.out.println(
"---------------生产者发了一个消息:" + message); jmsTemplate.send(destination,
new MessageCreator() {
public Message createMessage(Session session)
throws JMSException { TextMessage textMessage = session.createTextMessage(message);
textMessage.setJMSReplyTo(responseDestination); //(省略编写其相应的监听器代码)
return textMessage; } }); }
方法二:
<!-- 消息监听适配器 -->
<
bean
id
=
"messageListenerAdapter"
class
=
"org.springframework.jms.listener.adapter.MessageListenerAdapter"
>
<
property
name
=
"delegate"
>
<
bean
class
=
"com.tuniu.scc.purchase.plan.manage.core.amq.ConsumerListener"
/>
</
property
>
<
property
name
=
"defaultListenerMethod"
value
=
"receiveMessage"
/>
</
bean
>
<
bean
id
=
"messageListenerAdapterContainer"
class
=
"org.springframework.jms.listener.DefaultMessageListenerContainer"
>
<
property
name
=
"connectionFactory"
ref
=
"connectionFactory"
/>
<
property
name
=
"destination"
ref
=
"adapterQueue"
/>
<
property
name
=
"messageListener"
ref
=
"messageListenerAdapter"
/>
<property name="defaultResponseDestination" ref="defaultResponseQueue"/>
</
bean
>
转载于:https://www.cnblogs.com/bhlsheji/p/5103408.html
相关资源:spring整合JMS实现同步收发消息(基于ActiveMQ的实现)