Jun 28, 2016

Scala code for Twitter Stream reader with Apache Spark

Following is the code that finally worked for me. It reads tweets containing the word Brexit and saves their information to files.

import org.apache.spark.SparkConf
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import twitter4j.conf.ConfigurationContext
import twitter4j.auth.AuthorizationFactory
import org.apache.spark.streaming.twitter.TwitterUtils
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.Seconds
import org.apache.spark.storage.StorageLevel
import twitter4j.auth.Authorization
/** Streams tweets matching a keyword filter through Spark Streaming and saves them as text files.
  *
  * Twitter OAuth credentials are handed to twitter4j through the `twitter4j.oauth.*` system
  * properties; `TwitterUtils.createStream` is called with `None` for its authorization, so it relies
  * on that default twitter4j configuration. Fill in the four credential constants from your Twitter
  * developer account before running.
  */
object Main {
  val CONSUMER_KEY = ""
  val CONSUMER_SECRET = ""
  val ACCESS_TOKEN_KEY = ""
  val ACCESS_TOKEN_SECRET = ""

  /** Output prefix passed to `saveAsTextFiles` when no path is given as the first argument. */
  private val DefaultOutputPrefix = "D:/keyur/tech/data/spark/twitter-stream/"

  /** Local Hadoop binaries used when `-Dhadoop.home.dir` is not supplied on the command line. */
  private val DefaultHadoopHome = "D:/sw/hadoop/hadoop-common-2.2.0-bin-master/"

  /** Runs the streaming job until it is terminated.
    *
    * @param args optional; `args(0)` overrides the output prefix for the saved batches
    */
  def main(args: Array[String]): Unit = {
    // Respect an explicit -Dhadoop.home.dir; fall back to the local install otherwise.
    if (System.getProperty("hadoop.home.dir") == null)
      System.setProperty("hadoop.home.dir", DefaultHadoopHome)
    System.setProperty("twitter4j.oauth.consumerKey", CONSUMER_KEY)
    System.setProperty("twitter4j.oauth.consumerSecret", CONSUMER_SECRET)
    System.setProperty("twitter4j.oauth.accessToken", ACCESS_TOKEN_KEY)
    System.setProperty("twitter4j.oauth.accessTokenSecret", ACCESS_TOKEN_SECRET)

    val cfg = new SparkConf()
      .setMaster("local[*]")
      .setAppName("Spark Streaming Example in Scala")
    val ctx = new SparkContext(cfg)
    // Build the streaming context on top of the existing SparkContext. Creating it from `cfg`
    // instead would start a second SparkContext in the same JVM, which Spark only tolerates
    // with spark.driver.allowMultipleContexts enabled.
    val ssc = new StreamingContext(ctx, Seconds(30))
    try {
      val outputPrefix = args.headOption.getOrElse(DefaultOutputPrefix)
      val filters = Seq("Brexit")
      val stream = TwitterUtils
        .createStream(ssc, None, filters, StorageLevel.MEMORY_AND_DISK_2)
        .repartition(10)
      // One output line per field: the user's toString, the tweet text and the tweet id.
      // Converting to String here gives the same text saveAsTextFiles would write, with a
      // precise element type instead of Seq[Any].
      val tagEntities = stream.flatMap { status =>
        Seq(status.getUser.toString, status.getText, status.getId.toString)
      }
      tagEntities.saveAsTextFiles(outputPrefix, "")
      ssc.start()
      ssc.awaitTermination()
    } finally {
      // `true` also stops the underlying SparkContext (`ctx`).
      ssc.stop(true)
    }
  }
}
view raw Main.scala hosted with ❤ by GitHub
Something more serious may follow later.

Jun 26, 2016

Spring Batch with Spring integration

One approach is explained in the Spring documentation, but if we want greater control or need customizations, the following is another way.

In this post we want to achieve the following.

At a high level

  • The file inbound-channel-adapter polls a directory for new files. When a new file arrives in this folder, it passes it to the Transformer.
  • The Transformer accepts the File as input, transforms it into JobParameters (with the file name as the input.file parameter) and passes them to the Service Activator.
  • The Service Activator fetches the Job and JobLauncher objects, executes the job and passes the result to the next endpoint.
  • In this case the next endpoint is another Service Activator, which simply prints the status message. In a real scenario this could be replaced by an email, MQ or DB operation.
Following is the Spring configuration.
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:jdbc="http://www.springframework.org/schema/jdbc" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/jdbc
http://www.springframework.org/schema/jdbc/spring-jdbc.xsd">
<!-- Imports the batch job and Spring Integration configuration files. -->
<import resource="bhavcopy-job.xml" />
<import resource="spring-int-context.xml"/>
<!-- MySQL connection used for the Spring Batch metadata tables (see initialize-database below). -->
<bean id="dataSource"
class="org.springframework.jdbc.datasource.DriverManagerDataSource">
<property name="driverClassName" value="com.mysql.cj.jdbc.Driver" />
<property name="url" value="jdbc:mysql://localhost:3306/techrefresh?useSSL=false" />
<property name="username" value="root" />
<property name="password" value="" />
</bean>
<!-- A default batch job-repository looks up the beans named dataSource and transactionManager and
     stores job metadata through JDBC. Its transaction manager must therefore really control that
     dataSource. ResourcelessTransactionManager performs no transactional work and is only meant
     for in-memory (Map-based) repositories. -->
<bean id="transactionManager"
class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
<property name="dataSource" ref="dataSource" />
</bean>
<!-- Runs the Spring Batch MySQL drop script and then the create script at startup, so batch
     metadata from earlier runs is discarded every time the context starts. -->
<jdbc:initialize-database data-source="dataSource">
<jdbc:script location="org/springframework/batch/core/schema-drop-mysql.sql" />
<jdbc:script location="org/springframework/batch/core/schema-mysql.sql" />
</jdbc:initialize-database>
</beans>
Following is the batch job configuration.
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:batch="http://www.springframework.org/schema/batch" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/batch
http://www.springframework.org/schema/batch/spring-batch.xsd
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
">
<!-- Job repository with default settings (uses the beans named dataSource and transactionManager). -->
<batch:job-repository />
<bean id="jobLauncher"
class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
<property name="jobRepository" ref="jobRepository" />
</bean>
<!-- Single chunk-oriented step: read CSV rows, write selected fields, commit every 5 items. -->
<batch:job id="titanicJob">
<batch:step id="flatfileread">
<batch:tasklet>
<batch:chunk reader="itemReader" writer="itemWriter"
commit-interval="5">
</batch:chunk>
</batch:tasklet>
</batch:step>
</batch:job>
<!-- Step-scoped so the file names can be taken from the job parameters at run time.
     input.file arrives as a plain filesystem path (for example D:\...\EQ180516.CSV). Without a
     prefix, Spring resolves such a path as a classpath resource, which does not exist, so the
     file: prefix is added here. -->
<bean id="itemReader" class="org.springframework.batch.item.file.FlatFileItemReader"
scope="step">
<property name="resource" value="file:#{jobParameters['input.file']}" />
<property name="lineMapper">
<bean class="org.springframework.batch.item.file.mapping.DefaultLineMapper">
<property name="lineTokenizer">
<bean
class="org.springframework.batch.item.file.transform.DelimitedLineTokenizer">
<property name="names"
value="SC_CODE,SC_NAME,SC_GROUP,SC_TYPE,OPEN,HIGH,LOW,CLOSE,LAST,PREVCLOSE,NO_TRADES,NO_OF_SHRS,NET_TURNOV,TDCLOINDI" />
</bean>
</property>
<property name="fieldSetMapper">
<bean class="com.techrefresh.spring.batch.mapper.ScriptPriceFieldMapper"/>
</property>
</bean>
</property>
<!-- strict=false: a missing input file only logs a warning and the step reads zero items
     instead of failing the job. -->
<property name="strict" value="false" />
</bean>
<!-- Writes scripCode,scripName,scripGroup,scripType of each item as a comma-separated line.
     output.file is expected to already be a URL (the sample run passes file://...). -->
<bean id="itemWriter" class="org.springframework.batch.item.file.FlatFileItemWriter"
scope="step">
<property name="resource" value="#{jobParameters['output.file']}" />
<property name="lineAggregator">
<bean
class="org.springframework.batch.item.file.transform.DelimitedLineAggregator">
<property name="delimiter" value="," />
<property name="fieldExtractor">
<bean class="org.springframework.batch.item.file.transform.BeanWrapperFieldExtractor">
<property name="names" value="scripCode,scripName,scripGroup,scripType" />
</bean>
</property>
</bean>
</property>
</bean>
</beans>
Following is the Spring Integration configuration file.
<?xml version="1.0" encoding="UTF-8"?>
<!--
  Spring Integration flow that launches the batch job for each file dropped into the input
  directory: file adapter onto "xmer", transformer onto "batchJobChanel", job executor onto
  "msgPrintChannel", then a terminal printer.
-->
<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xmlns:int="http://www.springframework.org/schema/integration"
	xmlns:int-jms="http://www.springframework.org/schema/integration/jms"
	xmlns:int-file="http://www.springframework.org/schema/integration/file"
	xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/integration/jms http://www.springframework.org/schema/integration/jms/spring-integration-jms.xsd
http://www.springframework.org/schema/integration/file http://www.springframework.org/schema/integration/file/spring-integration-file.xsd">

	<!-- Channels -->
	<int:channel id="msgPrintChannel" />
	<int:channel id="batchJobChanel" />
	<int:channel id="xmer" />

	<!-- Emits one message per new file found in the input directory. -->
	<int-file:inbound-channel-adapter
		channel="xmer"
		directory="D:/keyur/tech/data/batch" />

	<!-- Converts the File payload into job parameters via JobParamXmer.buildJobParams. -->
	<int:transformer
		ref="xmerBean" method="buildJobParams"
		input-channel="xmer" output-channel="batchJobChanel" />

	<!-- Runs the job with those parameters and forwards the result for printing. -->
	<int:service-activator
		ref="jobExecutor" method="executeJob"
		input-channel="batchJobChanel" output-channel="msgPrintChannel" />

	<!-- Terminal endpoint: prints the result; no reply is expected. -->
	<int:service-activator
		ref="util" method="printMsg"
		input-channel="msgPrintChannel" requires-reply="false" />

	<!-- Endpoint beans -->
	<bean id="xmerBean" class="com.techrefresh.spring.batch.integration.xmer.JobParamXmer" />
	<bean id="jobExecutor" class="com.techrefresh.spring.batch.JobExecutor">
		<property name="jobLauncher" ref="jobLauncher" />
		<property name="titanicJob" ref="titanicJob" />
	</bean>
	<bean id="util" class="com.techrefresh.spring.batch.util.Utility" />

	<!-- Global default poller (every 100 ms); the file adapter above declares no poller of its
	     own, so it uses this one. -->
	<int:poller id="defaultPoller" default="true" fixed-rate="100" />
</beans>


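Following is the output. Note the WARN line: the input.file parameter is a plain Windows path, so FlatFileItemReader resolved it as a classpath resource and did not find it. Because the reader is configured with strict=false, the job still reports COMPLETED without having read any records.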

2016-06-27 00:14:02 INFO  FileReadingMessageSource:264 - Created message: [[Payload File content=D:\keyur\tech\data\batch\EQ180516.CSV][Headers={id=97667554-4582-7f7e-7659-18da2ba536a1, timestamp=1466957642760}]]
2016-06-27 00:14:02 INFO  JobParamXmer:13 - Will create parameters for file at D:\keyur\tech\data\batch\EQ180516.CSV
2016-06-27 00:14:02 INFO  JobExecutor:42 - Launcher org.springframework.batch.core.launch.support.SimpleJobLauncher@129d348, Job FlowJob: [name=titanicJob]
2016-06-27 00:14:02 INFO  JobExecutor:44 - Executing batch job {input.file=D:\keyur\tech\data\batch\EQ180516.CSV, output.file=file://D:/keyur/tech/data/titanic/out.csv}
2016-06-27 00:14:03 INFO  SimpleJobLauncher:133 - Job: [FlowJob: [name=titanicJob]] launched with the following parameters: [{input.file=D:\keyur\tech\data\batch\EQ180516.CSV, output.file=file://D:/keyur/tech/data/titanic/out.csv}]
2016-06-27 00:14:03 INFO  SimpleStepHandler:146 - Executing step: [flatfileread]
2016-06-27 00:14:03 WARN  FlatFileItemReader:253 - Input resource does not exist class path resource [D:/keyur/tech/data/batch/EQ180516.CSV]
2016-06-27 00:14:03 INFO  SimpleJobLauncher:136 - Job: [FlowJob: [name=titanicJob]] completed with the following parameters: [{input.file=D:\keyur\tech\data\batch\EQ180516.CSV, output.file=file://D:/keyur/tech/data/titanic/out.csv}] and the following status: [COMPLETED]
2016-06-27 00:14:03 INFO  Utility:10 - Logging message SUCCESS

Jun 23, 2016

Spring Integration and ActiveMQ JMS message producer and consumer

Following is what we want to achieve.



Following is the configuration. Note that we need to set the reply channel to nullChannel if we do not want to process the reply coming back from the JMS queue; if it is not set, an exception is thrown. With that in mind, the configuration is as follows.

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xmlns:int="http://www.springframework.org/schema/integration"
	xmlns:int-jms="http://www.springframework.org/schema/integration/jms"
	xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/integration/jms http://www.springframework.org/schema/integration/jms/spring-integration-jms.xsd">

	<!-- Consumer side: messages read from request-queue get a "lang" header and are passed to
	     UtilBean.printOutput. A returned value is sent back by the inbound gateway as the reply;
	     response-queue is used when the request carries no reply destination of its own. -->
	<int:channel id="xmerChannel" />
	<int:channel id="svcActChannel" />
	<int-jms:inbound-gateway
		connection-factory="connectionFactory"
		request-destination="requestQueue"
		default-reply-destination="responseQueue"
		request-channel="xmerChannel"
		acknowledge="transacted" />
	<int:header-enricher input-channel="xmerChannel" output-channel="svcActChannel">
		<int:header name="lang" value="en" />
	</int:header-enricher>
	<int:service-activator id="printSvcActivator"
		input-channel="svcActChannel"
		ref="msgPrintBean" method="printOutput" />

	<!-- Producer side: JMSGateway.sendMessage puts the text on reqChannel, addTimeStampToMsg appends
	     a timestamp, and the outbound gateway sends the result to request-queue. The JMS reply is
	     sent to nullChannel because nothing downstream needs it. -->
	<int:channel id="reqChannel" />
	<int:channel id="toJMSChannel" />
	<int:gateway id="jmsGW"
		service-interface="com.techrefresh.spring.integration.gateway.JMSGateway"
		default-request-channel="reqChannel" />
	<int:service-activator id="jmsMsgChecker"
		input-channel="reqChannel" output-channel="toJMSChannel"
		ref="msgPrintBean" method="addTimeStampToMsg" />
	<int-jms:outbound-gateway
		connection-factory="connectionFactory"
		request-destination="requestQueue"
		request-channel="toJMSChannel"
		reply-channel="nullChannel" />

	<!-- Shared beans -->
	<bean id="msgPrintBean" class="com.techrefresh.spring.integration.bean.UtilBean" />
	<bean id="requestQueue" class="org.apache.activemq.command.ActiveMQQueue">
		<constructor-arg value="request-queue" />
	</bean>
	<bean id="responseQueue" class="org.apache.activemq.command.ActiveMQQueue">
		<constructor-arg value="response-queue" />
	</bean>
	<!-- Caches up to 10 JMS sessions on top of the ActiveMQ connection to the local broker. -->
	<bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
		<property name="targetConnectionFactory">
			<bean class="org.apache.activemq.ActiveMQConnectionFactory">
				<property name="brokerURL" value="tcp://localhost:61616" />
			</bean>
		</property>
		<property name="sessionCacheSize" value="10" />
	</bean>
	<!-- NOTE(review): no endpoint in this file references this poller and it is not marked default. -->
	<int:poller id="pollermain" fixed-rate="1000" />
</beans>

We changed our utility class a little to print different messages:
  1. Just before sending to ActiveMQ, we append a timestamp.
  2. Once the message is received from the queue, we simply print it.

The utility class is as follows.

package com.techrefresh.spring.integration.bean;
import java.text.SimpleDateFormat;
import java.util.Date;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.techrefresh.spring.integration.pojo.TwitBean;
/**
 * Service-activator bean for the JMS producer/consumer flow.
 *
 * <p>{@link #addTimeStampToMsg(String)} decorates outgoing messages before they are sent, and the
 * {@code printOutput} overloads log incoming payloads.
 */
public class UtilBean {
    /**
     * Pattern for the timestamp appended to outgoing messages. {@code HH} is the 24-hour clock, so
     * morning and evening times stay distinguishable without an AM/PM marker.
     */
    private static final String TIMESTAMP_PATTERN = "dd-MMM-yyyy HH:mm:ss:SSS";
    private static final Logger logger = LoggerFactory.getLogger(UtilBean.class);

    /**
     * Logs the author, language and text of a tweet.
     *
     * @param msg tweet payload
     */
    public void printOutput(TwitBean msg) {
        logger.info("Bean received from {} in {}::: {}", msg.getName(), msg.getLanguage(), msg.getMessage());
    }

    /**
     * Logs a text payload and returns it upper-cased.
     *
     * @param msg text payload
     * @return {@code msg.toUpperCase()}
     */
    public String printOutput(String msg) {
        logger.info("Simple message {} will convert to uppercase", msg);
        return msg.toUpperCase();
    }

    /**
     * Appends {@code " @ <timestamp>"} to the message.
     *
     * @param msg outgoing message text
     * @return {@code msg} followed by " @ " and the current time in {@link #TIMESTAMP_PATTERN}
     */
    public String addTimeStampToMsg(String msg) {
        // Format a single instant so the logged timestamp and the appended one are identical.
        // A new formatter per call is used because SimpleDateFormat is not thread-safe, and this
        // singleton bean may be called from several threads at once.
        String timestamp = new SimpleDateFormat(TIMESTAMP_PATTERN).format(new Date());
        logger.info("Simple message {} will add TS {}", msg, timestamp);
        return msg + " @ " + timestamp;
    }
}
view raw UtilBean.java hosted with ❤ by GitHub

Following is a new interface, used only as the entry gateway.

package com.techrefresh.spring.integration.gateway;
/**
 * Entry point of the JMS producer flow. Spring Integration supplies the implementation (the
 * {@code int:gateway} bean "jmsGW"); each call puts the text onto the gateway's request channel.
 */
public interface JMSGateway {
    /**
     * Sends a plain-text message into the integration flow.
     *
     * @param msg message text
     */
    void sendMessage(String msg);
}
view raw JMSGateway.java hosted with ❤ by GitHub
Following is the console output when we run it from Eclipse:
2016-06-23 09:32:39 INFO  UtilBean:22 - Simple message This is test message will add TS 23-Jun-2016 09:32:39:499
2016-06-23 09:32:39 INFO  UtilBean:18 - Simple message This is test message @ 23-Jun-2016 09:32:39:502 will convert to uppercase


Jun 22, 2016

ActiveMQ with Spring Integration inbound-gateway

In this example we want to read a message from one queue (request-queue), add a header to it and send an uppercase response to the reply queue (response-queue). Following is the integration diagram.

Create the two queues from the ActiveMQ web console.
Following is the configuration file.

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xmlns:int="http://www.springframework.org/schema/integration"
	xmlns:int-jms="http://www.springframework.org/schema/integration/jms"
	xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/integration/jms http://www.springframework.org/schema/integration/jms/spring-integration-jms.xsd">

	<!-- Messages read from request-queue get a "lang" header and are passed to
	     UtilBean.printOutput. Its return value is sent back by the inbound gateway as the reply;
	     response-queue is used when the request carries no reply destination of its own. -->
	<int:channel id="xmerChannel" />
	<int:channel id="svcActChannel" />
	<int-jms:inbound-gateway
		connection-factory="connectionFactory"
		request-destination="requestQueue"
		default-reply-destination="responseQueue"
		request-channel="xmerChannel"
		acknowledge="transacted" />
	<int:header-enricher input-channel="xmerChannel" output-channel="svcActChannel">
		<int:header name="lang" value="en" />
	</int:header-enricher>
	<int:service-activator id="printSvcActivator"
		input-channel="svcActChannel"
		ref="msgPrintBean" method="printOutput" />

	<!-- Shared beans -->
	<bean id="msgPrintBean" class="com.techrefresh.spring.integration.bean.UtilBean" />
	<bean id="requestQueue" class="org.apache.activemq.command.ActiveMQQueue">
		<constructor-arg value="request-queue" />
	</bean>
	<bean id="responseQueue" class="org.apache.activemq.command.ActiveMQQueue">
		<constructor-arg value="response-queue" />
	</bean>
	<!-- Caches up to 10 JMS sessions on top of the ActiveMQ connection to the local broker. -->
	<bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
		<property name="targetConnectionFactory">
			<bean class="org.apache.activemq.ActiveMQConnectionFactory">
				<property name="brokerURL" value="tcp://localhost:61616" />
			</bean>
		</property>
		<property name="sessionCacheSize" value="10" />
	</bean>
	<!-- NOTE(review): no endpoint in this file references this poller and it is not marked default. -->
	<int:poller id="pollermain" fixed-rate="1000" />
</beans>


Following is the utility bean.

package com.techrefresh.spring.integration.bean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Service-activator bean for the inbound JMS gateway flow: logs each text message and hands back
 * its upper-case form, which the gateway sends as the reply.
 */
public class UtilBean {
    private static final Logger logger = LoggerFactory.getLogger(UtilBean.class);

    /**
     * Logs the incoming message and returns it upper-cased.
     *
     * @param msg text payload taken from the request queue
     * @return {@code msg} converted with {@link String#toUpperCase()}
     */
    public String printOutput(String msg) {
        logger.info("Simple message {}", msg);
        final String upperCased = msg.toUpperCase();
        return upperCased;
    }
}
view raw UtilBean.java hosted with ❤ by GitHub

Now send a message from the ActiveMQ web console.


Following is what you receive on the response queue.



Jun 21, 2016

Using Spring Integration example for Twitter stream Reading

Following is what we want to achieve.


Spring configuration file.

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:int="http://www.springframework.org/schema/integration"
xmlns:int-jms="http://www.springframework.org/schema/integration/jms"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/integration/jms http://www.springframework.org/schema/integration/jms/spring-integration-jms.xsd">
<!-- Tweets enter through the TwitGateway bean "entryPoint", get "name" and "lang" headers copied
     from the TwitBean payload, and are routed by language to one printing endpoint per language. -->
<int:channel id="xmerChannel" />
<int:channel id="langRouterChannel" />
<int:channel id="engChannel" />
<int:channel id="espChannel" />
<int:channel id="frcChannel" />
<int:gateway id="entryPoint" default-request-channel="xmerChannel"
service-interface="com.techrefresh.spring.integration.gateway.TwitGateway" />
<int:header-enricher input-channel="xmerChannel" output-channel="langRouterChannel">
<int:header name="name" expression="payload.name"/>
<int:header name="lang" expression="payload.language"/>
</int:header-enricher>
<!-- Only en/es/fr are mapped. An unmapped language code (or a missing one) would otherwise make
     the router throw back into TwitGateway.parseTwit, because all channels here are direct.
     With resolution-required="false", unknown codes are not treated as channel names that must
     exist, and such tweets are explicitly discarded via nullChannel. -->
<int:header-value-router header-name="lang" input-channel="langRouterChannel"
default-output-channel="nullChannel" resolution-required="false">
<int:mapping value="en" channel="engChannel"/>
<int:mapping value="es" channel="espChannel"/>
<int:mapping value="fr" channel="frcChannel"/>
</int:header-value-router>
<!-- Terminal endpoints: printOutput returns nothing, so no reply is required. -->
<int:service-activator id="enOutSvc" ref="msgPrintBean" method="printOutput" input-channel="engChannel" requires-reply="false" />
<int:service-activator id="esOutSvc" ref="msgPrintBean" method="printOutput" input-channel="espChannel" requires-reply="false" />
<int:service-activator id="frOutSvc" ref="msgPrintBean" method="printOutput" input-channel="frcChannel" requires-reply="false" />
<bean id="msgPrintBean"
class="com.techrefresh.spring.integration.bean.UtilBean" />
</beans>


Main class for reading tweets and passing them on for processing. Note that the consumer key, consumer secret, access token and access token secret need to be copied from your Twitter developer account.

package com.techrefresh.spring.integration.twitter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import twitter4j.FilterQuery;
import twitter4j.StallWarning;
import twitter4j.Status;
import twitter4j.StatusDeletionNotice;
import twitter4j.StatusListener;
import twitter4j.TwitterStream;
import twitter4j.TwitterStreamFactory;
import twitter4j.conf.ConfigurationBuilder;
import com.techrefresh.spring.integration.gateway.TwitGateway;
import com.techrefresh.spring.integration.pojo.TwitBean;
/**
 * Command-line entry point that reads the Twitter filter stream and pushes every tweet into the
 * Spring Integration flow defined in spring-int-context.xml, through {@link TwitGateway}.
 *
 * <p>Keywords to track may be passed as program arguments; with no arguments "Hong Kong" is
 * tracked. OAuth credentials are read from {@code TwitterConstants}.
 */
public class StreamReader {
    private static final Logger logger = LoggerFactory.getLogger(StreamReader.class);
    /** Keywords tracked when no program arguments are supplied. */
    private static final String[] DEFAULT_KEYWORDS = { "Hong Kong" };
    private static TwitterStream stream = null;

    public static void main(String[] args) {
        ApplicationContext ctx = new ClassPathXmlApplicationContext("spring-int-context.xml");
        final TwitGateway gw = (TwitGateway) ctx.getBean("entryPoint");
        init();
        StatusListener listener = new StatusListener() {
            // twitter4j reports stream failures through this callback; log them rather than
            // discarding them silently.
            public void onException(Exception ex) {
                logger.error("Twitter stream error", ex);
            }
            public void onDeletionNotice(StatusDeletionNotice notice) {
            }
            public void onScrubGeo(long userId, long upToStatusId) {
            }
            public void onStallWarning(StallWarning warning) {
                logger.warn("Twitter stream stall warning: {}", warning.getMessage());
            }
            // Copies the fields the integration flow needs into a TwitBean and sends it in.
            public void onStatus(Status status) {
                TwitBean bean = new TwitBean();
                bean.setName(status.getUser().getName());
                bean.setMessage(status.getText());
                bean.setLanguage(status.getLang());
                gw.parseTwit(bean);
            }
            public void onTrackLimitationNotice(int numberOfLimitedStatuses) {
                logger.warn("Track limitation notice: {} matching statuses were not delivered",
                        numberOfLimitedStatuses);
            }
        };
        FilterQuery qry = new FilterQuery();
        String[] keywords = args.length > 0 ? args : DEFAULT_KEYWORDS;
        qry.track(keywords);
        stream.addListener(listener);
        stream.filter(qry);
    }

    /** Builds the {@link TwitterStream} from the OAuth credentials in {@code TwitterConstants}. */
    private static void init() {
        logger.info("Initializing Twitter Stream");
        ConfigurationBuilder confBuilder = new ConfigurationBuilder();
        confBuilder.setOAuthConsumerKey(TwitterConstants.CONSUMER_KEY);
        confBuilder.setOAuthConsumerSecret(TwitterConstants.CONSUMER_SECRET);
        confBuilder.setOAuthAccessToken(TwitterConstants.ACCESS_TOKEN_KEY);
        confBuilder.setOAuthAccessTokenSecret(TwitterConstants.ACCESS_TOKEN_SECRET);
        stream = new TwitterStreamFactory(confBuilder.build()).getInstance();
    }
}

POJO class to store tweet information.

package com.techrefresh.spring.integration.pojo;
/**
 * Mutable JavaBean holding the tweet fields used by the integration flow: the author's name, the
 * tweet text and its language code.
 */
public class TwitBean {
    /** Author name (filled from the tweet's user). */
    private String name;
    /** Tweet text. */
    private String message;
    /** Language code reported for the tweet; used as the routing key. */
    private String language;

    public String getName() {
        return this.name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getMessage() {
        return this.message;
    }

    public void setMessage(String message) {
        this.message = message;
    }

    public String getLanguage() {
        return this.language;
    }

    public void setLanguage(String language) {
        this.language = language;
    }
}
view raw TwitBean.java hosted with ❤ by GitHub

Gateway interface

package com.techrefresh.spring.integration.gateway;
import com.techrefresh.spring.integration.pojo.TwitBean;
/**
 * Entry point into the tweet-processing flow. Spring Integration generates the implementation
 * (the {@code int:gateway} bean "entryPoint"); each call sends the bean to the gateway's request
 * channel.
 */
public interface TwitGateway {
    /**
     * Sends one tweet into the integration flow.
     *
     * @param bean the tweet to process
     */
    void parseTwit(TwitBean bean);
}

Utility Bean class.

package com.techrefresh.spring.integration.bean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.techrefresh.spring.integration.pojo.TwitBean;
/**
 * Terminal service-activator bean for the tweet flow: logs each tweet it receives.
 */
public class UtilBean {
    private static final Logger logger = LoggerFactory.getLogger(UtilBean.class);

    /**
     * Logs the author, language and text of a tweet.
     *
     * @param msg tweet payload delivered by the language router
     */
    public void printOutput(TwitBean msg) {
        final String author = msg.getName();
        final String lang = msg.getLanguage();
        final String text = msg.getMessage();
        logger.info("Bean received from {} in {}::: {}", author, lang, text);
    }
}
view raw UtilBean.java hosted with ❤ by GitHub

Jun 13, 2016

Running first Apache Spark Project

Load data from a CSV file to train a model.
Use the trained model to predict the output.
Data used here is from https://www.kaggle.com/c/titanic

When running Spark from Eclipse, we need to set the following VM argument.
-Xmx512m


Following is the source code.
package com.spark.titanic.main;
import java.util.HashMap;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.mllib.linalg.Vectors;
import org.apache.spark.mllib.regression.LabeledPoint;
import org.apache.spark.mllib.tree.RandomForest;
import org.apache.spark.mllib.tree.model.RandomForestModel;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple2;
import com.spark.titanic.bean.PessangerBean;
import com.spark.titanic.bean.PessangerBean.Embark;
import com.spark.titanic.bean.PessangerBean.Sex;
public class TitanicMain {
private static final Logger logger = LoggerFactory
.getLogger(TitanicMain.class);
private static JavaSparkContext sc;
private static SQLContext sqlCtx;
public static void main(String[] args) {
System.setProperty("hadoop.home.dir",
"D:/sw/hadoop/hadoop-common-2.2.0-bin-master/");
// Define a configuration to use to interact with Spark
SparkConf conf = new SparkConf().setMaster("local").setAppName(
"Titanic Survival Analytics App");
// Create a Java version of the Spark Context from the configuration
sc = new JavaSparkContext(conf);
loadTrainingData();
}
private static void loadTrainingData() {
JavaRDD<LabeledPoint> trainingRDD = sc
.textFile("file://D:/tech/data/titanic/train.csv")
.filter(new Function<String, Boolean>() {
@Override
public Boolean call(String line) throws Exception {
return line.charAt(0) != 'P';
}
}).map(new Function<String, LabeledPoint>() {
@Override
public LabeledPoint call(String line) throws Exception {
String[] fields = line.split(
",(?=([^\"]*\"[^\"]*\")*[^\"]*$)", -1);
// logger.info("{} {} {}",fields[1],fields[2],fields[5]);
LabeledPoint point = new LabeledPoint(Double
.valueOf(fields[1]), Vectors.dense(Double
.valueOf(fields[2]), Double
.valueOf(fields[5] == null
|| "".equals(fields[5].trim()) ? "0"
: fields[5]),
Double.valueOf(fields[6]), Double
.valueOf(fields[7]), Double
.valueOf(fields[9]), "male"
.equalsIgnoreCase(fields[4]) ? 1d : 0d));
return point;
}
});
Integer numClasses = 2;
HashMap<Integer, Integer> categoricalFeaturesInfo = new HashMap<Integer, Integer>();
Integer numTrees = 3; // Use more in practice.
String featureSubsetStrategy = "auto"; // Let the algorithm choose.
String impurity = "gini";
Integer maxDepth = 5;
Integer maxBins = 32;
Integer seed = 12345;
double split[] = { 0.7, 0.3 };
logger.info("No of splits {}", split.length);
JavaRDD<LabeledPoint> splits[] = trainingRDD.randomSplit(split);
logger.info("Count in splits is {} {}", splits[0].count(),
splits[1].count());
final RandomForestModel model = RandomForest.trainClassifier(
trainingRDD, numClasses, categoricalFeaturesInfo, numTrees,
featureSubsetStrategy, impurity, maxDepth, maxBins, seed);
JavaRDD<LabeledPoint> testData = sc
.textFile("file://D:/tech/data/titanic/test.csv")
.filter(new Function<String, Boolean>() {
@Override
public Boolean call(String line) throws Exception {
return line.charAt(0) != 'P';
}
}).map(new Function<String, LabeledPoint>() {
@Override
public LabeledPoint call(String line) throws Exception {
String[] fields = line.split(
",(?=([^\"]*\"[^\"]*\")*[^\"]*$)", -1);
// logger.info("{} {} {}",fields[1],fields[2],fields[5]);
LabeledPoint point = new LabeledPoint(Double
.valueOf(fields[0]), Vectors.dense(Double
.valueOf(fields[1]), // pClass
Double.valueOf(fields[4] == null
|| "".equals(fields[5].trim()) ? "0"
: fields[5]),// Age
Double.valueOf(fields[5]),// SibSp
Double.valueOf(fields[6]),// pArch
Double.valueOf(fields[8] == null
|| "".equals(fields[8].trim()) ? "0"
: fields[8]),// Fare
"male".equalsIgnoreCase(fields[4]) ? 1d : 0d));
return point;
}
});
JavaRDD<Tuple2<Double, Double>> testDataPred = testData
.map(new Function<LabeledPoint, Tuple2<Double, Double>>() {
@Override
public Tuple2<Double, Double> call(LabeledPoint point)
throws Exception {
Tuple2<Double, Double> tuple = new Tuple2<Double, Double>(
point.label(), model.predict(point.features()));
return tuple;
}
});
testDataPred.foreach(new VoidFunction<Tuple2<Double, Double>>() {
@Override
public void call(Tuple2<Double, Double> arg0) throws Exception {
logger.info("{}\t{}", arg0._1, arg0._2);
}
});
}
}



Note that if Apache Hadoop is not installed on the local machine, just download the binaries and set the system property hadoop.home.dir. If you are running standalone code and do not want to hard-code it, set the property with the -D option.

Some important tips are from Kaggle forums.