This tutorial shows how to send a message to a Topic using the publish/subscribe messaging model in Apache ActiveMQ with Spring JMS. For more information on the publish/subscribe messaging model, please read the tutorial https://www.jeejava.com/configure-jms-client-using-glassfish-3/

Before moving forward, please read the tutorial https://www.jeejava.com/apache-activemq-configuration-in-windows/ to configure ActiveMQ, but do not create any Topic.

Now we will look into the following steps in order to implement publish/subscribe messaging system.

1. Create a class called MessagePublisher that will produce messages, i.e. send them to the destination – the Topic.

package com.roytuts.spring.jms.publisher;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;

/**
 * Publishes text messages through the injected {@link JmsTemplate}.
 */
public class MessagePublisher {

  /** Template injected by Spring; no destination is named here, so the template's own settings apply. */
  @Autowired
  private JmsTemplate jmsTemplate;

  /**
   * Hands the given text to the template for conversion and sending.
   *
   * @param message the text payload to publish
   */
  public void sendMessage(final String message) {
    final JmsTemplate template = this.jmsTemplate;
    template.convertAndSend(message);
  }

}

2. Create a class called MessageSubscriber1 that will receive messages from the destination – the Topic.

package com.roytuts.spring.jms.subscriber;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

/**
 * JMS listener that prints the body of every {@link TextMessage} it receives.
 */
public class MessageSubscriber1 implements MessageListener {

  /**
   * Prints the text of the delivered message to standard output.
   *
   * @param message the delivered JMS message
   * @throws IllegalArgumentException if the message is not a {@link TextMessage}
   * @throws RuntimeException wrapping the {@link JMSException} raised while reading the text
   */
  @Override
  public void onMessage(Message message) {
    // Guard clause: anything other than a text message is rejected up front.
    if (!(message instanceof TextMessage)) {
      throw new IllegalArgumentException("Message must be of type TextMessage");
    }

    final TextMessage textMessage = (TextMessage) message;
    final String text;
    try {
      text = textMessage.getText();
    } catch (JMSException ex) {
      throw new RuntimeException(ex);
    }
    System.out.println("Message consumed by MessageSubscriber1 : " + text);
  }

}

3. Create a class called MessageSubscriber2 that will receive messages from the destination – the Topic.

package com.roytuts.spring.jms.subscriber;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

/**
 * JMS listener that prints the body of every {@link TextMessage} it receives.
 */
public class MessageSubscriber2 implements MessageListener {

  /**
   * Prints the text of the delivered message to standard output.
   *
   * @param message the delivered JMS message
   * @throws IllegalArgumentException if the message is not a {@link TextMessage}
   * @throws RuntimeException wrapping the {@link JMSException} raised while reading the text
   */
  @Override
  public void onMessage(Message message) {
    // Guard clause: anything other than a text message is rejected up front.
    if (!(message instanceof TextMessage)) {
      throw new IllegalArgumentException("Message must be of type TextMessage");
    }

    final TextMessage textMessage = (TextMessage) message;
    final String text;
    try {
      text = textMessage.getText();
    } catch (JMSException ex) {
      throw new RuntimeException(ex);
    }
    System.out.println("Message consumed by MessageSubscriber2 : " + text);
  }

}

4. Create an XML properties file named activemq-jms-spring-properties.xml that will hold all key/value pairs for the application. Put this file under src/main/resources

<?xml version="1.0" encoding="UTF-8"?>
<!-- Declares the placeholder configurer that holds the JMS.* key/value pairs
	(broker URL and topic name) for the application. -->
<beans
	xmlns="http://www.springframework.org/schema/beans"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xmlns:p="http://www.springframework.org/schema/p"
	xmlns:aop="http://www.springframework.org/schema/aop"
	xmlns:context="http://www.springframework.org/schema/context"
	xmlns:jee="http://www.springframework.org/schema/jee"
	xmlns:tx="http://www.springframework.org/schema/tx"
	xmlns:task="http://www.springframework.org/schema/task"
	xsi:schemaLocation="
		http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop.xsd
		http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
		http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.2.xsd
		http://www.springframework.org/schema/jee http://www.springframework.org/schema/jee/spring-jee-3.2.xsd
		http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx.xsd
		http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd">

	<!-- Placeholder configurer: system properties override the values below,
		and placeholders it cannot resolve are left untouched instead of failing. -->
	<bean
		id="jmsSpringProperties"
		name="jmsSpringProperties"
		class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
		<property name="order" value="99999" />
		<property name="systemPropertiesModeName" value="SYSTEM_PROPERTIES_MODE_OVERRIDE" />
		<property name="ignoreUnresolvablePlaceholders" value="true" />
		<!-- Inline properties, one key=value pair per line -->
		<property name="properties">
			<value>

				<!-- JMS -->
				JMS.BROKER.URL=tcp://localhost:61616
				JMS.TOPIC.NAME=IN_TOPIC

			</value>
		</property>
	</bean>

</beans>

5. Create an XML configuration file named activemq-jms-spring-jms.xml that will hold all JMS related configuration for the application. Put this file under src/main/resources/jms

<?xml version="1.0" encoding="UTF-8"?>
<!-- JMS wiring: the ActiveMQ connection factories, the destination topic, the
	JmsTemplate used for publishing, and one listener container per subscriber. -->
<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:p="http://www.springframework.org/schema/p"
	xmlns:aop="http://www.springframework.org/schema/aop" xmlns:context="http://www.springframework.org/schema/context"
	xmlns:jee="http://www.springframework.org/schema/jee" xmlns:tx="http://www.springframework.org/schema/tx"
	xmlns:task="http://www.springframework.org/schema/task"
	xsi:schemaLocation="
		http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-3.2.xsd
		http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.2.xsd
		http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.2.xsd
		http://www.springframework.org/schema/jee http://www.springframework.org/schema/jee/spring-jee-3.2.xsd
		http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-3.2.xsd
		http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task-3.2.xsd">

	<!-- Activemq connection factory: talks to the broker at ${JMS.BROKER.URL} -->
	<bean id="amqConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
		<constructor-arg index="0" value="${JMS.BROKER.URL}" />
	</bean>

	<!-- ConnectionFactory Definition: caching wrapper around the ActiveMQ
		factory, used by the JmsTemplate below -->
	<bean id="connectionFactory"
		class="org.springframework.jms.connection.CachingConnectionFactory">
		<constructor-arg ref="amqConnectionFactory" />
	</bean>

	<!-- Destination Topic -->
	<bean id="destinationTopic" class="org.apache.activemq.command.ActiveMQTopic">
		<constructor-arg index="0" value="${JMS.TOPIC.NAME}" />
	</bean>

	<!-- JmsTemplate Definition: publishes to the topic by default -->
	<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
		<property name="connectionFactory" ref="connectionFactory" />
		<property name="defaultDestination" ref="destinationTopic" />
		<property name="pubSubDomain" value="true" />
	</bean>

	<!-- Message Publisher -->
	<bean id="messagePublisher" class="com.roytuts.spring.jms.publisher.MessagePublisher" />

	<!-- Message Subscriber1 -->
	<bean id="messageSubscriber1" class="com.roytuts.spring.jms.subscriber.MessageSubscriber1" />

	<!-- Message Subscriber2 -->
	<bean id="messageSubscriber2" class="com.roytuts.spring.jms.subscriber.MessageSubscriber2" />

	<!-- Message Subscriber1 Container.
		Wired to the raw ActiveMQ connection factory instead of the
		CachingConnectionFactory: the Spring documentation advises against
		combining CachingConnectionFactory with a message listener container,
		because the container handles resource caching within its own lifecycle. -->
	<bean id="messageSubscriber1Container"
		class="org.springframework.jms.listener.DefaultMessageListenerContainer">
		<property name="connectionFactory" ref="amqConnectionFactory" />
		<property name="destinationName" value="${JMS.TOPIC.NAME}" />
		<property name="messageListener" ref="messageSubscriber1" />
		<property name="pubSubDomain" value="true" />
	</bean>

	<!-- Message Subscriber2 Container (same wiring as the Subscriber1 container,
		including the raw ActiveMQ connection factory) -->
	<bean id="messageSubscriber2Container"
		class="org.springframework.jms.listener.DefaultMessageListenerContainer">
		<property name="connectionFactory" ref="amqConnectionFactory" />
		<property name="destinationName" value="${JMS.TOPIC.NAME}" />
		<property name="messageListener" ref="messageSubscriber2" />
		<property name="pubSubDomain" value="true" />
	</bean>

</beans>

6. Create an XML file named activemq-jms-spring-context.xml that will load all other resources and enable annotation support in the application. Put this file under src/main/resources/spring

<?xml version="1.0" encoding="UTF-8"?>
<!-- Root Spring configuration: enables annotation processing and pulls in the
	properties and JMS configuration files. -->
<beans
	xmlns="http://www.springframework.org/schema/beans"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xmlns:p="http://www.springframework.org/schema/p"
	xmlns:aop="http://www.springframework.org/schema/aop"
	xmlns:context="http://www.springframework.org/schema/context"
	xmlns:jee="http://www.springframework.org/schema/jee"
	xmlns:tx="http://www.springframework.org/schema/tx"
	xmlns:task="http://www.springframework.org/schema/task"
	xsi:schemaLocation="
		http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop.xsd
		http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
		http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.2.xsd
		http://www.springframework.org/schema/jee http://www.springframework.org/schema/jee/spring-jee-3.2.xsd
		http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx.xsd
		http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd">

	<!-- Enables annotation-driven configuration on the declared beans -->
	<context:annotation-config />

	<!-- Property placeholders first, then the JMS beans that use them -->
	<import resource="classpath:activemq-jms-spring-properties.xml" />
	<import resource="classpath:jms/activemq-jms-spring-jms.xml" />

</beans>

7. Create a test class called MessagePublisherTest under src/test/java that will run the MessagePublisher.

package com.roytuts.spring.jms.publisher;

import org.junit.Before;
import org.junit.Test;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

/**
 * Boots the Spring context and publishes a single message through
 * {@link MessagePublisher}.
 */
public class MessagePublisherTest {

  /** Classpath location of the root Spring configuration file. */
  private static final String CONTEXT_LOCATION = "classpath:spring/activemq-jms-spring-context.xml";

  /** Bean under test, looked up from the context before each test. */
  private MessagePublisher messagePublisher;

  /**
   * Loads the Spring context and fetches the publisher bean.
   * The context is not closed here, hence the suppressed resource warning.
   */
  @SuppressWarnings("resource")
  @Before
  public void setUp() throws Exception {
    final ApplicationContext context = new ClassPathXmlApplicationContext(CONTEXT_LOCATION);
    final Object bean = context.getBean("messagePublisher");
    messagePublisher = (MessagePublisher) bean;
  }

  /** Sends one text message through the publisher. */
  @Test
  public void testSendMessage() {
    final String text = "This is a message that will be posted into Topic.";
    messagePublisher.sendMessage(text);
  }

}

8. Run the MessagePublisherTest class as a JUnit test.

Output

Message consumed by MessageSubscriber1 : This is a message that will be posted into Topic.
Message consumed by MessageSubscriber2 : This is a message that will be posted into Topic.

9. Now click on Topics in the ActiveMQ Web Console again. You will not see any pending message in IN_TOPIC because, as soon as you ran the MessagePublisherTest, the message was consumed by MessageSubscriber1 and MessageSubscriber2. You will see that one message was enqueued but two messages have been dequeued, because there were two message subscribers. We do not need to run MessageSubscriber1 or MessageSubscriber2 separately because this is an asynchronous messaging system and each subscriber is already registered with a Spring DefaultMessageListenerContainer. So when a message arrives at the Topic, it is automatically delivered to each subscriber’s onMessage() method.

ActiveMQ Spring JMS publish subscribe

Here is the pom.xml

<!-- Maven build descriptor for the ActiveMQ / Spring JMS publish-subscribe example -->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>

	<groupId>com.roytuts</groupId>
	<artifactId>apache-activemq</artifactId>
	<version>0.0.1-SNAPSHOT</version>
	<packaging>jar</packaging>

	<name>apache-activemq</name>
	<url>http://maven.apache.org</url>

	<!-- Versions are centralised here and referenced below as ${...} -->
	<properties>
		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
		<jdk.version>1.8</jdk.version>
		<junit.version>4.11</junit.version>
		<!-- NOTE: slf4j.version is not referenced by any dependency in this POM -->
		<slf4j.version>1.7.5</slf4j.version>
		<activemq.version>5.11.1</activemq.version>
		<spring.version>4.1.5.RELEASE</spring.version>
	</properties>

	<dependencies>
		<!-- activemq -->
		<dependency>
			<groupId>org.apache.activemq</groupId>
			<artifactId>activemq-all</artifactId>
			<version>${activemq.version}</version>
		</dependency>

		<!-- Spring -->
		<dependency>
			<groupId>org.springframework</groupId>
			<artifactId>spring-core</artifactId>
			<version>${spring.version}</version>
		</dependency>
		<dependency>
			<groupId>org.springframework</groupId>
			<artifactId>spring-jms</artifactId>
			<version>${spring.version}</version>
		</dependency>

		<!-- junit -->
		<dependency>
			<groupId>junit</groupId>
			<artifactId>junit</artifactId>
			<version>${junit.version}</version>
			<scope>test</scope>
		</dependency>
	</dependencies>

	<build>
		<plugins>
			<plugin>
				<groupId>org.apache.maven.plugins</groupId>
				<artifactId>maven-compiler-plugin</artifactId>
				<!-- Pinned explicitly: without a version Maven warns that
					'build.plugins.plugin.version' is missing and the build is
					not reproducible across Maven installations -->
				<version>3.8.1</version>
				<configuration>
					<source>${jdk.version}</source>
					<target>${jdk.version}</target>
				</configuration>
			</plugin>
		</plugins>
	</build>
</project>

 

That’s all. Thank you for reading.

Tags:

I am a professional Web developer, Enterprise Application developer, Software Engineer and Blogger. Connect with me on Roy Tutorials | Twitter | Facebook | Google Plus | LinkedIn | Reddit | Email Me

Leave a Reply

Your email address will not be published. Required fields are marked *