Feb 24, 2014

Producer-Consumer example with Twitter Stream reading

While looking at examples on Producer Consumer using Java I was not able to find much of practical examples so let’s take some real time example. As explained in last blog we can read stream provided by Twitter using Twitter4J api which sends us real time Twitter data. So now lets use this feed as producer and we design a consumer that will check count of twits that we are getting for particular key word.

So first we will change our StreamReaderService to accept a queue and list of keywords as follows.

package com.techcielo.twitreader.service;

import java.util.concurrent.BlockingQueue;

import twitter4j.FilterQuery;
import twitter4j.StallWarning;
import twitter4j.Status;
import twitter4j.StatusDeletionNotice;
import twitter4j.StatusListener;
import twitter4j.TwitterStream;

import com.techcielo.twitreader.bean.TwitterStreamBean;
import com.techcielo.twitreader.util.TwitterStreamBuilderUtil;

public class StreamReaderService implements Runnable
{
       private BlockingQueue<TwitterStreamBean> beanQ;
       private String[] keywords;
       public StreamReaderService(BlockingQueue<TwitterStreamBean> beanQ,String[] keywords) {
              this.beanQ=beanQ;
              this.keywords=keywords;
       }
      
       public void readTwitterFeed() {

              TwitterStream stream = TwitterStreamBuilderUtil.getStream();

              StatusListener listener = new StatusListener() {

                     @Override
                     public void onException(Exception e) {
                           System.out.println("Exception occured:" + e.getMessage());
                     }

                     @Override
                     public void onTrackLimitationNotice(int n) {
                           System.out.println("Track limitation notice for " + n);
                     }

                     @Override
                     public void onStatus(Status status) {
                           System.out.println("Got twit:" + status.getText());
                           TwitterStreamBean bean = new TwitterStreamBean();
                           String username = status.getUser().getScreenName();
                           bean.setUsername(username);
                           long tweetId = status.getId();
                           bean.setId(tweetId);
                           bean.setInReplyUserName(status.getInReplyToScreenName());
                           if (status != null && status.getRetweetedStatus() != null
                                         && status.getRetweetedStatus().getUser() != null) {
                                  bean.setRetwitUserName(status.getRetweetedStatus()
                                                .getUser().getScreenName());
                           }
                           String content = status.getText();
                           bean.setContent(content);
                           try {
                                  beanQ.put(bean);
                           } catch (InterruptedException e) {
                                  System.out.println("Error occured while pusing feed data");
                           }
                     }

                     @Override
                     public void onStallWarning(StallWarning arg0) {
                           System.out.println("Stall warning");
                     }

                     @Override
                     public void onScrubGeo(long arg0, long arg1) {
                           System.out.println("Scrub geo with:" + arg0 + ":" + arg1);
                     }

                     @Override
                     public void onDeletionNotice(StatusDeletionNotice arg0) {
                           System.out.println("Status deletion notice");
                     }
              };
              FilterQuery qry = new FilterQuery();
              qry.track(keywords);

              stream.addListener(listener);
              stream.filter(qry);
       }

       @Override
       public void run() {
              readTwitterFeed();
       }
}


Now we will create a consumer that will accept keywords as constructor argument and when twit comes to this queue we will check for index of keyword in it and display count for each keyword. So following will be our consumer class.

package com.techcielo.twitreader.service;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

import com.techcielo.twitreader.bean.TwitterStreamBean;

public class StreamConsumerService implements Runnable{
      
       private BlockingQueue<TwitterStreamBean> beanQ;
       private String[] keywords;
       private Map<String, Integer> countmap=new HashMap<>();
       public StreamConsumerService(BlockingQueue<TwitterStreamBean> beanQ, String[] keywords) {
              this.beanQ=beanQ;
              this.keywords=keywords;
       }
       @Override
       public void run() {
              System.out.println("Started twit parser");
              while(true){
                     try {
                           TwitterStreamBean bean = beanQ.poll(1, TimeUnit.MILLISECONDS);
                           if(bean!=null){
                                  for(int i=0;i<keywords.length;i++){
                                         if(bean.getContent().toUpperCase().indexOf(keywords[i].toUpperCase())!=-1){
                                                if(countmap.containsKey(keywords[i].toUpperCase())){
                                                       countmap.put(keywords[i].toUpperCase(), countmap.get(keywords[i].toUpperCase())+1);
                                                }
                                                else{
                                                       countmap.put(keywords[i].toUpperCase(), 1);
                                                }
                                         }
                                  }
                                  System.out.println("Count:"+countmap);
                           }
                     } catch (InterruptedException e) {
                     }
              }
       }
}

Now we will create a class to start producer and consumer service as follows.

package com.techcielo.twitreader.main;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

import com.techcielo.twitreader.bean.TwitterStreamBean;
import com.techcielo.twitreader.service.StreamConsumerService;
import com.techcielo.twitreader.service.StreamReaderService;

public class Main {
       public static void main(String[] args) {
              String keywords[] = {"Sochi","Whatsapp"};
              BlockingQueue<TwitterStreamBean> beanQ = new LinkedBlockingQueue<>();
              StreamReaderService producer = new StreamReaderService(beanQ,keywords);
              StreamConsumerService consumer = new StreamConsumerService(beanQ,keywords);
              Thread t1 = new Thread(producer);
              Thread t2 = new Thread(consumer);
              t1.start();
              t2.start();
              System.out.println("Done");
       }
}

So when we run this code following will be output.

Done
Started twit parser
[Mon Feb 24 14:17:30 IST 2014]Establishing connection.
[Mon Feb 24 14:17:34 IST 2014]Connection established.
[Mon Feb 24 14:17:34 IST 2014]Receiving status stream.
Got twit:Yaampun -_- cepet download whatsapp ya :p RT @kunuuuuy: nizaarr cpt beli pulsa ya;)
Count:{WHATSAPP=1}
Got twit:I shouldn't whatsapp him anymore. Or maybe once in 2 weeks time only.
Count:{WHATSAPP=2}
Got twit:RT @pauli_roldan: #Whatsapp #Male #Me #Cuenta #De #La #Vida #De #Justin #Y #Avalanna http://t.co/W1Y6RnwX4Y
Count:{WHATSAPP=3}
Got twit:RT @molnia_sport: ?????????? ???: ?????? ???????? ?? ?????? ???? ???????? ? ????????, ?? ? ??????? ?????

http://t.co/DOfXgS5p6l http://t.c…
Count:{WHATSAPP=3}
Got twit:Beego + Linda | Expecting
image by Wafi Abu
Ryaniez Moments Photography

Call/SMS/Whatsapp 0192044408 (Hash) or... http://t.co/WevB66eHGo
Got twit:Millions Flock to Telegram Messaging App After WhatsApp Outage http://t.co/TD46LQEGTE I'm going to check it out
Got twit:RT @MyFashionBDG: Kode: Stitch Air  75.000 (blm ongkir) | For order sms/whatsapp ke 081392239444 http://t.co/NjFOTiCT8n Fb: http://t.co/jAx…
Count:{WHATSAPP=4}
Count:{WHATSAPP=5}
Count:{WHATSAPP=6}
Got twit:(#Houston_0998) ¿Qué provocó la caída de varias horas de WhatsApp?: El fundador de WhatsApp, J... http://t.co/qhERwrXZfy (#Houston_0998)
Count:{WHATSAPP=7}
Got twit:RT @undyt_: Whatsapp jangan diheboh-hebohin doong, tar jadi ribet kayak aplikasi lain deh. :p
Count:{WHATSAPP=8}
Got twit:RT @orangeflower08: ??????????????????????????????????18????????????????? #????

RT @NBCOlympics We get it, Sochi bear. We're crying too. h…
Count:{WHATSAPP=8, SOCHI=1}
Got twit:RT @A3Noticias: Cuatro millones de usuarios se pasan a Telegram tras las caídas de Whatsapp... para sufrir las mismas averías http://t.co/F…
Count:{WHATSAPP=9, SOCHI=1}
Got twit:RT @morandiniblog: Le fondateur de WhatsApp s'excuse pour la panne " la plus longue et la plus importante depuis des années" #whatsapp http…
Count:{WHATSAPP=10, SOCHI=1}
Got twit:RT @puur: Ik vind een @YouTube-video van @timtimfed leuk: http://t.co/cKu9YDbXHu SOCHI 2014 - Speed Skating Double Dash Final
Count:{WHATSAPP=10, SOCHI=2}
Got twit:aku gk pake BB @vivinopiw_ aku pake Whatsapp, kmu pake gk ?
Count:{WHATSAPP=11, SOCHI=2}
Got twit:@iliacolmenares te escribo a whatsapp y nada, no se que le pasa a mi teléfono... Poblado esta totalmente trancado desde las 3 am.
Count:{WHATSAPP=12, SOCHI=2}
Got twit:RT @ExpansionaT: ¿Qué provocó la caída de varias horas de WhatsApp? http://t.co/prjP7Nn3Ae
Count:{WHATSAPP=13, SOCHI=2}
Got twit:(#Houston_0998) ¿Qué provocó la caída de varias horas de WhatsApp?: El fundador de WhatsApp, J... http://t.co/SZJ2gz5tpG (#Houston_0998)
Got twit:RT @jonipikkuinen: Varoittakaa lapsianne klikkamasta "whatsapp-vanhenee tänään"-linkkejä ja vastaavia puhelimessa. Joutuvat FunLimen 79€/kk.
Count:{WHATSAPP=14, SOCHI=2}
Count:{WHATSAPP=15, SOCHI=2}
Got twit:Facebook rachète WhatsApp pour 63 fois le prix d'un Airbus A380, le retour de la bulle internet ? http://t.co/DZS8A2AJO8
Count:{WHATSAPP=16, SOCHI=2}
Got twit:Facebook rachète WhatsApp pour 63 fois le prix d'un Airbus A380, le retour de la bulle internet ? http://t.co/MiysQgJC7R
Count:{WHATSAPP=17, SOCHI=2}
Got twit:Ya estamos con la tontería de las cadenas disque para que el whatsapp no t cueste dinero
Count:{WHATSAPP=18, SOCHI=2}


Reading Twitter Stream using Twitter4j

In this tutorial we will see how to read twitter stream using java code. First create a utility class that will help you in creating connection to Twitter but in order to create connection you need to have an account with twitter so if you do not have an account with Twitter go to https://twitter.com/  and create your account. Once account is created go to https://apps.twitter.com/ and create an application with twitter api by clicking on “Create New App” button.
You need to provide following information. 
  • Name
  • Description
  • Website
  • Callback URL

Once this information is provided click on “Create your Twitter Application” button.  Once application is created you need to generate Access Token. Go to API Keys tab in your application and generate Access Token and Access Token secret. Note down following information somewhere (May be in notepad?)

  • API Key
  • API Secret
  • Access Token
  • Access Token Secret


Download API called Twitter4J from following location and add following jars from its lib folder in your Eclipse Java project so following is how your java project will look like.



You can ignore mongo-2.10.1.jar and packages and classes for now. We will use information that we saved for twitter application in our utility class as follows. (I have stored this values as public static final String variables in my utility class you can use your info similarly)

package com.techcielo.twitreader.util;

import twitter4j.TwitterStream;
import twitter4j.TwitterStreamFactory;
import twitter4j.conf.ConfigurationBuilder;

public class TwitterStreamBuilderUtil {
               
                public static TwitterStream getStream(){
                                ConfigurationBuilder cb = new ConfigurationBuilder();
                                cb.setDebugEnabled(true);
                                cb.setOAuthConsumerKey(Constatnts.consumerkey);
                                cb.setOAuthConsumerSecret(Constatnts.consumerSecret);
                                cb.setOAuthAccessToken(Constatnts.accessToken);
                                cb.setOAuthAccessTokenSecret(Constatnts.accessTokenSecret);
                               
                                return new TwitterStreamFactory(cb.build()).getInstance();
                }
}


Also when you write this code ensure that you are using OAuth api and not OAuth2 API. Once this class is ready we will use it in our Stream reader class as follows. In our class we are reading all twits that has mention of keywords in String array keywords. While writing this tutorial I have used keywords that are currently trending on BBC news e.g. “Sochi”,”Ukraine”,”Whatsapp”.

package com.techcielo.twitreader.service;

import twitter4j.FilterQuery;
import twitter4j.StallWarning;
import twitter4j.Status;
import twitter4j.StatusDeletionNotice;
import twitter4j.StatusListener;
import twitter4j.TwitterStream;

import com.techcielo.twitreader.bean.TwitterStreamBean;
import com.techcielo.twitreader.util.TwitterStreamBuilderUtil;

public class StreamReaderService
{     
       public void readTwitterFeed() {

              TwitterStream stream = TwitterStreamBuilderUtil.getStream();

              StatusListener listener = new StatusListener() {

                     @Override
                     public void onException(Exception e) {
                           System.out.println("Exception occured:" + e.getMessage());
                           e.printStackTrace();
                     }

                     @Override
                     public void onTrackLimitationNotice(int n) {
                           System.out.println("Track limitation notice for " + n);
                     }

                     @Override
                     public void onStatus(Status status) {
                           System.out.println("Got twit:" + status.getText());
                           TwitterStreamBean bean = new TwitterStreamBean();
                           String username = status.getUser().getScreenName();
                           bean.setUsername(username);
                           long tweetId = status.getId();
                           bean.setId(tweetId);
                           bean.setInReplyUserName(status.getInReplyToScreenName());
                           if (status != null && status.getRetweetedStatus() != null
                                         && status.getRetweetedStatus().getUser() != null) {
                                  bean.setRetwitUserName(status.getRetweetedStatus()
                                                .getUser().getScreenName());
                           }
                           String content = status.getText();
                           bean.setContent(content);
                     }

                     @Override
                     public void onStallWarning(StallWarning arg0) {
                           System.out.println("Stall warning");
                     }

                     @Override
                     public void onScrubGeo(long arg0, long arg1) {
                           System.out.println("Scrub geo with:" + arg0 + ":" + arg1);
                     }

                     @Override
                     public void onDeletionNotice(StatusDeletionNotice arg0) {
                           System.out.println("Status deletion notice");
                     }
              };

              FilterQuery qry = new FilterQuery();
              String[] keywords = { "Sochi","Ukraine","Whatsapp" };

              qry.track(keywords);

              stream.addListener(listener);
              stream.filter(qry);
       }
}

In this example I have used a bean class to store information retrieved from Twitter feed. Interested user can use it for Producer Consumer implementation ofr using this feed. When your run this code following will be output. You can ignore exception that occurs while your class is not able to connect to twitter and it retries for the same.

Done
[Mon Feb 24 12:26:38 IST 2014]Establishing connection.
[Mon Feb 24 12:26:48 IST 2014]Connection established.
[Mon Feb 24 12:26:48 IST 2014]Receiving status stream.
Got twit:RT @MaximEristavi: In Lutsk (Western Ukraine) local riot-policemen  on their knees publicly begged ppl for forgiveness.
via live-stream htt…
Got twit:A mi nunca me falló whatsapp, lo que me falló fue este pinche mundo.
Got twit:RT @policia: Whatsapp sigue "on fire". Hoy, por un bulo q no es nuevo. Que se acaban las cuentas y reenvíes... (bla, bla) Ni caso! http://t…
Got twit:Layanan Sempat Tumbang, Ini Penjelasan Pendiri WhatsApp: Ini menjadi kekurangan terbesar dan terpanjang kami, ... http://t.co/SNOmGkvSst
Got twit:Whatsapp 9992511509
Got twit:RT @edwardlucas: Must-read from @uli_speck on what Merkel must do now for Ukraine http://t.co/Lx4FoxBxWW
Got twit:Layanan Sempat Tumbang, Ini Penjelasan Pendiri WhatsApp: Ini menjadi kekurangan terbesar dan terpanjang kami, ... http://t.co/KynYog3HVa
Got twit:RT @alanbaldwinf1: Sochi airport. Olympic Games? That's so yesterday. http://t.co/5rafgTJ5GZ
Got twit:Layanan Sempat Tumbang, Ini Penjelasan Pendiri WhatsApp: Ini menjadi kekurangan terbesar dan terpanjang kami, ... http://t.co/sGzH2NDn10
Got twit:Holders Spain face Ukraine in Euro 2016 qualifying. http://t.co/HtTjnfW1WT
#beINfootball http://t.co/k7yHJG5F9u
Got twit:RT @JoleenStore: ?? ???? ???? ?? 

Another blog will show you how to read twitter Stream and send it to different channels in Spring Integration.