Running ActiveMQ using Spring
Apache ActiveMQ is an open source messaging framework. The ActiveMQ web site is not really clear on how to integrate it with the Spring framework. Therefore, I decided to write this post to explain how to use ActiveMQ in combination with Spring and clarify some points.
The good news is that you can run JMS inside a servlet container (e.g. Apache Tomcat) without the need for a JCA adapter. This means you do not need Jencks or something similar.
This is a repost of a blog item that was originally posted in the Func knowledge base by Diego Castorina on January 29, 2010.
To get started, you can embed the configuration of a complete running environment in a Spring configuration file. Here is the example we’ll use for the remainder of this blog post:
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:amq="http://activemq.apache.org/schema/core"
       xmlns:jms="http://www.springframework.org/schema/jms"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
                           http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd
                           http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms-2.5.xsd">

  <!-- Embedded broker with file-based persistence and a TCP connector -->
  <amq:broker brokerName="test-broker" start="true">
    <amq:persistenceAdapter>
      <amq:amqPersistenceAdapter directory="/opt/activemq" maxFileLength="32mb"/>
    </amq:persistenceAdapter>
    <amq:transportConnectors>
      <amq:transportConnector uri="tcp://localhost:7171"/>
    </amq:transportConnectors>
  </amq:broker>

  <!-- In-JVM connection to the embedded broker -->
  <amq:connectionFactory id="amqConnectionFactory" brokerURL="vm://test-broker"/>

  <!-- Caching wrapper used by the application -->
  <bean class="org.springframework.jms.connection.CachingConnectionFactory" id="connectionFactory">
    <constructor-arg ref="amqConnectionFactory"/>
    <property name="sessionCacheSize" value="100"/>
  </bean>

  <amq:queue id="testQueue" physicalName="testQueue"/>

  <bean class="org.springframework.jms.core.JmsTemplate" id="jmsTemplate">
    <constructor-arg ref="connectionFactory"/>
  </bean>

  <!-- Application-defined producer -->
  <bean class="packageName.ClassName" id="queueProducer">
    <property name="jmsTemplate" ref="jmsTemplate"/>
    <property name="queueName" value="testQueue"/>
  </bean>

  <!-- Application-defined consumer -->
  <bean class="packageName.ClassName2" id="queueListener"/>

  <jms:listener-container concurrency="10" connection-factory="connectionFactory">
    <jms:listener destination="testQueue" ref="queueListener"/>
  </jms:listener-container>
</beans>
That’s a lot of configuration, so let’s go through it piece by piece:
<amq:broker brokerName="test-broker" start="true">
  <amq:persistenceAdapter>
    <amq:amqPersistenceAdapter directory="/opt/activemq" maxFileLength="32mb"/>
  </amq:persistenceAdapter>
  <amq:transportConnectors>
    <amq:transportConnector uri="tcp://localhost:7171"/>
  </amq:transportConnectors>
</amq:broker>
This is the definition of an embedded JMS broker called test-broker. It listens on port 7171 using the TCP protocol and persists data in the /opt/activemq directory using the default AMQ Message Store.
<amq:connectionFactory id="amqConnectionFactory" brokerURL="vm://test-broker"/>
This defines the JMS connection factory, which connects to the broker using the VM transport. Communication then happens inside the JVM, which avoids network overhead.
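The VM transport creates a broker on demand when no broker with the given name exists yet. If you want to be sure the connection factory only ever talks to the embedded broker defined above, the transport's create option can be switched off. This is a variation on the original configuration and assumes that on-demand creation is not wanted:

```xml
<!-- Assumption: fail instead of silently starting a second, default-configured broker -->
<amq:connectionFactory id="amqConnectionFactory"
                       brokerURL="vm://test-broker?create=false"/>
```

With this setting the broker bean has to be started before the first connection is opened, so keep the broker definition ahead of anything that connects to it.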
<amq:queue id="testQueue" physicalName="testQueue"/>
Defines the queue that we are going to use.
<bean class="org.springframework.jms.connection.CachingConnectionFactory" id="connectionFactory">
  <constructor-arg ref="amqConnectionFactory"/>
  <property name="sessionCacheSize" value="100"/>
</bean>
This is the connection factory our application will actually use. It caches connections, sessions and even MessageProducer instances. It is important to set the sessionCacheSize property, since its default value is 1.
<bean class="org.springframework.jms.core.JmsTemplate" id="jmsTemplate">
  <constructor-arg ref="connectionFactory"/>
</bean>
For those who know the Spring support for JDBC and Hibernate, the JmsTemplate class should look familiar, since its design is very similar to that of the JdbcTemplate and HibernateTemplate classes.
It hides most of the low-level JMS details (e.g. obtaining a session or handling acknowledgments). By default, sessions are not transacted and use auto-acknowledge mode.
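If the defaults do not fit, they can be changed on the template itself. The fragment below is a sketch that assumes you want locally transacted sessions; the original setup does not set this property:

```xml
<bean class="org.springframework.jms.core.JmsTemplate" id="jmsTemplate">
  <constructor-arg ref="connectionFactory"/>
  <!-- Assumption: use transacted JMS sessions instead of the default (false) -->
  <property name="sessionTransacted" value="true"/>
</bean>
```

The acknowledge mode can be changed in a similar way through the sessionAcknowledgeModeName property, for example with the value CLIENT_ACKNOWLEDGE.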
The connectionFactory is passed as a constructor argument.
One of the most important methods of the JmsTemplate is send(String destinationName, MessageCreator messageCreator). MessageCreator is an interface defined by Spring that has a single method, createMessage(Session session). The message it returns is the JMS message that will be sent to destinationName.
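When a template always sends to the same queue, the destination can also be configured once on the template instead of being passed on every call. A minimal sketch, assuming the queue bean carries the id testQueue:

```xml
<bean class="org.springframework.jms.core.JmsTemplate" id="jmsTemplate">
  <constructor-arg ref="connectionFactory"/>
  <!-- Assumption: every message from this template goes to the testQueue bean -->
  <property name="defaultDestination" ref="testQueue"/>
</bean>
```

With a default destination in place, the producer can call the send(MessageCreator messageCreator) variant, which takes no destination name.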
<bean class="packageName.ClassName" id="queueProducer">
  <property name="jmsTemplate" ref="jmsTemplate"/>
  <property name="queueName" value="testQueue"/>
</bean>
This, finally, is an object defined within our own application: the producer, which creates messages and sends them to the queue through the specified JmsTemplate.
<bean class="packageName.ClassName2" id="queueListener"/>
This bean is the consumer of our message queue. It needs to implement the javax.jms.MessageListener interface (or a Spring-specific interface such as SessionAwareMessageListener).
In Spring terms it is a message-driven POJO. The difference from a standard message-driven bean is that it does not need to run in an EJB container.
<jms:listener-container concurrency="10" connection-factory="connectionFactory">
  <jms:listener destination="testQueue" ref="queueListener"/>
</jms:listener-container>
This is the bean that makes the magic happen: it registers itself as a consumer on each configured destination and, whenever a message arrives, invokes the listener's callback asynchronously through a Spring TaskExecutor. The concurrency attribute defines how many MessageConsumer instances are created for each listener. If you use a value greater than 1, remember to set the ActiveMQ prefetch limit to 1; otherwise messages can sit undelivered in one consumer's prefetch buffer while the other consumers are idle.
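One way to set the prefetch limit is through a connection URI option on the ActiveMQ connection factory. The fragment below is a sketch of that approach, not part of the original configuration; it uses the jms.prefetchPolicy.queuePrefetch option, which applies to queue consumers only:

```xml
<!-- Assumption: limit every queue consumer on this connection to one prefetched message -->
<amq:connectionFactory id="amqConnectionFactory"
                       brokerURL="vm://test-broker?jms.prefetchPolicy.queuePrefetch=1"/>
```

Because the option sits on the connection factory, it affects every queue consumer created from it, including all the concurrent consumers of the listener container.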
Have fun 🙂