Jun 26, 2016

Spring Batch with Spring Integration

One way to combine the two is explained in the Spring documentation, but if we want greater control or need customizations, the following is another way.
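
For comparison, the documented route goes through the spring-batch-integration module, whose job-launching-gateway launches a job from an incoming JobLaunchRequest message. The fragment below is only a sketch under assumptions: the batch-int prefix must be bound to http://www.springframework.org/schema/batch-integration, the channel names jobRequestChannel and jobReplyChannel are hypothetical, and an upstream transformer (not shown) is assumed to build the JobLaunchRequest.

```xml
<!-- Sketch: channel names are illustrative and not part of this post's configuration -->
<!-- Consumes JobLaunchRequest messages and replies with the resulting JobExecution -->
<batch-int:job-launching-gateway request-channel="jobRequestChannel"
                                 reply-channel="jobReplyChannel"
                                 job-launcher="jobLauncher" />
```

The gateway hides the launch call itself, which is the part the rest of this post wires by hand in order to keep control over it.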

In this post we want to achieve the following.

At a high level

  • The file inbound-channel-adapter polls a directory for new files. When a new file arrives in this folder, the adapter passes it to the Transformer.
  • The Transformer accepts the File as input, transforms it into JobParameters (with the file name as the input.file parameter), and passes them to a Service Activator.
  • The Service Activator uses the injected Job and JobLauncher objects to execute the job and passes the result to the next endpoint.
  • In this case the next endpoint is another Service Activator, which simply prints the status message. In a real scenario this can be replaced by an email, MQ, or DB operation.

The following is the main Spring configuration:
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:jdbc="http://www.springframework.org/schema/jdbc"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
           http://www.springframework.org/schema/beans/spring-beans.xsd
           http://www.springframework.org/schema/jdbc
           http://www.springframework.org/schema/jdbc/spring-jdbc.xsd">

    <import resource="bhavcopy-job.xml" />
    <import resource="spring-int-context.xml" />

    <bean id="dataSource"
          class="org.springframework.jdbc.datasource.DriverManagerDataSource">
        <property name="driverClassName" value="com.mysql.cj.jdbc.Driver" />
        <property name="url" value="jdbc:mysql://localhost:3306/techrefresh?useSSL=false" />
        <property name="username" value="root" />
        <property name="password" value="" />
    </bean>

    <bean id="transactionManager"
          class="org.springframework.batch.support.transaction.ResourcelessTransactionManager" />

    <jdbc:initialize-database data-source="dataSource">
        <jdbc:script location="org/springframework/batch/core/schema-drop-mysql.sql" />
        <jdbc:script location="org/springframework/batch/core/schema-mysql.sql" />
    </jdbc:initialize-database>
</beans>
The following is the batch job configuration (imported above as bhavcopy-job.xml):
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:batch="http://www.springframework.org/schema/batch"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/batch
           http://www.springframework.org/schema/batch/spring-batch.xsd
           http://www.springframework.org/schema/beans
           http://www.springframework.org/schema/beans/spring-beans.xsd">

    <batch:job-repository />

    <bean id="jobLauncher"
          class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
        <property name="jobRepository" ref="jobRepository" />
    </bean>

    <batch:job id="titanicJob">
        <batch:step id="flatfileread">
            <batch:tasklet>
                <batch:chunk reader="itemReader" writer="itemWriter"
                             commit-interval="5">
                </batch:chunk>
            </batch:tasklet>
        </batch:step>
    </batch:job>

    <bean id="itemReader" class="org.springframework.batch.item.file.FlatFileItemReader"
          scope="step">
        <property name="resource" value="#{jobParameters['input.file']}" />
        <property name="lineMapper">
            <bean class="org.springframework.batch.item.file.mapping.DefaultLineMapper">
                <property name="lineTokenizer">
                    <bean class="org.springframework.batch.item.file.transform.DelimitedLineTokenizer">
                        <property name="names"
                                  value="SC_CODE,SC_NAME,SC_GROUP,SC_TYPE,OPEN,HIGH,LOW,CLOSE,LAST,PREVCLOSE,NO_TRADES,NO_OF_SHRS,NET_TURNOV,TDCLOINDI" />
                    </bean>
                </property>
                <property name="fieldSetMapper">
                    <bean class="com.techrefresh.spring.batch.mapper.ScriptPriceFieldMapper" />
                </property>
            </bean>
        </property>
        <property name="strict" value="false" />
    </bean>

    <bean id="itemWriter" class="org.springframework.batch.item.file.FlatFileItemWriter"
          scope="step">
        <property name="resource" value="#{jobParameters['output.file']}" />
        <property name="lineAggregator">
            <bean class="org.springframework.batch.item.file.transform.DelimitedLineAggregator">
                <property name="delimiter" value="," />
                <property name="fieldExtractor">
                    <bean class="org.springframework.batch.item.file.transform.BeanWrapperFieldExtractor">
                        <property name="names" value="scripCode,scripName,scripGroup,scripType" />
                    </bean>
                </property>
            </bean>
        </property>
    </bean>
</beans>
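
The bare job-repository element in the job configuration works because the batch namespace falls back to conventional bean names. As a sketch of what those defaults are understood to be, the explicit form below points at the dataSource and transactionManager beans from the main configuration. Treat the attribute values as an assumption about the defaults, not something verified in this post.

```xml
<!-- Assumed explicit equivalent of <batch:job-repository /> with the defaults spelled out -->
<batch:job-repository id="jobRepository"
                      data-source="dataSource"
                      transaction-manager="transactionManager" />
```

Spelling the references out makes it visible that the job metadata is stored in the MySQL tables created by initialize-database, and which transaction manager the repository is wired to.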
The following is the Spring Integration configuration file (imported above as spring-int-context.xml):
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:int="http://www.springframework.org/schema/integration"
       xmlns:int-jms="http://www.springframework.org/schema/integration/jms"
       xmlns:int-file="http://www.springframework.org/schema/integration/file"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
           http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
           http://www.springframework.org/schema/integration/jms http://www.springframework.org/schema/integration/jms/spring-integration-jms.xsd
           http://www.springframework.org/schema/integration/file http://www.springframework.org/schema/integration/file/spring-integration-file.xsd">

    <int:channel id="msgPrintChannel" />
    <int:channel id="batchJobChanel" />
    <int:channel id="xmer" />

    <int-file:inbound-channel-adapter directory="D:/keyur/tech/data/batch" channel="xmer" />
    <int:transformer input-channel="xmer" output-channel="batchJobChanel" ref="xmerBean" method="buildJobParams" />
    <int:service-activator input-channel="batchJobChanel" ref="jobExecutor" method="executeJob" output-channel="msgPrintChannel" />
    <int:service-activator input-channel="msgPrintChannel" ref="util" method="printMsg" requires-reply="false" />

    <bean id="xmerBean" class="com.techrefresh.spring.batch.integration.xmer.JobParamXmer" />
    <bean id="jobExecutor" class="com.techrefresh.spring.batch.JobExecutor">
        <property name="jobLauncher" ref="jobLauncher" />
        <property name="titanicJob" ref="titanicJob" />
    </bean>
    <bean id="util" class="com.techrefresh.spring.batch.util.Utility" />

    <int:poller fixed-rate="100" id="defaultPoller" default="true" />
</beans>
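
The adapter in this configuration accepts every file in the directory and relies on the default poller, which fires every 100 ms. A possible variant, offered only as a sketch, narrows the adapter to CSV files and gives it its own slower poller. The pattern *.CSV and the 5 second delay are illustrative values chosen here, not settings from the original setup.

```xml
<!-- Hypothetical variant of the file adapter: only *.CSV files, each file picked up once -->
<int-file:inbound-channel-adapter directory="D:/keyur/tech/data/batch"
                                  channel="xmer"
                                  filename-pattern="*.CSV"
                                  prevent-duplicates="true">
    <!-- Adapter-specific poller: scan the directory 5 seconds after the previous scan ends -->
    <int:poller fixed-delay="5000" />
</int-file:inbound-channel-adapter>
```

Keeping the poller inside the adapter leaves the global defaultPoller free for other polled endpoints, if any are added later.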


The following is the output:

2016-06-27 00:14:02 INFO  FileReadingMessageSource:264 - Created message: [[Payload File content=D:\keyur\tech\data\batch\EQ180516.CSV][Headers={id=97667554-4582-7f7e-7659-18da2ba536a1, timestamp=1466957642760}]]
2016-06-27 00:14:02 INFO  JobParamXmer:13 - Will create parameters for file at D:\keyur\tech\data\batch\EQ180516.CSV
2016-06-27 00:14:02 INFO  JobExecutor:42 - Launcher org.springframework.batch.core.launch.support.SimpleJobLauncher@129d348, Job FlowJob: [name=titanicJob]
2016-06-27 00:14:02 INFO  JobExecutor:44 - Executing batch job {input.file=D:\keyur\tech\data\batch\EQ180516.CSV, output.file=file://D:/keyur/tech/data/titanic/out.csv}
2016-06-27 00:14:03 INFO  SimpleJobLauncher:133 - Job: [FlowJob: [name=titanicJob]] launched with the following parameters: [{input.file=D:\keyur\tech\data\batch\EQ180516.CSV, output.file=file://D:/keyur/tech/data/titanic/out.csv}]
2016-06-27 00:14:03 INFO  SimpleStepHandler:146 - Executing step: [flatfileread]
2016-06-27 00:14:03 WARN  FlatFileItemReader:253 - Input resource does not exist class path resource [D:/keyur/tech/data/batch/EQ180516.CSV]
2016-06-27 00:14:03 INFO  SimpleJobLauncher:136 - Job: [FlowJob: [name=titanicJob]] completed with the following parameters: [{input.file=D:\keyur\tech\data\batch\EQ180516.CSV, output.file=file://D:/keyur/tech/data/titanic/out.csv}] and the following status: [COMPLETED]
2016-06-27 00:14:03 INFO  Utility:10 - Logging message SUCCESS
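
The WARN line in this log shows FlatFileItemReader looking for the input file as a class path resource. Because strict is set to false on the reader, the step then reads nothing and the job still ends as COMPLETED. One possible fix, sketched here on the assumption that input.file keeps holding a plain file system path, is to prefix the expression with file: so that Spring resolves it against the file system. How the Windows path with backslashes behaves under this prefix has not been checked here.

```xml
<!-- Possible replacement for the two itemReader properties in the job configuration -->
<property name="resource" value="file:#{jobParameters['input.file']}" />
<!-- With strict enabled, a missing input file fails the step instead of being skipped silently -->
<property name="strict" value="true" />
```

Alternatively, the transformer could put the prefix into the parameter value itself, as the log shows is already done for output.file.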