Complex Event Processing - Esper and Twitter (Twitter4J)

From XennisWiki
Jump to: navigation, search

This article describes a Java programm, that reads the Twitter Stream using the Complex Event Processing Systems Esper. As a Java library for the Twitter API is used Twitter4J.

This article based on the the following: Java - Twitter API (Twitter4J) and Esper. For further information see also Twitter4j and Esper: Tracking user sentiments on Twitter.

Code

Java

TwitterStreamCEP.java

import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.Scanner;

import com.espertech.esper.client.Configuration;
import com.espertech.esper.client.EPAdministrator;
import com.espertech.esper.client.EPRuntime;
import com.espertech.esper.client.EPServiceProvider;
import com.espertech.esper.client.EPServiceProviderManager;
import com.espertech.esper.client.EPStatement;
import com.espertech.esper.client.EventBean;
import com.espertech.esper.client.UpdateListener;

import twitter4j.StallWarning;
import twitter4j.Status;
import twitter4j.StatusDeletionNotice;
import twitter4j.StatusListener;
import twitter4j.TwitterException;
import twitter4j.TwitterStream;
import twitter4j.TwitterStreamFactory;
import twitter4j.auth.AccessToken;

public class TwitterStreamCEP {
	
	static EPRuntime cepRuntime;

	public static void main(String[] args) {
		
		try {
			createEsperRuntime("query.epl");
			listenToTwitterStream(createTwitterStream());
		} catch (TwitterException | IOException e) {
			e.printStackTrace();
		}
	}

	public static class CEPListener implements UpdateListener {
		
		public void update(EventBean[] newData, EventBean[] oldData) {
			try {
//				if (newData == null) {
//					return;
//				}
				EventBean event = newData[0];
//				System.out.println("EVENT: " + event.get("user") + " " + event.get("timeZone") +  " " + event.get("lang"));
				System.out.println("EVENT: " + event.getUnderlying());
			} catch (Exception e) {
				e.printStackTrace();
			}
		}

	}	
	
	public static EPRuntime createEsperRuntime(String queryFileName) throws FileNotFoundException {
		// Configuration
		Configuration cepConfig = new Configuration();
		cepConfig.addEventType("TwitterBean", TwitterBean.class.getName());
		// Setup provider, runtime and administrator
		EPServiceProvider cep = EPServiceProviderManager.getProvider("TwitterStreamCEP", cepConfig);
		cepRuntime = cep.getEPRuntime();
		EPAdministrator cepAdmin = cep.getEPAdministrator();
		
		System.out.println("------ Performing Query ------");

		String query = new Scanner(new File(queryFileName)).useDelimiter("\\Z").next();
		cepAdmin.destroyAllStatements();
		EPStatement cepStatement = cepAdmin.createEPL(query);
		cepStatement.addListener(new CEPListener());
		
		return cepRuntime;
	}	
	
	/**
	 * Creates and return object from Twitter stream factory.
	 * 
	 * @return Twitter stream object.
	 */
	public static TwitterStream createTwitterStream() {
		TwitterStream twitterStream = new TwitterStreamFactory().getInstance();

		// Set access by OAuth
		twitter.setOAuthConsumer("CONSUMER-KEY", "CONSUMER-SECRET");
		twitter.setOAuthAccessToken(new AccessToken("ACCESS-TOKEN", "ACCESS-TOKEN-SECRET"));


		return twitterStream;
	}
	
	/**
	 * Print Twitter stream.
	 * 
	 * @param twitterStream
	 *            Object from TwitterStreamFactory
	 * @throws TwitterException
	 * @throws IOException
	 */
	public static void listenToTwitterStream(twitter4j.TwitterStream twitterStream)
			throws TwitterException, IOException {
		StatusListener listener = new StatusListener() {

			@Override
			public void onStatus(Status status) {
				if (status.getGeoLocation() != null) {
				TwitterBean twitterBean = new TwitterBean(status);
				cepRuntime.sendEvent(twitterBean);
				}
//				System.out.println(status.getUser().getName() + ": " + status.getText());
			}

			@Override
			public void onDeletionNotice(
					StatusDeletionNotice statusDeletionNotice) {
					//System.out.println("Got status deletion notice - id:" + statusDeletionNotice.getStatusId());
			}

			@Override
			public void onTrackLimitationNotice(int numberOfLimitedStatuses) {
				//System.out.println("Got track limitation notice: " + numberOfLimitedStatuses);
			}

			@Override
			public void onException(Exception ex) {
				ex.printStackTrace();
			}

			@Override
			public void onScrubGeo(long userId, long upToStatusId) {
				//System.out.println("Got scrub geo - userId:" + userId + " upToStatusId:" + upToStatusId);
			}

			@Override
			public void onStallWarning(StallWarning stallWarning) {
				// System.out.println("Got stall warning :" + stallWarning);
			}
		};

		twitterStream.addListener(listener);
		twitterStream.sample();
	}

}

TwitterBean.java

import twitter4j.Status;

public class TwitterBean {

	private String user;
	private String lang;
	private int followersCount;
//	private String text;
	private String timeZone;
	private double latitude;
	private double longitude;
	private int UTcOffset;

	public int getFollowersCount() {
		return followersCount;
	}

	public void setFollowersCount(int followersCount) {
		this.followersCount = followersCount;
	}

	public double getLatitude() {
		return latitude;
	}

	public void setLatitude(double latitude) {
		this.latitude = latitude;
	}

	public double getLongitude() {
		return longitude;
	}

	public void setLongitude(double longitude) {
		this.longitude = longitude;
	}

	public int getUTcOffset() {
		return UTcOffset;
	}

	public void setUTcOffset(int uTcOffset) {
		UTcOffset = uTcOffset;
	}

	public TwitterBean() {
		
	}
	
	public TwitterBean(Status status) {
		this.user = status.getUser().getScreenName();
		this.lang = status.getUser().getLang();
		this.followersCount = status.getUser().getFollowersCount();
		this.latitude = status.getGeoLocation().getLatitude();
		this.longitude = status.getGeoLocation().getLongitude();
		this.UTcOffset = status.getUser().getUtcOffset();
		this.timeZone = status.getUser().getTimeZone();
//		this.text = status.getText();		
	}

	public String getLang() {
		return lang;
	}

	public void setLang(String lang) {
		this.lang = lang;
	}

	public String getUser() {
		return user;
	}

	public void setUser(String user) {
		this.user = user;
	}
	
	public String getTimeZone() {
		return timeZone;
	}

	public void setTimeZone(String timeZone) {
		this.timeZone = timeZone;
	}

	@Override
	public String toString() {
		return "TwitterBean [user=" + user + ", lang=" + lang
				+ ", followersCount=" + followersCount + ", timeZone="
				+ timeZone + ", latitude=" + latitude + ", longitude="
				+ longitude + ", UTcOffset=" + UTcOffset + "]";
	}

	
}

EPL query

query.epl

SELECT *
FROM TwitterBean
WHERE lang = 'en'

Ouput

Extract of the output

EVENT: TwitterBean [user=roadtrips4beer, lang=en, followersCount=3104, timeZone=Eastern Time (US & Canada), latitude=28.8993, longitude=-82.5931, UTcOffset=-14400]
EVENT: TwitterBean [user=lexiis_x3, lang=en, followersCount=115, timeZone=null, latitude=40.79238389, longitude=-74.19290341, UTcOffset=-1]
EVENT: TwitterBean [user=bomshelkiley, lang=en, followersCount=211, timeZone=null, latitude=35.13693722, longitude=-97.39501447, UTcOffset=-1]

See also

External links