The following is the code that finally worked for me. It reads tweets containing the word "Brexit" and saves information about them to text files.
Something more substantial may follow later.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import org.apache.spark.SparkConf | |
import org.apache.spark.SparkConf | |
import org.apache.spark.SparkContext | |
import twitter4j.conf.ConfigurationContext | |
import twitter4j.auth.AuthorizationFactory | |
import org.apache.spark.streaming.twitter.TwitterUtils | |
import org.apache.spark.streaming.StreamingContext | |
import org.apache.spark.streaming.Seconds | |
import org.apache.spark.storage.StorageLevel | |
import twitter4j.auth.Authorization | |
/**
 * Spark Streaming job that receives tweets matching the tracked keywords and
 * writes one tab-separated line per tweet (screen name, tweet id, text) via
 * `saveAsTextFiles`, i.e. one output directory per batch.
 *
 * OAuth credentials are read from the environment variables
 * TWITTER_CONSUMER_KEY, TWITTER_CONSUMER_SECRET, TWITTER_ACCESS_TOKEN and
 * TWITTER_ACCESS_TOKEN_SECRET so secrets need not be committed to source.
 */
object Main {
  // OAuth credentials; empty string when the environment variable is unset.
  val CONSUMER_KEY: String = sys.env.getOrElse("TWITTER_CONSUMER_KEY", "")
  val CONSUMER_SECRET: String = sys.env.getOrElse("TWITTER_CONSUMER_SECRET", "")
  val ACCESS_TOKEN_KEY: String = sys.env.getOrElse("TWITTER_ACCESS_TOKEN", "")
  val ACCESS_TOKEN_SECRET: String = sys.env.getOrElse("TWITTER_ACCESS_TOKEN_SECRET", "")

  /** Hadoop home used when `-Dhadoop.home.dir` is not supplied. */
  private val DefaultHadoopHome = "D:/sw/hadoop/hadoop-common-2.2.0-bin-master/"

  /** Output path prefix used when no program argument is given. */
  private val DefaultOutputPrefix = "D:/keyur/tech/data/spark/twitter-stream/"

  /** Keywords passed to the Twitter filter stream. */
  private val TrackedKeywords = Seq("Brexit")

  /**
   * Starts the streaming job and blocks until it terminates.
   *
   * @param args optional; `args(0)`, when present, overrides the output path
   *             prefix passed to `saveAsTextFiles`.
   */
  def main(args: Array[String]): Unit = {
    val outputPrefix = args.headOption.getOrElse(DefaultOutputPrefix)

    // Respect an explicit -Dhadoop.home.dir instead of always overwriting it.
    if (!sys.props.contains("hadoop.home.dir"))
      System.setProperty("hadoop.home.dir", DefaultHadoopHome)

    // twitter4j reads its OAuth settings from these system properties. Only set
    // the ones we actually have, so values supplied via -D are not clobbered with "".
    Seq(
      "twitter4j.oauth.consumerKey" -> CONSUMER_KEY,
      "twitter4j.oauth.consumerSecret" -> CONSUMER_SECRET,
      "twitter4j.oauth.accessToken" -> ACCESS_TOKEN_KEY,
      "twitter4j.oauth.accessTokenSecret" -> ACCESS_TOKEN_SECRET
    ).foreach { case (key, value) =>
      if (value.nonEmpty) System.setProperty(key, value)
    }

    // setIfMissing keeps local[*] as the default but lets spark-submit --master override it.
    val cfg = new SparkConf()
      .setIfMissing("spark.master", "local[*]")
      .setAppName("Spark Streaming Example in Scala")
    val ctx = new SparkContext(cfg)
    // Build the StreamingContext on the existing SparkContext. Passing `cfg` instead
    // would create a second SparkContext in this JVM; Spark supports only one active
    // SparkContext per JVM, so no allowMultipleContexts workaround is needed here.
    val ssc = new StreamingContext(ctx, Seconds(30))

    val twitterAuth = AuthorizationFactory.getInstance(ConfigurationContext.getInstance())
    val tweets = TwitterUtils
      .createStream(ssc, Some(twitterAuth), TrackedKeywords, StorageLevel.MEMORY_AND_DISK_2)
      .repartition(10)

    // Emit exactly one String record per tweet so author, id and text stay together
    // (a flatMap over Seq(user, text, id) yields three unrelated Any-typed records).
    // Tabs/newlines in the text are flattened so each tweet stays on a single line.
    val lines = tweets.map { status =>
      val author = Option(status.getUser).map(_.getScreenName).getOrElse("")
      val text = status.getText.replaceAll("[\\t\\r\\n]+", " ")
      s"$author\t${status.getId}\t$text"
    }
    lines.saveAsTextFiles(outputPrefix, "")
    // Alternative: keep only (id, text) of tweets mentioning "China":
    // tweets.filter(_.getText.contains("China")).map(s => (s.getId, s.getText)).saveAsTextFiles(outputPrefix, "")

    try {
      ssc.start()
      ssc.awaitTermination()
    } finally {
      // Also stops `ctx`; runs even when start/awaitTermination throws.
      ssc.stop(stopSparkContext = true)
    }
  }
}