Esper


Esper is a component for Complex Event Processing (CEP). It is available for Java and other platforms.

Simple stock example

Software used: Esper 4.9.0

The following libraries must be included in the Java build path:

  • esper-4.9.0/esper-4.9.0.jar
  • esper-4.9.0/esper/lib/antlr-runtime-3.2.jar
  • esper-4.9.0/esper/lib/cglib-nodep-2.2.jar
  • esper-4.9.0/esper/lib/commons-logging-1.1.1.jar
  • esper-4.9.0/esper/lib/log4j-1.2.16.jar

Code

Main.java

import com.espertech.esper.client.*;

import java.util.StringTokenizer;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.io.File;
import java.io.FileReader;
import java.io.BufferedReader;
import java.util.Scanner;

public class Main {

	public static void main(String[] args) {
		readStream();
	}
	
	public static class CEPListener implements UpdateListener {
		
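		public void update(EventBean[] newData, EventBean[] oldData) {
			// newData can be null when a delivery carries no new events
			if (newData == null) {
				return;
			}
			// Print every event of the delivery, not only the first one
			for (EventBean event : newData) {
				System.out.println("EVENT: " + event.getUnderlying());
			}
		}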

	}
	
	public static void readStream() {
		// Configuration
		Configuration cepConfig = new Configuration();
		cepConfig.addEventType("StockTickExample", Tick.class.getName());
		// Setup provider, runtime and administrator
		EPServiceProvider cep = EPServiceProviderManager.getProvider("Main", cepConfig);
		EPRuntime cepRuntime = cep.getEPRuntime();
		EPAdministrator cepAdmin = cep.getEPAdministrator();
		
		performQuery(cepRuntime, cepAdmin, "MyStock.csv", "query.epl");
	}

	/**
	 * Runs the query from the given file against the given event stream.
	 * 
	 * @param cepRuntime EPRuntime
	 * @param cepAdmin EPAdministrator
	 * @param streamFileName File name of the stream, e.g. 'MyStock.csv'
	 * @param queryFileName File name of the query, e.g. 'query.epl'
	 */
	public static void performQuery(EPRuntime cepRuntime,
			EPAdministrator cepAdmin, String streamFileName,
			String queryFileName) {
		System.out.println("------ Performing Query ------");

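		// try-with-resources closes the scanner even if reading fails
		try (Scanner scanner = new Scanner(new File(queryFileName))) {
			// "\\Z" matches the end of input, so next() returns the whole file
			String query = scanner.useDelimiter("\\Z").next();
			cepAdmin.destroyAllStatements();
			EPStatement cepStatement = cepAdmin.createEPL(query);
			cepStatement.addListener(new CEPListener());
		} catch (Exception e) {
			System.err.println("Error: " + e.getMessage());
			// Without a registered statement there is nothing to evaluate
			return;
		}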

		// Generate ticks
		DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd");
		// try-with-resources closes the reader even if reading fails
		try (BufferedReader reader = new BufferedReader(new FileReader(streamFileName))) {
			String line;

			while ((line = reader.readLine()) != null) {
				Tick tick = new Tick("MyStock");
				StringTokenizer tokenizer = new StringTokenizer(line, ",");

				try {
					tick.setDate(dateFormat.parse(tokenizer.nextToken().trim()));
				} catch (Exception e) {
					System.err.println("Error: " + e.getMessage());
				}

				// Fields in the CSV file may carry leading spaces, so trim them
				tick.setWeightedPrice(Double.parseDouble(tokenizer.nextToken().trim()));
				tick.setVolume(Double.parseDouble(tokenizer.nextToken().trim()));

				// System.out.println("TICK: " + tick);
				cepRuntime.sendEvent(tick);
			}
		} catch (Exception e) {
			System.err.println("Error: " + e.getMessage());
		}
	}

}

Tick.java

import java.util.Date;


public class Tick {

	private String symbol;
	private Date date;
	private double volume;
	private double weightedPrice;
	
	public Tick(String symbol) {
		this.symbol = symbol;		
	}
	
	public Tick(String symbol, Date date, double volume, double weightedPrice) {
		this.symbol = symbol;
		this.date = date;
		this.volume = volume;
		this.weightedPrice = weightedPrice;
	}	
	
	public String getSymbol() {
		return symbol;
	}

	public void setSymbol(String symbol) {
		this.symbol = symbol;
	}

	public Date getDate() {
		return date;
	}

	public void setDate(Date date) {
		this.date = date;
	}

	public double getVolume() {
		return volume;
	}

	public void setVolume(double volume) {
		this.volume = volume;
	}

	public double getWeightedPrice() {
		return weightedPrice;
	}

	public void setWeightedPrice(double weightedPrice) {
		this.weightedPrice = weightedPrice;
	}

	@Override
	public String toString() {
		return "Tick [symbol=" + symbol + ", date=" + date + ", volume="
				+ volume + ", weightedPrice=" + weightedPrice + "]";
	}

}

EPL query and CSV file

query.epl file with a query in the Event Processing Language (EPL):

SELECT *
FROM StockTickExample

MyStock.csv (only an excerpt)

2013-09-18, 0.2434, 2021
2013-09-19, 0.9280, 1500
2013-09-20, 0.3434, 1021
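
Each line carries three comma-separated fields in the order Main.java reads them: date, weighted price, volume. As a minimal sketch using only the Java standard library, one line could be split and parsed as shown here. The names CsvLineSketch, ParsedLine and parse are hypothetical and not part of the original example; rejecting lines that do not have exactly three fields is an assumption of this sketch.

```java
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;

// Hypothetical helper, not part of the original example.
final class CsvLineSketch {

	// Holds the three fields of one CSV line.
	static final class ParsedLine {
		final Date date;
		final double weightedPrice;
		final double volume;

		ParsedLine(Date date, double weightedPrice, double volume) {
			this.date = date;
			this.weightedPrice = weightedPrice;
			this.volume = volume;
		}
	}

	// Parses a line such as "2013-09-18, 0.2434, 2021".
	static ParsedLine parse(String line) {
		String[] fields = line.split(",");
		if (fields.length != 3) {
			throw new IllegalArgumentException("Expected 3 fields: " + line);
		}
		SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd");
		format.setLenient(false); // reject dates such as 2013-13-40
		try {
			Date date = format.parse(fields[0].trim());
			double weightedPrice = Double.parseDouble(fields[1].trim());
			double volume = Double.parseDouble(fields[2].trim());
			return new ParsedLine(date, weightedPrice, volume);
		} catch (ParseException e) {
			throw new IllegalArgumentException("Bad date in line: " + line, e);
		}
	}
}
```

Calling CsvLineSketch.parse("2013-09-18, 0.2434, 2021") yields a weighted price of 0.2434 and a volume of 2021.0; the values could then be passed to the Tick setters.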

Event Processing Language (EPL)

Further information: Overview of the Event Processing Language (EPL), Esper Reference

Simple query with a WHERE condition and an alias (AS).

SELECT datestamp.format() as datestampFormat, price
FROM   StockTick
WHERE  volume > 200000

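Query with a length batch window of fixed size 30 and a subquery as the WHERE condition. A length batch window does not overlap: it collects 30 events, releases them as one batch, and then starts over.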

SELECT datestamp, high 
FROM   StockTick.win:length_batch(30) 
WHERE (
        SELECT MAX(high)
        FROM   StockTick.win:length_batch(30)
    ) = high
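
To illustrate how a length batch window groups events, the following plain-Java sketch mimics the batching without using Esper: it buffers values until the batch is full, hands the batch back, and starts a new one. It is a simplified model of the behaviour, not Esper's implementation, and the names LengthBatchSketch, add and max are hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical model of a length batch window; not Esper code.
final class LengthBatchSketch {
	private final int size;
	private final List<Double> buffer = new ArrayList<>();

	LengthBatchSketch(int size) {
		if (size <= 0) {
			throw new IllegalArgumentException("size must be positive");
		}
		this.size = size;
	}

	// Adds one value. Returns the full batch once 'size' values have
	// arrived, otherwise an empty list.
	List<Double> add(double high) {
		buffer.add(high);
		if (buffer.size() < size) {
			return Collections.emptyList();
		}
		List<Double> batch = new ArrayList<>(buffer);
		buffer.clear(); // the next batch starts empty, so batches never overlap
		return batch;
	}

	// Largest value of a released batch, comparable to MAX(high).
	static double max(List<Double> batch) {
		return Collections.max(batch);
	}
}
```

With a batch size of 3, the values 1.0, 5.0, 2.0 produce nothing for the first two calls and the batch [1.0, 5.0, 2.0] on the third, whose maximum is 5.0. A fourth value begins a fresh batch.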

Query with a followed-by pattern (further information: Chapter 6. EPL Reference: Patterns)

SELECT a.datestamp AS startDate, b.datestamp AS stopDate
FROM   pattern [every a=StockTick -> b=StockTick(a.weightedPrice*100 <= b.weightedPrice)]
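
The matching behaviour of this pattern can be modelled in plain Java, as a sketch under the assumption that every incoming tick starts a new "a" and that each "a" is completed by the first later tick that satisfies the filter. This is a simplified model, not Esper's implementation; the names FollowedBySketch and onPrice are hypothetical, and only the weighted price is tracked.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical model of "every a -> b(filter)"; not Esper code.
final class FollowedBySketch {
	// Prices of "a" events that still wait for a matching "b".
	private final List<Double> pending = new ArrayList<>();

	// Feeds one price. Returns the {a, b} pairs completed by this event.
	List<double[]> onPrice(double price) {
		List<double[]> matches = new ArrayList<>();
		Iterator<Double> it = pending.iterator();
		while (it.hasNext()) {
			double a = it.next();
			// Same filter as in the query: a.weightedPrice*100 <= b.weightedPrice
			if (a * 100 <= price) {
				matches.add(new double[] { a, price });
				it.remove(); // this "a" is done after its first match
			}
		}
		// "every": the current event also starts a new "a"
		pending.add(price);
		return matches;
	}
}
```

Feeding the prices 1.0, 50.0, 100.0 gives no match for the first two events and the pair {1.0, 100.0} on the third, while 50.0 and 100.0 keep waiting for their own follow-up.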

Query that uses between and plus to handle dates (further information: Chapter 11. EPL Reference: Date-Time Methods)

SELECT a.datestamp AS startDate, a.weightedPrice AS startWeightedPrice, b.datestamp AS stopDate, b.weightedPrice AS stopWeightedPrice
FROM pattern [
        every a=StockTick -> b=StockTick(
            b.weightedPrice < a.weightedPrice / 2 AND b.timestamp.between(a.timestamp, a.timestamp.plus(30 days))
        )
    ]
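
The filter condition on "b" can be spelled out in plain Java as a sketch. It assumes that the range check includes both endpoints and uses java.time (Java 8 or later) instead of Esper's date-time methods; the names DateRangeSketch, withinDays and matches are hypothetical.

```java
import java.time.LocalDate;

// Hypothetical model of the filter on "b"; not Esper code.
final class DateRangeSketch {

	// True if 'b' lies between 'a' and 'a' plus 'days' days.
	// Both endpoints count as inside the range (assumption of this sketch).
	static boolean withinDays(LocalDate a, LocalDate b, long days) {
		LocalDate end = a.plusDays(days);
		return !b.isBefore(a) && !b.isAfter(end);
	}

	// Mirrors the query filter: the price more than halved
	// AND the second tick falls within 30 days of the first one.
	static boolean matches(LocalDate aDate, double aPrice,
			LocalDate bDate, double bPrice) {
		return bPrice < aPrice / 2 && withinDays(aDate, bDate, 30);
	}
}
```

For a first tick on 2013-09-18 at price 0.9, a second tick on 2013-09-20 at price 0.3 satisfies the condition, while the same price on 2013-10-19 does not because it lies 31 days later.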
