Jun 28, 2016

Scala code for Twitter Stream reader with Apache Spark

The following is the code that finally worked for me. It reads tweets containing the word Brexit and saves their information to a file.
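As an illustration of only the filter-and-save step (not the Spark or Twitter wiring, and not the code referred to above), here is a plain Java sketch. KeywordTweetFilter, filterByKeyword and saveLines are hypothetical names. In Spark Streaming the same predicate would be applied as a filter on the tweet stream.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.Locale;
import java.util.stream.Collectors;

// Plain-Java stand-in for the "keep tweets that mention a keyword and save them" step.
// No Spark or Twitter classes are used; the class and method names are hypothetical.
class KeywordTweetFilter {

    // Keep only the texts that contain the keyword, ignoring case.
    static List<String> filterByKeyword(List<String> texts, String keyword) {
        String needle = keyword.toLowerCase(Locale.ROOT);
        return texts.stream()
                .filter(text -> text.toLowerCase(Locale.ROOT).contains(needle))
                .collect(Collectors.toList());
    }

    // Write one tweet per line, flattening embedded line breaks so each record stays on one line.
    static void saveLines(List<String> texts, Path out) throws IOException {
        List<String> lines = texts.stream()
                .map(text -> text.replace('\n', ' ').replace('\r', ' '))
                .collect(Collectors.toList());
        Files.write(out, lines, StandardCharsets.UTF_8);
    }
}
```

For example, filtering ["Vote #Brexit", "hello", "brexit news"] with "Brexit" keeps the first and last entries.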

Something more serious maybe later.

Jun 26, 2016

Spring Batch with Spring integration

One way is explained in the Spring documentation, but if we want greater control, or if customizations are required, the following is another way.

In this post we want to achieve the following.

At a high level:

  • The file inbound-channel-adapter polls a directory for new files. When a new file arrives in this folder, it passes the file to the Transformer.
  • The Transformer accepts the File as input, transforms it into JobParameters (with the file name as the input.file parameter) and passes them to the Service Activator.
  • The Service Activator fetches the Job and JobLauncher objects, executes the job and passes the result to the next endpoint.
  • In this case the next endpoint is another Service Activator, which simply prints the status message. In a real scenario this could be replaced by an email, MQ or DB operation.
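The four steps above can be modelled without Spring to show how data moves between the endpoints. This is a plain Java sketch, not the Spring Integration or Spring Batch API: FilePipelineSketch, poll, toJobParameters and runOnce are hypothetical names, and the "job" is just a Function that returns a status string. The parameter key input.file is taken from the log output in this post.

```java
import java.io.File;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Consumer;
import java.util.function.Function;

// Plain-Java stand-in for: inbound adapter -> transformer -> service activator -> status endpoint.
class FilePipelineSketch {
    // Absolute paths already handed on, so each file is processed only once.
    private final Set<String> seen = new HashSet<>();

    // "Inbound channel adapter": return regular files not seen on earlier polls.
    List<File> poll(File dir) {
        List<File> fresh = new ArrayList<>();
        File[] files = dir.listFiles(File::isFile);
        if (files == null) {
            return fresh; // directory missing or unreadable
        }
        for (File f : files) {
            if (seen.add(f.getAbsolutePath())) {
                fresh.add(f);
            }
        }
        return fresh;
    }

    // "Transformer": File -> job parameters keyed by input.file.
    static Map<String, String> toJobParameters(File file) {
        Map<String, String> params = new LinkedHashMap<>();
        params.put("input.file", file.getAbsolutePath());
        return params;
    }

    // One polling cycle: the "service activator" runs the job and the next endpoint gets the status.
    // Returns how many new files were processed.
    int runOnce(File dir, Function<Map<String, String>, String> job, Consumer<String> statusEndpoint) {
        List<File> fresh = poll(dir);
        for (File f : fresh) {
            statusEndpoint.accept(job.apply(toJobParameters(f)));
        }
        return fresh.size();
    }
}
```

Calling runOnce repeatedly (for example from a scheduled executor) plays the role that the poller on the inbound-channel-adapter plays in the real configuration.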
Following is the Spring configuration.
Following is the batch job configuration.
Following is the Spring Integration configuration file.


Following is the output.

2016-06-27 00:14:02 INFO  FileReadingMessageSource:264 - Created message: [[Payload File content=D:\keyur\tech\data\batch\EQ180516.CSV][Headers={id=97667554-4582-7f7e-7659-18da2ba536a1, timestamp=1466957642760}]]
2016-06-27 00:14:02 INFO  JobParamXmer:13 - Will create parameters for file at D:\keyur\tech\data\batch\EQ180516.CSV
2016-06-27 00:14:02 INFO  JobExecutor:42 - Launcher org.springframework.batch.core.launch.support.SimpleJobLauncher@129d348, Job FlowJob: [name=titanicJob]
2016-06-27 00:14:02 INFO  JobExecutor:44 - Executing batch job {input.file=D:\keyur\tech\data\batch\EQ180516.CSV, output.file=file://D:/keyur/tech/data/titanic/out.csv}
2016-06-27 00:14:03 INFO  SimpleJobLauncher:133 - Job: [FlowJob: [name=titanicJob]] launched with the following parameters: [{input.file=D:\keyur\tech\data\batch\EQ180516.CSV, output.file=file://D:/keyur/tech/data/titanic/out.csv}]
2016-06-27 00:14:03 INFO  SimpleStepHandler:146 - Executing step: [flatfileread]
2016-06-27 00:14:03 WARN  FlatFileItemReader:253 - Input resource does not exist class path resource [D:/keyur/tech/data/batch/EQ180516.CSV]
2016-06-27 00:14:03 INFO  SimpleJobLauncher:136 - Job: [FlowJob: [name=titanicJob]] completed with the following parameters: [{input.file=D:\keyur\tech\data\batch\EQ180516.CSV, output.file=file://D:/keyur/tech/data/titanic/out.csv}] and the following status: [COMPLETED]
2016-06-27 00:14:03 INFO  Utility:10 - Logging message SUCCESS
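In the output above, FlatFileItemReader warns that the input is a class path resource, so the step has nothing to read even though the job reports COMPLETED. A likely cause is that the plain Windows path in input.file is resolved against the class path. One way around it, assuming the reader's resource is bound from that parameter, is to have the transformer pass an explicit file: location. InputFileParam and toFileLocation are hypothetical names.

```java
// Hypothetical helper: turn a Windows or POSIX path into a "file:" location string so that
// a Spring resource lookup treats it as a file-system path rather than a class path entry.
class InputFileParam {
    static String toFileLocation(String path) {
        if (path.startsWith("file:") || path.startsWith("classpath:")) {
            return path; // already carries an explicit prefix, leave it alone
        }
        // Backslashes become forward slashes so the result is a well-formed URL string.
        return "file:" + path.replace('\\', '/');
    }
}
```

For example, toFileLocation("D:\\keyur\\tech\\data\\batch\\EQ180516.CSV") returns file:D:/keyur/tech/data/batch/EQ180516.CSV.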

Jun 23, 2016

Spring Integration and ActiveMQ JMS message producer and consumer

Following is what we want to achieve.



Following is the configuration. Note that we need to set the reply channel to nullChannel if we do not want to process the reply that comes back from the JMS queue after processing; if it is not set, an exception is thrown. With that in mind, the configuration is as follows.
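To illustrate why a reply target is needed even when the reply is unwanted, here is a plain Java model with no Spring or JMS classes (ReplyRouting and NULL_CHANNEL are hypothetical names). A request/reply endpoint always produces a reply; handing it to a consumer that discards it plays the role of nullChannel, while leaving the target unset fails.

```java
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical in-memory model of a request/reply endpoint: every reply must go somewhere.
class ReplyRouting {
    // Discards whatever it receives, the role nullChannel plays in Spring Integration.
    static final Consumer<String> NULL_CHANNEL = reply -> { };

    static void handle(String request, Function<String, String> service, Consumer<String> replyChannel) {
        String reply = service.apply(request);
        if (replyChannel == null) {
            // Mirrors the failure described in the text when no reply channel is configured.
            throw new IllegalStateException("no reply channel for reply: " + reply);
        }
        replyChannel.accept(reply);
    }
}
```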


We changed our configuration class a little to print different messages:
  1. Just before sending to ActiveMQ, we append a timestamp.
  2. Once the message is received from the queue, we simply print it.

Following is the util class.
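A minimal plain-Java sketch of a util class with those two steps. The " @ " separator and the dd-MMM-yyyy HH:mm:ss:SSS pattern are inferred from the log lines in this post; TimestampUtil, appendTimestamp and print are hypothetical names, and in the real flow Spring Integration would call such methods from the configured endpoints.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Locale;

// Hypothetical util bean; the timestamp pattern matches the log output, e.g. 23-Jun-2016 09:32:39:502.
class TimestampUtil {
    static final DateTimeFormatter TS =
            DateTimeFormatter.ofPattern("dd-MMM-yyyy HH:mm:ss:SSS", Locale.ENGLISH);

    // Step 1: called just before the message is sent to ActiveMQ.
    static String appendTimestamp(String message, LocalDateTime now) {
        return message + " @ " + TS.format(now);
    }

    // Step 2: called when the message is received from the queue.
    static void print(String message) {
        System.out.println("Simple message " + message);
    }
}
```

Passing LocalDateTime.now() in production and a fixed time in tests keeps the method easy to check: appendTimestamp("This is test message", LocalDateTime.of(2016, 6, 23, 9, 32, 39, 502_000_000)) returns "This is test message @ 23-Jun-2016 09:32:39:502".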


Following is a new interface, used only as the entry gateway.
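A sketch of what such an entry-gateway interface can look like. MessageEntryGateway and RecordingGateway are hypothetical names. With Spring Integration's gateway support the implementation of the interface is generated at runtime; the hand-written recording implementation here only stands in for it so callers can be exercised without a broker.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical entry-gateway interface: application code depends only on this.
interface MessageEntryGateway {
    void send(String message);
}

// Stand-in for the generated gateway: records messages instead of putting them on a channel.
class RecordingGateway implements MessageEntryGateway {
    final List<String> sent = new ArrayList<>();

    @Override
    public void send(String message) {
        sent.add(message);
    }
}
```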

Following is the console output when we run it from Eclipse:
2016-06-23 09:32:39 INFO  UtilBean:22 - Simple message This is test message will add TS 23-Jun-2016 09:32:39:499
2016-06-23 09:32:39 INFO  UtilBean:18 - Simple message This is test message @ 23-Jun-2016 09:32:39:502 will convert to uppercase


Jun 22, 2016

ActiveMQ with Spring Integration inbound-gateway

In this example we want to read a message from one queue (request-queue), add a header to it, and send an uppercase response to the reply queue (response-queue). Following is the integration diagram.
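The request/reply flow just described can be modelled in memory to make the steps concrete. This is a plain Java sketch with no JMS or Spring classes: two BlockingQueues stand in for request-queue and response-queue, and InboundGatewaySketch, Msg, processOne and the processedBy header are hypothetical.

```java
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// In-memory model of the flow; the queues stand in for request-queue and response-queue.
class InboundGatewaySketch {
    // A message is modelled as a payload plus a header map.
    static final class Msg {
        final String payload;
        final Map<String, String> headers;

        Msg(String payload, Map<String, String> headers) {
            this.payload = payload;
            this.headers = headers;
        }
    }

    final BlockingQueue<Msg> requestQueue = new LinkedBlockingQueue<>();
    final BlockingQueue<Msg> responseQueue = new LinkedBlockingQueue<>();

    // Handle one waiting request: copy its headers, add one, and reply with the uppercased payload.
    // Returns false when no request is waiting.
    boolean processOne() {
        Msg request = requestQueue.poll();
        if (request == null) {
            return false;
        }
        Map<String, String> headers = new HashMap<>(request.headers);
        headers.put("processedBy", "utilBean"); // hypothetical header name and value
        responseQueue.offer(new Msg(request.payload.toUpperCase(Locale.ROOT), headers));
        return true;
    }
}
```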

Create the two queues from the ActiveMQ web console.
Following is the configuration file.



Following is the util bean.


Now send a message from the ActiveMQ web console.


Following is what you receive on the response channel.



Jun 21, 2016

Spring Integration example for Twitter stream reading

Following is what we want to achieve.


Spring configuration file.



Main class for reading tweets and passing them on for processing. Note that the consumer key, consumer secret, access token and access token secret need to be copied from your Twitter developer account.
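Rather than pasting the keys into the main class, they could be passed with -D options, the same approach the Spark post in this archive suggests for hadoop.home.dir. A sketch, assuming the four values from the developer account (consumer key and secret, access token and secret); TwitterCredentials and the twitter.* property names are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper: read the four Twitter credentials from -D system properties
// instead of hardcoding them. The property names are illustrative only.
class TwitterCredentials {
    static final String[] KEYS = {
        "twitter.consumerKey", "twitter.consumerSecret",
        "twitter.accessToken", "twitter.accessTokenSecret"
    };

    // Fails fast with the name of the first missing property.
    static Map<String, String> load() {
        Map<String, String> creds = new LinkedHashMap<>();
        for (String key : KEYS) {
            String value = System.getProperty(key);
            if (value == null || value.isEmpty()) {
                throw new IllegalStateException("Missing -D" + key + "=...");
            }
            creds.put(key, value);
        }
        return creds;
    }
}
```

The values returned by load() would then be passed to whichever Twitter client the main class creates.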


POJO class to store tweet information.
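A sketch of such a POJO. The field set (id, user, text) is an assumption for illustration, not the original class; TweetInfo is a hypothetical name.

```java
import java.util.Objects;

// Hypothetical POJO holding the parts of a tweet the utility bean might log or store.
class TweetInfo {
    private final long id;
    private final String user;
    private final String text;

    TweetInfo(long id, String user, String text) {
        this.id = id;
        this.user = Objects.requireNonNull(user, "user");
        this.text = Objects.requireNonNull(text, "text");
    }

    public long getId() { return id; }
    public String getUser() { return user; }
    public String getText() { return text; }

    // Single-line form (line breaks in the text flattened), convenient for logging or file output.
    @Override
    public String toString() {
        return id + "|" + user + "|" + text.replace('\n', ' ');
    }
}
```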


Gateway interface


Utility Bean class.

Jun 13, 2016

Running first Apache Spark Project

Load data from a CSV file to train a model.
Use the trained model to predict the output.
The data used here is from https://www.kaggle.com/c/titanic
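One detail when loading the Kaggle train.csv: the Name column is quoted and contains commas (for example "Braund, Mr. Owen Harris"), so a plain split on commas shifts every later column. A plain Java sketch of a quote-aware split plus a small label/feature extraction follows. TitanicRow is a hypothetical name, and the chosen features (Pclass, sex, Age) and the default for a missing Age are assumptions, not the post's model. The resulting label and values could, for example, be turned into an MLlib LabeledPoint.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for train.csv, whose header is
// PassengerId,Survived,Pclass,Name,Sex,Age,SibSp,Parch,Ticket,Fare,Cabin,Embarked
class TitanicRow {
    // Split one CSV line, honouring double quotes and "" escapes inside quoted fields.
    static List<String> splitCsv(String line) {
        List<String> fields = new ArrayList<>();
        StringBuilder cur = new StringBuilder();
        boolean inQuotes = false;
        for (int i = 0; i < line.length(); i++) {
            char c = line.charAt(i);
            if (c == '"') {
                if (inQuotes && i + 1 < line.length() && line.charAt(i + 1) == '"') {
                    cur.append('"'); // escaped quote inside a quoted field
                    i++;
                } else {
                    inQuotes = !inQuotes;
                }
            } else if (c == ',' && !inQuotes) {
                fields.add(cur.toString());
                cur.setLength(0);
            } else {
                cur.append(c);
            }
        }
        fields.add(cur.toString());
        return fields;
    }

    // Returns [Survived, Pclass, isFemale, Age]; an empty Age becomes defaultAge (an assumption).
    static double[] toLabelAndFeatures(String line, double defaultAge) {
        List<String> f = splitCsv(line);
        double survived = Double.parseDouble(f.get(1));
        double pclass = Double.parseDouble(f.get(2));
        double isFemale = "female".equals(f.get(4)) ? 1.0 : 0.0;
        double age = f.get(5).isEmpty() ? defaultAge : Double.parseDouble(f.get(5));
        return new double[] {survived, pclass, isFemale, age};
    }
}
```

The header line has to be skipped before calling toLabelAndFeatures, since its Survived column is not a number.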

To run Spark from Eclipse, we need to set the following VM argument:
-Xmx512m


Following is the source code.



Note that if Apache Hadoop is not installed on the local machine, just download the binaries and set the system property hadoop.home.dir. If you are running standalone code and do not want to hardcode the path, set the property using the -D option.
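A small sketch that combines both options: a -Dhadoop.home.dir value given on the command line wins, otherwise a fallback directory is set in code before the Spark context is created. HadoopHome and ensureHadoopHome are hypothetical names, and the fallback path is whatever folder the Hadoop binaries were unpacked into.

```java
// Hypothetical helper: prefer -Dhadoop.home.dir, otherwise set a fallback location.
// Call it at the start of main, before any Spark or Hadoop classes are used.
class HadoopHome {
    static String ensureHadoopHome(String fallbackDir) {
        String current = System.getProperty("hadoop.home.dir");
        if (current != null && !current.isEmpty()) {
            return current; // already set on the command line, keep it
        }
        System.setProperty("hadoop.home.dir", fallbackDir);
        return fallbackDir;
    }
}
```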

Some important tips come from the Kaggle forums.