While looking at examples on
Producer Consumer using Java I was not able to find much of practical examples
so let’s take some real time example. As explained in last blog we can read
stream provided by Twitter using Twitter4J api which sends us real time Twitter
data. So now lets use this feed as producer and we design a consumer that will
check count of twits that we are getting for particular key word.
So first we will change our
StreamReaderService to accept a queue and list of keywords as follows.
package
com.techcielo.twitreader.service;
import
java.util.concurrent.BlockingQueue;
import
twitter4j.FilterQuery;
import
twitter4j.StallWarning;
import twitter4j.Status;
import
twitter4j.StatusDeletionNotice;
import
twitter4j.StatusListener;
import
twitter4j.TwitterStream;
import
com.techcielo.twitreader.bean.TwitterStreamBean;
import
com.techcielo.twitreader.util.TwitterStreamBuilderUtil;
public class
StreamReaderService implements Runnable
{
private
BlockingQueue<TwitterStreamBean> beanQ;
private String[] keywords;
public
StreamReaderService(BlockingQueue<TwitterStreamBean> beanQ,String[]
keywords) {
this.beanQ=beanQ;
this.keywords=keywords;
}
public void readTwitterFeed()
{
TwitterStream stream =
TwitterStreamBuilderUtil.getStream();
StatusListener listener = new StatusListener() {
@Override
public void
onException(Exception e) {
System.out.println("Exception
occured:" + e.getMessage());
}
@Override
public void
onTrackLimitationNotice(int n) {
System.out.println("Track
limitation notice for " + n);
}
@Override
public void onStatus(Status
status) {
System.out.println("Got
twit:" + status.getText());
TwitterStreamBean
bean = new TwitterStreamBean();
String username =
status.getUser().getScreenName();
bean.setUsername(username);
long tweetId =
status.getId();
bean.setId(tweetId);
bean.setInReplyUserName(status.getInReplyToScreenName());
if (status != null &&
status.getRetweetedStatus() != null
&&
status.getRetweetedStatus().getUser() != null) {
bean.setRetwitUserName(status.getRetweetedStatus()
.getUser().getScreenName());
}
String content =
status.getText();
bean.setContent(content);
try {
beanQ.put(bean);
} catch
(InterruptedException e) {
System.out.println("Error
occured while pusing feed data");
}
}
@Override
public void
onStallWarning(StallWarning arg0) {
System.out.println("Stall
warning");
}
@Override
public void onScrubGeo(long arg0, long arg1) {
System.out.println("Scrub geo
with:" + arg0 + ":" + arg1);
}
@Override
public void
onDeletionNotice(StatusDeletionNotice arg0) {
System.out.println("Status
deletion notice");
}
};
FilterQuery qry = new FilterQuery();
qry.track(keywords);
stream.addListener(listener);
stream.filter(qry);
}
@Override
public void run() {
readTwitterFeed();
}
}
|
Now we will create a consumer
that will accept keywords as constructor argument and when twit comes to this
queue we will check for index of keyword in it and display count for each
keyword. So following will be our consumer class.
package
com.techcielo.twitreader.service;
import java.util.HashMap;
import java.util.Map;
import
java.util.concurrent.BlockingQueue;
import
java.util.concurrent.TimeUnit;
import
com.techcielo.twitreader.bean.TwitterStreamBean;
public class
StreamConsumerService implements Runnable{
private
BlockingQueue<TwitterStreamBean> beanQ;
private String[] keywords;
private Map<String, Integer> countmap=new HashMap<>();
public
StreamConsumerService(BlockingQueue<TwitterStreamBean> beanQ, String[]
keywords) {
this.beanQ=beanQ;
this.keywords=keywords;
}
@Override
public void run() {
System.out.println("Started
twit parser");
while(true){
try {
TwitterStreamBean
bean = beanQ.poll(1, TimeUnit.MILLISECONDS);
if(bean!=null){
for(int i=0;i<keywords.length;i++){
if(bean.getContent().toUpperCase().indexOf(keywords[i].toUpperCase())!=-1){
if(countmap.containsKey(keywords[i].toUpperCase())){
countmap.put(keywords[i].toUpperCase(), countmap.get(keywords[i].toUpperCase())+1);
}
else{
countmap.put(keywords[i].toUpperCase(),
1);
}
}
}
System.out.println("Count:"+countmap);
}
} catch
(InterruptedException e) {
}
}
}
}
|
Now we will create a class to
start producer and consumer service as follows.
package
com.techcielo.twitreader.main;
import
java.util.concurrent.BlockingQueue;
import
java.util.concurrent.LinkedBlockingQueue;
import
com.techcielo.twitreader.bean.TwitterStreamBean;
import
com.techcielo.twitreader.service.StreamConsumerService;
import
com.techcielo.twitreader.service.StreamReaderService;
public class Main {
public static void main(String[]
args) {
String keywords[] = {"Sochi","Whatsapp"};
BlockingQueue<TwitterStreamBean>
beanQ = new LinkedBlockingQueue<>();
StreamReaderService producer = new
StreamReaderService(beanQ,keywords);
StreamConsumerService consumer =
new
StreamConsumerService(beanQ,keywords);
Thread t1 = new Thread(producer);
Thread t2 = new Thread(consumer);
t1.start();
t2.start();
System.out.println("Done");
}
}
|
So when we run this code
following will be output.
Done
Started twit parser
[Mon Feb 24
14:17:30 IST 2014]Establishing connection.
[Mon Feb 24
14:17:34 IST 2014]Connection established.
[Mon Feb 24
14:17:34 IST 2014]Receiving status stream.
Got twit:Yaampun
-_- cepet download whatsapp ya :p RT @kunuuuuy: nizaarr cpt beli pulsa ya;)
Count:{WHATSAPP=1}
Got twit:I
shouldn't whatsapp him anymore. Or maybe once in 2 weeks time only.
Count:{WHATSAPP=2}
Got twit:RT
@pauli_roldan: #Whatsapp #Male #Me #Cuenta #De #La #Vida #De #Justin #Y
#Avalanna http://t.co/W1Y6RnwX4Y
Count:{WHATSAPP=3}
Got twit:RT
@molnia_sport: ?????????? ???: ?????? ???????? ?? ?????? ???? ???????? ?
????????, ?? ? ??????? ?????
http://t.co/DOfXgS5p6l
http://t.c…
Count:{WHATSAPP=3}
Got twit:Beego +
Linda | Expecting
image by Wafi Abu
Ryaniez Moments
Photography
Call/SMS/Whatsapp
0192044408 (Hash) or... http://t.co/WevB66eHGo
Got twit:Millions
Flock to Telegram Messaging App After WhatsApp Outage http://t.co/TD46LQEGTE
I'm going to check it out
Got twit:RT
@MyFashionBDG: Kode: Stitch Air 75.000
(blm ongkir) | For order sms/whatsapp ke 081392239444 http://t.co/NjFOTiCT8n
Fb: http://t.co/jAx…
Count:{WHATSAPP=4}
Count:{WHATSAPP=5}
Count:{WHATSAPP=6}
Got
twit:(#Houston_0998) ¿Qué provocó la caída de varias horas de WhatsApp?: El
fundador de WhatsApp, J... http://t.co/qhERwrXZfy (#Houston_0998)
Count:{WHATSAPP=7}
Got twit:RT
@undyt_: Whatsapp jangan diheboh-hebohin doong, tar jadi ribet kayak aplikasi
lain deh. :p
Count:{WHATSAPP=8}
Got twit:RT
@orangeflower08: ??????????????????????????????????18????????????????? #????
RT @NBCOlympics We
get it, Sochi bear. We're crying too. h…
Count:{WHATSAPP=8,
SOCHI=1}
Got twit:RT
@A3Noticias: Cuatro millones de usuarios se pasan a Telegram tras las caídas
de Whatsapp... para sufrir las mismas averías http://t.co/F…
Count:{WHATSAPP=9,
SOCHI=1}
Got twit:RT
@morandiniblog: Le fondateur de WhatsApp s'excuse pour la panne " la
plus longue et la plus importante depuis des années" #whatsapp http…
Count:{WHATSAPP=10,
SOCHI=1}
Got twit:RT @puur:
Ik vind een @YouTube-video van @timtimfed leuk: http://t.co/cKu9YDbXHu SOCHI
2014 - Speed Skating Double Dash Final
Count:{WHATSAPP=10,
SOCHI=2}
Got twit:aku gk
pake BB @vivinopiw_ aku pake Whatsapp, kmu pake gk ?
Count:{WHATSAPP=11,
SOCHI=2}
Got
twit:@iliacolmenares te escribo a whatsapp y nada, no se que le pasa a mi teléfono...
Poblado esta totalmente trancado desde las 3 am.
Count:{WHATSAPP=12,
SOCHI=2}
Got twit:RT
@ExpansionaT: ¿Qué provocó la caída de varias horas de WhatsApp? http://t.co/prjP7Nn3Ae
Count:{WHATSAPP=13,
SOCHI=2}
Got
twit:(#Houston_0998) ¿Qué provocó la caída de varias horas de WhatsApp?: El
fundador de WhatsApp, J... http://t.co/SZJ2gz5tpG (#Houston_0998)
Got twit:RT
@jonipikkuinen: Varoittakaa lapsianne klikkamasta "whatsapp-vanhenee tänään"-linkkejä
ja vastaavia puhelimessa. Joutuvat FunLimen 79€/kk.
Count:{WHATSAPP=14,
SOCHI=2}
Count:{WHATSAPP=15,
SOCHI=2}
Got twit:Facebook
rachète WhatsApp pour 63 fois le prix d'un Airbus A380, le retour de la bulle
internet ? http://t.co/DZS8A2AJO8
Count:{WHATSAPP=16,
SOCHI=2}
Got twit:Facebook
rachète WhatsApp pour 63 fois le prix d'un Airbus A380, le retour de la bulle
internet ? http://t.co/MiysQgJC7R
Count:{WHATSAPP=17,
SOCHI=2}
Got twit:Ya estamos
con la tontería de las cadenas disque para que el whatsapp no t cueste dinero
Count:{WHATSAPP=18,
SOCHI=2}
|