Feb 24, 2014

Producer-Consumer example with Twitter Stream reading

While looking at examples on Producer Consumer using Java I was not able to find much of practical examples so let’s take some real time example. As explained in last blog we can read stream provided by Twitter using Twitter4J api which sends us real time Twitter data. So now lets use this feed as producer and we design a consumer that will check count of twits that we are getting for particular key word.

So first we will change our StreamReaderService to accept a queue and list of keywords as follows.

package com.techcielo.twitreader.service;

import java.util.concurrent.BlockingQueue;

import twitter4j.FilterQuery;
import twitter4j.StallWarning;
import twitter4j.Status;
import twitter4j.StatusDeletionNotice;
import twitter4j.StatusListener;
import twitter4j.TwitterStream;

import com.techcielo.twitreader.bean.TwitterStreamBean;
import com.techcielo.twitreader.util.TwitterStreamBuilderUtil;

public class StreamReaderService implements Runnable
{
       private BlockingQueue<TwitterStreamBean> beanQ;
       private String[] keywords;
       public StreamReaderService(BlockingQueue<TwitterStreamBean> beanQ,String[] keywords) {
              this.beanQ=beanQ;
              this.keywords=keywords;
       }
      
       public void readTwitterFeed() {

              TwitterStream stream = TwitterStreamBuilderUtil.getStream();

              StatusListener listener = new StatusListener() {

                     @Override
                     public void onException(Exception e) {
                           System.out.println("Exception occured:" + e.getMessage());
                     }

                     @Override
                     public void onTrackLimitationNotice(int n) {
                           System.out.println("Track limitation notice for " + n);
                     }

                     @Override
                     public void onStatus(Status status) {
                           System.out.println("Got twit:" + status.getText());
                           TwitterStreamBean bean = new TwitterStreamBean();
                           String username = status.getUser().getScreenName();
                           bean.setUsername(username);
                           long tweetId = status.getId();
                           bean.setId(tweetId);
                           bean.setInReplyUserName(status.getInReplyToScreenName());
                           if (status != null && status.getRetweetedStatus() != null
                                         && status.getRetweetedStatus().getUser() != null) {
                                  bean.setRetwitUserName(status.getRetweetedStatus()
                                                .getUser().getScreenName());
                           }
                           String content = status.getText();
                           bean.setContent(content);
                           try {
                                  beanQ.put(bean);
                           } catch (InterruptedException e) {
                                  System.out.println("Error occured while pusing feed data");
                           }
                     }

                     @Override
                     public void onStallWarning(StallWarning arg0) {
                           System.out.println("Stall warning");
                     }

                     @Override
                     public void onScrubGeo(long arg0, long arg1) {
                           System.out.println("Scrub geo with:" + arg0 + ":" + arg1);
                     }

                     @Override
                     public void onDeletionNotice(StatusDeletionNotice arg0) {
                           System.out.println("Status deletion notice");
                     }
              };
              FilterQuery qry = new FilterQuery();
              qry.track(keywords);

              stream.addListener(listener);
              stream.filter(qry);
       }

       @Override
       public void run() {
              readTwitterFeed();
       }
}


Now we will create a consumer that will accept keywords as constructor argument and when twit comes to this queue we will check for index of keyword in it and display count for each keyword. So following will be our consumer class.

package com.techcielo.twitreader.service;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

import com.techcielo.twitreader.bean.TwitterStreamBean;

public class StreamConsumerService implements Runnable{
      
       private BlockingQueue<TwitterStreamBean> beanQ;
       private String[] keywords;
       private Map<String, Integer> countmap=new HashMap<>();
       public StreamConsumerService(BlockingQueue<TwitterStreamBean> beanQ, String[] keywords) {
              this.beanQ=beanQ;
              this.keywords=keywords;
       }
       @Override
       public void run() {
              System.out.println("Started twit parser");
              while(true){
                     try {
                           TwitterStreamBean bean = beanQ.poll(1, TimeUnit.MILLISECONDS);
                           if(bean!=null){
                                  for(int i=0;i<keywords.length;i++){
                                         if(bean.getContent().toUpperCase().indexOf(keywords[i].toUpperCase())!=-1){
                                                if(countmap.containsKey(keywords[i].toUpperCase())){
                                                       countmap.put(keywords[i].toUpperCase(), countmap.get(keywords[i].toUpperCase())+1);
                                                }
                                                else{
                                                       countmap.put(keywords[i].toUpperCase(), 1);
                                                }
                                         }
                                  }
                                  System.out.println("Count:"+countmap);
                           }
                     } catch (InterruptedException e) {
                     }
              }
       }
}

Now we will create a class to start producer and consumer service as follows.

package com.techcielo.twitreader.main;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

import com.techcielo.twitreader.bean.TwitterStreamBean;
import com.techcielo.twitreader.service.StreamConsumerService;
import com.techcielo.twitreader.service.StreamReaderService;

public class Main {
       public static void main(String[] args) {
              String keywords[] = {"Sochi","Whatsapp"};
              BlockingQueue<TwitterStreamBean> beanQ = new LinkedBlockingQueue<>();
              StreamReaderService producer = new StreamReaderService(beanQ,keywords);
              StreamConsumerService consumer = new StreamConsumerService(beanQ,keywords);
              Thread t1 = new Thread(producer);
              Thread t2 = new Thread(consumer);
              t1.start();
              t2.start();
              System.out.println("Done");
       }
}

So when we run this code following will be output.

Done
Started twit parser
[Mon Feb 24 14:17:30 IST 2014]Establishing connection.
[Mon Feb 24 14:17:34 IST 2014]Connection established.
[Mon Feb 24 14:17:34 IST 2014]Receiving status stream.
Got twit:Yaampun -_- cepet download whatsapp ya :p RT @kunuuuuy: nizaarr cpt beli pulsa ya;)
Count:{WHATSAPP=1}
Got twit:I shouldn't whatsapp him anymore. Or maybe once in 2 weeks time only.
Count:{WHATSAPP=2}
Got twit:RT @pauli_roldan: #Whatsapp #Male #Me #Cuenta #De #La #Vida #De #Justin #Y #Avalanna http://t.co/W1Y6RnwX4Y
Count:{WHATSAPP=3}
Got twit:RT @molnia_sport: ?????????? ???: ?????? ???????? ?? ?????? ???? ???????? ? ????????, ?? ? ??????? ?????

http://t.co/DOfXgS5p6l http://t.c…
Count:{WHATSAPP=3}
Got twit:Beego + Linda | Expecting
image by Wafi Abu
Ryaniez Moments Photography

Call/SMS/Whatsapp 0192044408 (Hash) or... http://t.co/WevB66eHGo
Got twit:Millions Flock to Telegram Messaging App After WhatsApp Outage http://t.co/TD46LQEGTE I'm going to check it out
Got twit:RT @MyFashionBDG: Kode: Stitch Air  75.000 (blm ongkir) | For order sms/whatsapp ke 081392239444 http://t.co/NjFOTiCT8n Fb: http://t.co/jAx…
Count:{WHATSAPP=4}
Count:{WHATSAPP=5}
Count:{WHATSAPP=6}
Got twit:(#Houston_0998) ¿Qué provocó la caída de varias horas de WhatsApp?: El fundador de WhatsApp, J... http://t.co/qhERwrXZfy (#Houston_0998)
Count:{WHATSAPP=7}
Got twit:RT @undyt_: Whatsapp jangan diheboh-hebohin doong, tar jadi ribet kayak aplikasi lain deh. :p
Count:{WHATSAPP=8}
Got twit:RT @orangeflower08: ??????????????????????????????????18????????????????? #????

RT @NBCOlympics We get it, Sochi bear. We're crying too. h…
Count:{WHATSAPP=8, SOCHI=1}
Got twit:RT @A3Noticias: Cuatro millones de usuarios se pasan a Telegram tras las caídas de Whatsapp... para sufrir las mismas averías http://t.co/F…
Count:{WHATSAPP=9, SOCHI=1}
Got twit:RT @morandiniblog: Le fondateur de WhatsApp s'excuse pour la panne " la plus longue et la plus importante depuis des années" #whatsapp http…
Count:{WHATSAPP=10, SOCHI=1}
Got twit:RT @puur: Ik vind een @YouTube-video van @timtimfed leuk: http://t.co/cKu9YDbXHu SOCHI 2014 - Speed Skating Double Dash Final
Count:{WHATSAPP=10, SOCHI=2}
Got twit:aku gk pake BB @vivinopiw_ aku pake Whatsapp, kmu pake gk ?
Count:{WHATSAPP=11, SOCHI=2}
Got twit:@iliacolmenares te escribo a whatsapp y nada, no se que le pasa a mi teléfono... Poblado esta totalmente trancado desde las 3 am.
Count:{WHATSAPP=12, SOCHI=2}
Got twit:RT @ExpansionaT: ¿Qué provocó la caída de varias horas de WhatsApp? http://t.co/prjP7Nn3Ae
Count:{WHATSAPP=13, SOCHI=2}
Got twit:(#Houston_0998) ¿Qué provocó la caída de varias horas de WhatsApp?: El fundador de WhatsApp, J... http://t.co/SZJ2gz5tpG (#Houston_0998)
Got twit:RT @jonipikkuinen: Varoittakaa lapsianne klikkamasta "whatsapp-vanhenee tänään"-linkkejä ja vastaavia puhelimessa. Joutuvat FunLimen 79€/kk.
Count:{WHATSAPP=14, SOCHI=2}
Count:{WHATSAPP=15, SOCHI=2}
Got twit:Facebook rachète WhatsApp pour 63 fois le prix d'un Airbus A380, le retour de la bulle internet ? http://t.co/DZS8A2AJO8
Count:{WHATSAPP=16, SOCHI=2}
Got twit:Facebook rachète WhatsApp pour 63 fois le prix d'un Airbus A380, le retour de la bulle internet ? http://t.co/MiysQgJC7R
Count:{WHATSAPP=17, SOCHI=2}
Got twit:Ya estamos con la tontería de las cadenas disque para que el whatsapp no t cueste dinero
Count:{WHATSAPP=18, SOCHI=2}