Spread Toolkit

From XennisWiki
(Redirected from Spread)
Jump to: navigation, search

The Spread Toolkit is a computer software package that provides a high performance group communication system that is resilient to faults across local and wide area networks. Spread functions as a unified message bus for distributed applications, and provides highly tuned application-level multicast, group communication, and point to point support. (Wikipedia)

Java example - Bank account replication

This distributed application models a replicated bank account. It consists of the standard Spread daemon/server and a set of clients, which represent the replicas.

Each client keeps a local copy of the bank account balance and accepts the following commands:

  • balance: print current balance
  • deposit <amount>: increase the balance (on all replicas) by the amount value
  • withdraw <amount>: decrease the balance (on all replicas) by the amount value

Project

Used platforms and software

Structure of the project

My_CORBA_project
  + src
  |  + (default)
  |  |  |- AccountReplica.java
  |  |  |- Main.java
  |  |
  |  + spread
  |  |  |- AdvancedMessageListener.java
  |  |  |- BasicMessageListener.java
  |  |  |- [... and all other Spread Java files]

Code

Main.java

/**
 * Command line entry point that starts a single bank account replica.
 *
 * Expected arguments:
 * {@code <server address> <account name> <number of replicas> [file name]}.
 */
public class Main {

	/** Usage text printed when the command line arguments are invalid. */
	private static final String USAGE = "Usage:\n"
		+ "\tjava Main <server address> <account name> <number of replicas> [file name]";

	/**
	 * Parses the command line, creates the replica and runs it.
	 *
	 * The usage text is printed only for invalid arguments. Failures that
	 * happen while the replica is running are no longer reported as usage
	 * errors; they propagate with their own stack trace.
	 *
	 * @param args
	 *            server address, account name, number of replicas and an
	 *            optional name of a file with commands
	 */
	public static void main(String[] args) {

		// The first three arguments are mandatory
		if (args.length < 3) {
			System.out.println(USAGE);
			return;
		}

		int numReplicas;
		try {
			numReplicas = Integer.parseInt(args[2]);
		} catch (NumberFormatException e) {
			System.out.println("Invalid number of replicas: " + args[2]);
			System.out.println(USAGE);
			return;
		}

		String host = args[0];
		String accountName = args[1];

		// An empty file name means: read the commands from the console
		String fileName = (args.length > 3) ? args[3] : "";

		// Create a replica account and run it
		AccountReplica accountReplica = new AccountReplica(host, accountName, numReplicas);
		accountReplica.run(fileName);
	}
}

AccountReplica.java

package assignment;

import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.nio.charset.StandardCharsets;
import java.util.Random;

import spread.BasicMessageListener;
import spread.MembershipInfo;
import spread.SpreadConnection;
import spread.SpreadException;
import spread.SpreadGroup;
import spread.SpreadMessage;

/**
 * One replica of a replicated bank account.
 *
 * The replica connects to a Spread daemon, joins the group named after the
 * account and then executes commands read from the console or from a file.
 * Commands that change the balance are not applied directly: they are
 * multicast as safe messages and applied by every replica (including the
 * sender) when the message is received.
 *
 * NOTE(review): {@link #messageReceived(SpreadMessage)} is presumably invoked
 * on a listener thread owned by the Spread connection while {@link #run(String)}
 * executes on the caller's thread - confirm against the Spread Java API. The
 * fields shared between both are therefore volatile or guarded by a lock.
 */
public class AccountReplica implements BasicMessageListener {

	/** Port of the Spread daemon the replica connects to */
	private static final int SPREAD_PORT = 4333;

	/** Guards {@link #numGroupMembers} and is used to wake up {@link #run(String)} */
	private final Object membershipLock = new Object();

	/** The Spread connection */
	private SpreadConnection spreadConnection;

	/** A Spread group */
	private SpreadGroup spreadGroup;

	/** Balance of the bank account */
	private volatile double balance;

	/** Number of replicas / members in group (guarded by membershipLock) */
	private int numGroupMembers;

	/** Number of wanted replicas */
	private final int numReplicas;

	/** Member name in group */
	private String privateName;

	/** True, while program is running */
	private volatile boolean isRun;


	/**
	 * Create a account replica.
	 *
	 * @param host
	 *            Server name
	 * @param accounterName
	 *            Name of the account
	 * @param numReplicas
	 *            Number of replicas
	 */
	public AccountReplica(String host, String accounterName, int numReplicas) {
		__info("New AccountReplica host:" + host + ", accounterName:" + accounterName + ", numReplicas: " + numReplicas);

		// Init variables BEFORE joining the group: as soon as the listener is
		// registered, membership messages may update numGroupMembers, and
		// resetting it afterwards could discard an already received count.
		this.balance = 0.0;
		this.numGroupMembers = 0;
		this.numReplicas = numReplicas;

		// Init spread
		this.isRun = initSpread(host, SPREAD_PORT, accounterName);
	}

	/**
	 * Initialize spread connection and joins given group.
	 *
	 * @param host
	 *            Host name
	 * @param port
	 *            Port number
	 * @param groupName
	 *            Group name
	 * @return true, if initialization was successful
	 */
	private boolean initSpread(String host, int port, String groupName) {
		// Random suffix keeps the names of replicas on the same daemon apart.
		// NOTE(review): Spread limits the length of private names (believed
		// to be 10 characters) - confirm before widening the range further.
		privateName = "DoRo" + new Random().nextInt(100000);

		spreadConnection = new SpreadConnection();
		spreadGroup = new SpreadGroup();
		try {
			spreadConnection.connect(InetAddress.getByName(host), port, privateName, false, true);
			spreadGroup.join(spreadConnection, groupName);
			__info("Join group:" + groupName);

			spreadConnection.add(this);
			return true;
		} catch (SpreadException e) {
			__error("initSpread", "SpreadException", e);
		} catch (UnknownHostException e) {
			__error("initSpread", "UnknownHostException", e);
		}
		return false;
	}

	/**
	 * Run account replica.
	 *
	 * Blocks until the wanted number of replicas has joined the group and
	 * then executes commands until "exit" is entered or the input ends.
	 *
	 * @param fileName Empty string or file name
	 */
	public void run(String fileName) {

		if (!spreadConnection.isConnected()) {
			__info("Not connected to the Spread daemon, nothing to run.");
			return;
		}

		// Wait for all replicas
		__info("Wait for " + numReplicas + " replicas join the group.");
		if (!awaitReplicas()) {
			return;
		}
		__info("All replicas joined group.");

		// No file means accept commands from user interface
		if (fileName == null || fileName.isEmpty()) {
			printMenu();
			try {
				// System.in is intentionally not closed here
				processCommands(new BufferedReader(new InputStreamReader(System.in)));
			} catch (IOException e) {
				__error("run", "IOException", e);
			}
		} else {
			try (BufferedReader fileReader = new BufferedReader(new FileReader(fileName))) {
				processCommands(fileReader);
			} catch (FileNotFoundException e) {
				__error("run", "FileNotFoundException", e);
			} catch (IOException e) {
				__error("run", "IOException", e);
			}
		}
	}

	/**
	 * Blocks until at least the wanted number of replicas is in the group.
	 *
	 * Waits on the membership lock instead of spinning; the membership
	 * handler notifies the lock whenever the member count changes.
	 *
	 * @return true, if all replicas joined; false, if the wait was interrupted
	 */
	private boolean awaitReplicas() {
		synchronized (membershipLock) {
			try {
				while (numReplicas > numGroupMembers) {
					membershipLock.wait();
				}
				return true;
			} catch (InterruptedException e) {
				// Keep the interrupt visible for the caller
				Thread.currentThread().interrupt();
				__error("run", "InterruptedException", e);
				return false;
			}
		}
	}

	/**
	 * Executes every line of the input as a command until the input ends
	 * or the replica is stopped.
	 *
	 * @param input Source of the command lines
	 * @throws IOException if reading a line fails
	 */
	private void processCommands(BufferedReader input) throws IOException {
		String line;
		// readLine() returns null at the end of the input (also for the console)
		while (isRun && (line = input.readLine()) != null) {
			command(line, true);
		}
	}

	/**
	 * Prints the user menu.
	 */
	private void printMenu() {
		System.out.println(
			"\n" +
			"++++++++++++++++++++++++\n" +
			"AccountReplica - Menu:\n" +
			"++++++++++++++++++++++++\n" +
			"\n" +
			"Options:\n" +
			"\tbalance\n" +
			"\tdeposit <amount>\n" +
			"\twithdraw <amount>\n" +
			"\taddinterest <percent>\n" +
			"\tsleep <duration>\n" +
			"\texit\n");
	}


	/**
	 * Parse and execute a command string.
	 *
	 * Lines coming from the user or a file (isCommand is true) that change
	 * the balance are multicast to the group; the change itself is applied
	 * when the message comes back (isCommand is false). Null and blank lines
	 * are ignored, unknown commands are reported.
	 *
	 * @param line
	 *            command as string, e.g. "deposit 30"
	 * @param isCommand
	 *            true, when it is a line from the user or from the file (and
	 *            not a message)
	 */
	public void command(String line, boolean isCommand) {

		if (line == null) {
			return;
		}
		String[] options = line.trim().split("\\s+");
		if (options[0].isEmpty()) {
			return;
		}

		try {
			switch (options[0]) {
				case "balance":
					if (options.length == 1) {
						commandBalance();
					} else if (isCommand) {
						// "balance <amount>" is the state transfer message
						// between replicas; typed locally it would change
						// this replica only.
						__info("Ignore command: " + line);
					} else {
						commandStateTransfer(parseFinite(options[1]));
					}
					break;
				case "exit":
					if (options.length == 1) {
						commandExit();
					} else {
						__info("Unknown command: " + line);
					}
					break;
				case "deposit": {
					double amount = parseFinite(options[1]);
					if (isCommand) {
						messageSend("deposit " + amount);
					} else {
						setBalance(amount);
					}
					break;
				}
				case "withdraw": {
					double amount = parseFinite(options[1]);
					if (isCommand) {
						messageSend("withdraw " + amount);
					} else {
						setBalance(amount * (-1));
					}
					break;
				}
				case "addinterest": {
					double percent = parseFinite(options[1]);
					if (isCommand) {
						messageSend("addinterest " + percent);
					} else {
						commandAddinterest(percent);
					}
					break;
				}
				case "sleep":
					commandSleep(Integer.parseInt(options[1]));
					break;
				default:
					__info("Unknown command: " + line);
					break;
			}

		} catch (NumberFormatException e) {
			__error("userCommand", "NumberFormatException", e);
		} catch (ArrayIndexOutOfBoundsException e) {	// missing argument
			__error("userCommand", "ArrayIndexOutOfBoundsException", e);
		}
	}

	/**
	 * Parses a number and rejects NaN and infinite values, which would
	 * otherwise make the balance unusable on every replica.
	 *
	 * @param text Number as string
	 * @return the parsed finite value
	 * @throws NumberFormatException if text is not a finite number
	 */
	private double parseFinite(String text) {
		double value = Double.parseDouble(text);
		if (Double.isNaN(value) || Double.isInfinite(value)) {
			throw new NumberFormatException("Not a finite number: " + text);
		}
		return value;
	}

	/**
	 * Execute user command "balance".
	 */
	private void commandBalance() {
		System.out.println("balance=" + balance);
	}

	/**
	 * Takes over the balance sent by another replica, but only while the
	 * local balance is still zero.
	 *
	 * @param amount Balance of the sending replica
	 */
	private void commandStateTransfer(double amount) {
		if (balance == 0) {
			balance = amount;
			System.out.println("New balance=" + balance);
		}
	}

	/**
	 * Set a balance by adding amount to the current value.
	 *
	 * @param amount Adding value
	 */
	private void setBalance(double amount) {
		balance += amount;
		System.out.println("New balance=" + balance);
	}

	/**
	 * Increase the balance by the given interest rate.
	 *
	 * @param percent Interest rate in percent
	 */
	private void commandAddinterest(double percent) {
		balance *= (1 + percent/100);
		System.out.println("New balance=" + balance);
	}

	/**
	 * Pause the calling thread. Durations of zero or less are ignored.
	 *
	 * @param duration Duration in seconds
	 */
	private void commandSleep(int duration) {
		if (duration <= 0) {
			return;
		}
		try {
			// long arithmetic: duration * 1000 may overflow an int
			Thread.sleep(duration * 1000L);
		} catch (InterruptedException e) {
			Thread.currentThread().interrupt();
			__error("commandSleep", "InterruptedException", e);
		}
	}

	/**
	 * Execution of user command "exit": leaves the group, disconnects and
	 * terminates the program, even if leaving or disconnecting fails.
	 */
	private void commandExit() {
		isRun = false;
		try {
			// Leave group
			spreadGroup.leave();
			__info("Leave group:" + spreadGroup);

			// Disconnect
			spreadConnection.remove(this);
			spreadConnection.disconnect();
			__info("Disconnect connection");
		} catch (SpreadException e) {
			__error("commandExit", "SpreadException", e);
		}

		// Quit
		System.exit(0);
	}

	/**
	 * Send a SpreadMessage (safe, multicast)
	 *
	 * @param content Content of the message
	 */
	private void messageSend(String content) {
		SpreadMessage message = new SpreadMessage();
		message.setSafe();
		message.addGroup(spreadGroup);
		// Fixed charset, so that all replicas decode the same text
		message.setData(content.getBytes(StandardCharsets.UTF_8));
		try {
			spreadConnection.multicast(message);
			__info("Sent safe message");
		} catch (SpreadException e) {
			__error("messageSend", "SpreadException", e);
		}
	}

	/**
	 * Dispatches a received message by its kind.
	 *
	 * @param message Received message
	 */
	@Override
	public void messageReceived(SpreadMessage message)
	{
		if (message.isRegular()) {
			messageReceivedRegular(message);
		}
		else if (message.isMembership()) {
			messageReceivedMembership(message);
		}
		else if (message.isReject()) {
			__info("Receive a rejected message.");
		}
		else {
			__info("Receive a unkown message.");
		}
	}

	/**
	 * This function is called, when a regular message was received.
	 * The content of a safe message is executed as a command.
	 *
	 * @param message Received message
	 */
	private void messageReceivedRegular(SpreadMessage message) {

		if (!message.isSafe()) {
			__info("Receive another regular message");
			return;
		}

		__info("Receive a regular save message.");

		byte[] data = message.getData();
		String content = new String(data, StandardCharsets.UTF_8);

		System.out.println("\t data: " + data.length + " bytes, sender: " + message.getSender() + ", type: " + message.getType());
		System.out.println("\tcontent: " + content);

		command(content, false);
	}

	/**
	 * This function is called, when a membership message was received.
	 *
	 * For a regular membership message the member count is updated and
	 * {@link #run(String)} is woken up. When another member joined, the
	 * local balance is sent to the group as state transfer.
	 *
	 * @param message Received message
	 */
	private void messageReceivedMembership(SpreadMessage message) {
		if (!message.isMembership()) {
			return;
		}

		MembershipInfo info = message.getMembershipInfo();
		SpreadGroup group = info.getGroup();

		// ---------------- regular membership ----------------------------
		if (info.isRegularMembership()) {
			SpreadGroup[] members = info.getMembers();

			__info("Receive a membership message for group " + group + " with " + members.length + " members:");
			for (SpreadGroup member : members) {
				System.out.println("\t\t" + member);
			}

			if (info.isCausedByJoin()) {
				System.out.println("\tJOIN of " + info.getJoined());

				// Compare the complete private group name: a substring test
				// on privateName would treat "DoRo12" as "DoRo1".
				String ownName = spreadConnection.getPrivateGroup().toString();
				if (!info.getJoined().toString().equals(ownName)) {
					messageSend("balance " + balance);
				}
			} else if (info.isCausedByLeave()) {
				System.out.println("\tLEAVE of " + info.getLeft());
			} else if (info.isCausedByDisconnect()) {
				System.out.println("\tDISCONNECT of " + info.getDisconnected());
			} else if (info.isCausedByNetwork()) {
				System.out.println("\tNETWORK change");
			}

			// Publish the new count and wake up a waiting run()
			synchronized (membershipLock) {
				numGroupMembers = members.length;
				membershipLock.notifyAll();
			}
		}
		// ---------------- transition membership -------------------------
		else if (info.isTransition()) {
			__info("Receive a transition membership message for group " + group);
		}
		// ---------------- self-leave membership -------------------------
		else if (info.isSelfLeave()) {
			__info("Receive a self-leav membership message for group " + group);
		}
	}

	/**
	 * Helper function. Outputs message on the command line.
	 *
	 * @param message
	 *            Message
	 */
	private void __info(String message) {
		System.out.println("[Info] " + message);
	}

	/**
	 * Helper function. Outputs errors on the command line.
	 *
	 * @param tag
	 *            Tag name, e.g. function in which the error occurs
	 * @param type
	 *            Name of the error, e.g. exception name
	 * @param e
	 *            Exception
	 */
	private void __error(String tag, String type, Exception e) {
		System.out.println("[Error] in " + tag + ": " + type + " " + e.getMessage());
	}
}

Run code

General syntax for starting an account replica

java Main <server address> <account name> <number of replicas> [file name]

For example, to run two replicas, enter the following command in two different terminals:

java Main localhost peterAccount 2

See also