Creating Durable Subscription in JMS

In this tutorial I am going to discuss how JMS API ensures reliable messaging by Creating Durable Subscriptions.

A Publish/Subscribe messaging domain is useless, if subscriber is not active while publisher is publishing a message to destination. If we create a durable subscriber instead of the non-durable subscriber, it is possible to ensure  reliable messaging. The non-durable subscriber example can be found in publish/subscribe domain examples Spring JMS and ActiveMQ Integration – publish/subscribe domain, JMS Client using JBoss 7 – Publish/Subscribe Messaging, and Configure JMS Client using GlassFish 3

For more on durable subscriber you can read JMS Concepts – Persistent and Durable

Please go through the tutorial Apache ActiveMQ Configuration in Windows before proceeding below.

Prerequisites

The following configurations are required in order to run the application

Eclipse Kepler
JDK 1.8
Have maven 3 installed and configured
Apache ActiveMQ dependency in pom.xml

Step 1. Create a standalone maven based web project in Eclipse

Step 2. Modify the pom.xml file as shown below.

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>

	<groupId>com.roytuts</groupId>
	<artifactId>jms</artifactId>
	<version>0.0.1-SNAPSHOT</version>
	<packaging>jar</packaging>

	<name>jms</name>
	<url>http://maven.apache.org</url>

	<properties>
		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
		<java.version>1.8</java.version>
		<activemq.version>5.11.1</activemq.version>
	</properties>

	<dependencies>
		<!-- activemq -->
		<dependency>
			<groupId>org.apache.activemq</groupId>
			<artifactId>activemq-all</artifactId>
			<version>${activemq.version}</version>
		</dependency>
	</dependencies>

	<build>
		<plugins>
			<plugin>
				<groupId>org.apache.maven.plugins</groupId>
				<artifactId>maven-compiler-plugin</artifactId>
				<configuration>
					<source>${java.version}</source>
					<target>${java.version}</target>
				</configuration>
			</plugin>
		</plugins>
	</build>
</project>

Step 3. Create below jndi.properties file under src/main/resources

java.naming.factory.initial=org.apache.activemq.jndi.ActiveMQInitialContextFactory
java.naming.provider.url=tcp://localhost:61616
topic.topic/topicName=IN_TOPIC

Step 4. Create below publisher class which publishes message to a topic IN_TOPIC

package com.roytuts.jms.topic;

import javax.jms.JMSException;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicPublisher;
import javax.jms.TopicSession;
import javax.naming.InitialContext;
import javax.naming.NamingException;

public class Publisher {

	public void publishMessage() {
		InitialContext initialContext = null;
		TopicConnectionFactory connectionFactory;
		TopicConnection connection = null;
		TopicPublisher publisher;
		TopicSession session;
		Topic topic;
		try {
			// Step 1. Create an initial context to perform the JNDI lookup.
			initialContext = new InitialContext();
			// Step 2. Look-up the JMS topic
			topic = (Topic) initialContext.lookup("topic/topicName");
			// Step 3. Look-up the JMS Topic connection factory
			connectionFactory = (TopicConnectionFactory) initialContext.lookup("ConnectionFactory");
			// Step 4. Create a JMS Topic connection
			connection = connectionFactory.createTopicConnection();
			// Step 5. Set the client-id on the connection
			connection.start();
			// step 6. Create Topic session
			session = connection.createTopicSession(false, TopicSession.AUTO_ACKNOWLEDGE);
			// step 7. Create publisher
			publisher = session.createPublisher(topic);
			// publisher.setDeliveryMode(DeliveryMode.PERSISTENT);
			// Step 8. Create a text message
			TextMessage message = session.createTextMessage("This sample message is consumed by subscriber");
			// Step 9. Publish the text message to the topic
			publisher.publish(message);
		} catch (JMSException | NamingException ex) {
			ex.printStackTrace();
		} finally {
			if (connection != null) {
				try {
					connection.close();
				} catch (JMSException e) {
					e.printStackTrace();
				}
			}
			if (initialContext != null) {
				try {
					initialContext.close();
				} catch (NamingException e) {
					e.printStackTrace();
				}
			}
		}
	}

}

Step 5. Create below durable subscriber

package com.roytuts.jms.topic;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;
import javax.naming.InitialContext;
import javax.naming.NamingException;

public class DurableSubscriber {

	public void receiveMessage() {
		InitialContext initialContext = null;
		TopicConnectionFactory connectionFactory;
		TopicConnection connection = null;
		TopicSubscriber subscriber;
		TopicSession session = null;
		Topic topic;
		try {
			// Step 1. Create an initial context to perform the JNDI lookup.
			initialContext = new InitialContext();
			// Step 2. Look-up the JMS topic
			topic = (Topic) initialContext.lookup("topic/topicName");
			// Step 3. Look-up the JMS Topic connection factory
			connectionFactory = (TopicConnectionFactory) initialContext.lookup("ConnectionFactory");
			// Step 4. Create a JMS Topic connection
			connection = connectionFactory.createTopicConnection();
			// Step 5. Set the client-id on the connection
			// in case of non-durable subscriber, please remove the below line
			connection.setClientID("durable-client");
			// step 6. Start the connection
			connection.start();
			// step 7. Create Topic session
			session = connection.createTopicSession(false, TopicSession.AUTO_ACKNOWLEDGE);
			// step 8. Create the durable subscriber
			// in case of non-durable subscriber, please use the below commented line
			//subscriber=session.createSubscriber(topic);
			subscriber = session.createDurableSubscriber(topic, "durableSubscriber");
			// Step 9. Consume the message
			Message message = subscriber.receive();
			if (message != null && message instanceof TextMessage) {
				TextMessage textMessage = (TextMessage) message;
				System.out.println(
						"DurableSubscriber received a message published by Publisher : " + textMessage.getText());
			} else if (message == null) {
				System.out.println(
						"DurableSubscriber fails to receive the message sent by the publisher due to a timeout.");
			} else {
				throw new JMSException("Message must be a type of TextMessage");
			}
		} catch (JMSException | NamingException ex) {
			ex.printStackTrace();
		} finally {
			if (connection != null) {
				try {
					connection.close();
				} catch (JMSException e) {
					e.printStackTrace();
				}
			}
			if (initialContext != null) {
				try {
					initialContext.close();
				} catch (NamingException e) {
					e.printStackTrace();
				}
			}
		}
	}

}

Step 5. Create below classes for testing

package com.roytuts.jms.topic.test;

import com.roytuts.jms.topic.Publisher;

public class PublisherTest {

	public static void main(String[] args) {
		Publisher publisher = new Publisher();
		publisher.publishMessage();
	}

}
package com.roytuts.jms.topic.test;

import com.roytuts.jms.topic.DurableSubscriber;

public class DurableSubscriberTest {

	public static void main(String[] args) {
		DurableSubscriber durableSubscriber = new DurableSubscriber();
		durableSubscriber.receiveMessage();
	}

}

Step 6. While your ActiveMQ broker running, first run the class PublisherTest then DurableSubscriberTest. You will see the below output in console

DurableSubscriber received a message published by Publisher : This sample message is consumed by subscriber

Now to test on broker restart, do the following.

First run the PublisherTest class. Stop the broker. Again start the broker. Now run the DurableSubscriberTest class. You should see the same output as shown above in the console.

Thanks for reading.

Soumitra Roy Sarkar

I am a professional Web developer, Enterprise Application developer, Software Engineer and Blogger. Connect me on Roy Tutorials Twitter Facebook  Google Plus Linkedin

Leave a Reply

Your email address will not be published. Required fields are marked *