Jun 21, 2016

Spring Integration example: reading a Twitter stream

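The following is what we want to achieve: tweets read from the Twitter stream are passed through a gateway, enriched with name and language headers, and then routed by language (English, Spanish or French) to a service that prints them.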


Spring configuration file.

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:int="http://www.springframework.org/schema/integration"
       xmlns:int-jms="http://www.springframework.org/schema/integration/jms"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
           http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
           http://www.springframework.org/schema/integration/jms http://www.springframework.org/schema/integration/jms/spring-integration-jms.xsd">

    <!--
        Message flow:
        entryPoint gateway -> xmerChannel -> header enricher -> langRouterChannel
        -> header-value router -> engChannel / espChannel / frcChannel -> msgPrintBean.printOutput
    -->

    <!-- Channel receiving the TwitBean payloads sent through the gateway. -->
    <int:channel id="xmerChannel" />
    <!-- Channel carrying header-enriched messages to the language router. -->
    <int:channel id="langRouterChannel" />
    <!-- One channel per supported language. -->
    <int:channel id="engChannel" />
    <int:channel id="espChannel" />
    <int:channel id="frcChannel" />

    <!-- Gateway exposing TwitGateway; calls on it are sent to xmerChannel. -->
    <int:gateway id="entryPoint" default-request-channel="xmerChannel"
                 service-interface="com.techrefresh.spring.integration.gateway.TwitGateway" />

    <!-- Copies the payload's name and language properties into message headers. -->
    <int:header-enricher input-channel="xmerChannel" output-channel="langRouterChannel">
        <int:header name="name" expression="payload.name"/>
        <int:header name="lang" expression="payload.language"/>
    </int:header-enricher>

    <!--
        Routes on the "lang" header. Only en/es/fr are mapped; any other (or missing)
        language is sent to the built-in nullChannel and discarded instead of failing
        with a channel-resolution error.
    -->
    <int:header-value-router header-name="lang" input-channel="langRouterChannel"
                             default-output-channel="nullChannel" resolution-required="false">
        <int:mapping value="en" channel="engChannel"/>
        <int:mapping value="es" channel="espChannel"/>
        <int:mapping value="fr" channel="frcChannel"/>
    </int:header-value-router>

    <!-- Each language channel is handled by the same printing bean; no reply is produced. -->
    <int:service-activator id="enOutSvc" ref="msgPrintBean" method="printOutput" input-channel="engChannel" requires-reply="false" />
    <int:service-activator id="esOutSvc" ref="msgPrintBean" method="printOutput" input-channel="espChannel" requires-reply="false" />
    <int:service-activator id="frOutSvc" ref="msgPrintBean" method="printOutput" input-channel="frcChannel" requires-reply="false" />

    <bean id="msgPrintBean"
          class="com.techrefresh.spring.integration.bean.UtilBean" />
</beans>


Main class for reading tweets and passing them on for processing. Note that the consumer key, consumer secret, access token and access token secret need to be copied from your Twitter developer account.

package com.techrefresh.spring.integration.twitter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import twitter4j.FilterQuery;
import twitter4j.StallWarning;
import twitter4j.Status;
import twitter4j.StatusDeletionNotice;
import twitter4j.StatusListener;
import twitter4j.TwitterStream;
import twitter4j.TwitterStreamFactory;
import twitter4j.conf.ConfigurationBuilder;
import com.techrefresh.spring.integration.gateway.TwitGateway;
import com.techrefresh.spring.integration.pojo.TwitBean;
public class StreamReader {
private static Logger logger = LoggerFactory.getLogger(StreamReader.class);
private static TwitterStream stream = null;
public static void main(String[] args) {
ApplicationContext ctx = new ClassPathXmlApplicationContext("spring-int-context.xml");
final TwitGateway gw = (TwitGateway) ctx.getBean("entryPoint");
init();
StatusListener listener = new StatusListener() {
public void onException(Exception arg0) {
}
public void onDeletionNotice(StatusDeletionNotice arg0) {
}
public void onScrubGeo(long arg0, long arg1) {
}
public void onStallWarning(StallWarning arg0) {
}
public void onStatus(Status status) {
TwitBean bean = new TwitBean();
bean.setName(status.getUser().getName());
bean.setMessage(status.getText());
bean.setLanguage(status.getLang());
gw.parseTwit(bean);
}
public void onTrackLimitationNotice(int arg0) {
}
};
FilterQuery qry = new FilterQuery();
String[] keywords = { "Hong Kong"};
qry.track(keywords);
stream.addListener(listener);
stream.filter(qry);
}
private static void init(){
logger.info("Initializing Twitter Stream");
ConfigurationBuilder confBuilder = new ConfigurationBuilder();
confBuilder.setOAuthConsumerKey(TwitterConstants.CONSUMER_KEY);
confBuilder.setOAuthConsumerSecret(TwitterConstants.CONSUMER_SECRET);
confBuilder.setOAuthAccessToken(TwitterConstants.ACCESS_TOKEN_KEY);
confBuilder.setOAuthAccessTokenSecret(TwitterConstants.ACCESS_TOKEN_SECRET);
stream = new TwitterStreamFactory(confBuilder.build()).getInstance();
}
}

POJO class to store tweet information.

package com.techrefresh.spring.integration.pojo;
/**
 * Simple mutable holder for the parts of a tweet that the application uses:
 * the author's name, the message text and the language code.
 */
public class TwitBean {

    /** Name of the tweet's author. */
    private String name;

    /** Text of the tweet. */
    private String message;

    /** Language code of the tweet. */
    private String language;

    /** @return the author's name, or {@code null} if not set */
    public String getName() {
        return this.name;
    }

    /** @param name the author's name */
    public void setName(String name) {
        this.name = name;
    }

    /** @return the tweet text, or {@code null} if not set */
    public String getMessage() {
        return this.message;
    }

    /** @param message the tweet text */
    public void setMessage(String message) {
        this.message = message;
    }

    /** @return the language code, or {@code null} if not set */
    public String getLanguage() {
        return this.language;
    }

    /** @param language the language code */
    public void setLanguage(String language) {
        this.language = language;
    }
}
view raw TwitBean.java hosted with ❤ by GitHub

Gateway interface

package com.techrefresh.spring.integration.gateway;
import com.techrefresh.spring.integration.pojo.TwitBean;
/**
 * Messaging gateway contract used by the application to hand a tweet to the
 * integration flow.
 */
public interface TwitGateway {

    /**
     * Submits a tweet for processing.
     *
     * @param bean the tweet data to process
     */
    void parseTwit(TwitBean bean);
}

Utility Bean class.

package com.techrefresh.spring.integration.bean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.techrefresh.spring.integration.pojo.TwitBean;
/**
 * Helper bean that writes received tweets to the log.
 */
public class UtilBean {

    private static final Logger logger = LoggerFactory.getLogger(UtilBean.class);

    /**
     * Logs the author name, language and text of the given tweet at INFO level.
     *
     * @param msg the tweet to log
     */
    public void printOutput(TwitBean msg) {
        final String author = msg.getName();
        final String language = msg.getLanguage();
        final String text = msg.getMessage();
        logger.info("Bean received from {} in {}::: {}", author, language, text);
    }
}
view raw UtilBean.java hosted with ❤ by GitHub