Jun 23, 2016

Spring Integration and ActiveMQ JMS message producer and consumer

The following is what we want to achieve.



The following is the configuration. Note that we need to set the reply channel to nullChannel if we do not want to process the reply coming back from the JMS queue after processing. If it is not set, an exception will be thrown. Keeping that in mind, the configuration is as follows.

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:int="http://www.springframework.org/schema/integration"
xmlns:int-jms="http://www.springframework.org/schema/integration/jms"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/integration/jms http://www.springframework.org/schema/integration/jms/spring-integration-jms.xsd">
<!--
  Overview: this context defines two flows that share the same JMS queue.
    Producer:  jmsGW -> reqChannel -> addTimeStampToMsg -> toJMSChannel -> requestQueue
    Processor: requestQueue -> xmerChannel -> header enricher -> svcActChannel -> printOutput
-->
<!-- JMS Message Processor (Start) -->
<!-- xmerChannel carries messages from the JMS inbound gateway to the header enricher. -->
<int:channel id="xmerChannel" />
<!-- svcActChannel carries enriched messages to the printing service activator. -->
<int:channel id="svcActChannel" />
<!-- Reads from requestQueue and forwards to xmerChannel; responseQueue is the default reply destination. -->
<int-jms:inbound-gateway request-channel="xmerChannel" connection-factory="connectionFactory" request-destination="requestQueue" default-reply-destination="responseQueue" acknowledge="transacted"/>
<!-- Adds a header named "lang" with value "en" to every message before it reaches svcActChannel. -->
<int:header-enricher input-channel="xmerChannel"
output-channel="svcActChannel">
<int:header name="lang" value="en" />
</int:header-enricher>
<!-- Invokes msgPrintBean.printOutput for each message on svcActChannel. No output-channel is declared here. -->
<!-- NOTE(review): presumably any value returned by printOutput flows back to the inbound gateway as the reply; confirm. -->
<int:service-activator id="printSvcActivator" ref="msgPrintBean"
method="printOutput" input-channel="svcActChannel"/>
<!-- JMS Message Processor (End) -->
<!-- JMS Message Producer (Start) -->
<!-- reqChannel receives messages submitted through the jmsGW gateway. -->
<int:channel id="reqChannel" />
<!-- toJMSChannel carries timestamped messages to the JMS outbound gateway. -->
<int:channel id="toJMSChannel" />
<!-- Application entry point: calls on the JMSGateway interface are sent to reqChannel. -->
<int:gateway service-interface="com.techrefresh.spring.integration.gateway.JMSGateway" id="jmsGW" default-request-channel="reqChannel" />
<!-- Invokes msgPrintBean.addTimeStampToMsg and passes its return value on to toJMSChannel. -->
<int:service-activator id="jmsMsgChecker" ref="msgPrintBean"
method="addTimeStampToMsg" input-channel="reqChannel" output-channel="toJMSChannel"/>
<!-- Sends to requestQueue. The reply is routed to nullChannel because this flow does not process it. -->
<int-jms:outbound-gateway request-channel="toJMSChannel" connection-factory="connectionFactory" request-destination="requestQueue" reply-channel="nullChannel"/>
<!-- JMS Message Producer (End) -->
<!-- Bean whose methods are referenced by both service activators above. -->
<bean id="msgPrintBean" class="com.techrefresh.spring.integration.bean.UtilBean" />
<!-- ActiveMQ queue named "request-queue": written by the producer flow and read by the processor flow. -->
<bean id="requestQueue" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg value="request-queue" />
</bean>
<!-- ActiveMQ queue named "response-queue": default reply destination of the inbound gateway. -->
<bean id="responseQueue" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg value="response-queue" />
</bean>
<!-- CachingConnectionFactory wrapping an ActiveMQ connection factory that targets the broker at tcp://localhost:61616. -->
<bean id="connectionFactory"
class="org.springframework.jms.connection.CachingConnectionFactory">
<property name="targetConnectionFactory">
<bean class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://localhost:61616" />
</bean>
</property>
<property name="sessionCacheSize" value="10" />
</bean>
<!-- Poller with a fixed rate of 1000. -->
<!-- NOTE(review): "pollermain" is not referenced by any element visible in this file; confirm whether it is still needed. -->
<int:poller fixed-rate="1000" id="pollermain"></int:poller>
</beans>

We changed our configuration class a little bit to print different messages:
  1. Just before sending to ActiveMQ, we append a timestamp.
  2. Once the message is received from the queue, we simply print it.

So following will be the util class.

package com.techrefresh.spring.integration.bean;
import java.text.SimpleDateFormat;
import java.util.Date;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.techrefresh.spring.integration.pojo.TwitBean;
/**
 * Helper bean whose methods are invoked as message handlers: it logs incoming
 * messages and decorates outgoing text messages with a timestamp.
 */
public class UtilBean {

    /**
     * Timestamp layout appended to outgoing messages. The 24-hour field
     * ({@code HH}) is used because the pattern carries no AM/PM marker; a
     * 12-hour field would make morning and afternoon timestamps identical.
     */
    private static final String TIMESTAMP_PATTERN = "dd-MMM-yyyy HH:mm:ss:SSS";

    /**
     * Shared formatter. {@link SimpleDateFormat} is not thread-safe, so it is
     * only ever used inside {@link #currentTimestamp()}, which synchronizes on it.
     */
    private static final SimpleDateFormat frmt = new SimpleDateFormat(TIMESTAMP_PATTERN);

    private static final Logger logger = LoggerFactory.getLogger(UtilBean.class);

    /**
     * Logs the name, language and message text of the given bean.
     *
     * @param msg the bean to log
     */
    public void printOutput(TwitBean msg) {
        logger.info("Bean received from {} in {}::: {}", msg.getName(), msg.getLanguage(), msg.getMessage());
    }

    /**
     * Logs the given text and returns it converted to upper case.
     *
     * @param msg the text to log and convert
     * @return {@code msg} in upper case
     */
    public String printOutput(String msg) {
        logger.info("Simple message {} will convert to uppercase", msg);
        return msg.toUpperCase();
    }

    /**
     * Appends the current timestamp to the given text, separated by {@code " @ "}.
     * The timestamp is computed once so that the logged value and the value
     * appended to the message are always identical.
     *
     * @param msg the text to decorate
     * @return {@code msg} followed by {@code " @ "} and the current timestamp
     */
    public String addTimeStampToMsg(String msg) {
        final String timestamp = currentTimestamp();
        logger.info("Simple message {} will add TS {}", msg, timestamp);
        return msg + " @ " + timestamp;
    }

    /** Formats the current time with the shared formatter under its lock. */
    private static String currentTimestamp() {
        synchronized (frmt) {
            return frmt.format(new Date());
        }
    }
}
view raw UtilBean.java hosted with ❤ by GitHub

The following is a new interface that is used only as the entry gateway.

package com.techrefresh.spring.integration.gateway;
public interface JMSGateway {
public void sendMessage(String msg);
}
view raw JMSGateway.java hosted with ❤ by GitHub
The following is the console output when we run it from Eclipse:
2016-06-23 09:32:39 INFO  UtilBean:22 - Simple message This is test message will add TS 23-Jun-2016 09:32:39:499
2016-06-23 09:32:39 INFO  UtilBean:18 - Simple message This is test message @ 23-Jun-2016 09:32:39:502 will convert to uppercase