// ConnectionPort.java (35.01 KiB)
import java.io.BufferedReader;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintStream;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.time.LocalDateTime;
import java.time.temporal.ChronoField;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.PriorityQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
//helper classes
//1. LimitedSet: a set with a limited maximum size
//2. GossipElement: stores the information needed to retrieve a missing event later
import java.util.concurrent.LinkedBlockingDeque;
/**
 * A set of strings with a soft upper bound on its size.
 *
 * <p>Elements may be added freely; the bound is only enforced when
 * {@link #respectLimit()} is called, which evicts randomly chosen elements
 * until at most {@code maxLen} remain.
 *
 * <p>This class performs no synchronization of its own.
 */
class LimitedSet implements Iterable<String> {
    int maxLen;
    HashSet<String> set;

    /**
     * Creates an empty set.
     *
     * @param len maximum number of elements kept by {@link #respectLimit()};
     *            a negative value is treated as zero
     */
    public LimitedSet(int len) {
        set = new HashSet<String>();
        maxLen = len;
    }

    /** Returns the current number of elements (may exceed the limit). */
    public int size() {
        return set.size();
    }

    /** Removes every element. */
    public void clear() {
        set.clear();
    }

    /**
     * Renders the elements separated by single spaces and terminated by a
     * newline. An empty set renders as just {@code "\n"}.
     */
    @Override
    public String toString() {
        return String.join(" ", set) + "\n";
    }

    /**
     * Returns up to {@code f} distinct elements chosen uniformly at random.
     *
     * <p>The returned array is freshly allocated, so the caller may modify it.
     *
     * @param f maximum number of elements wanted; zero or a negative value
     *          yields an empty array
     * @return all elements when the set holds at most {@code f} of them,
     *         otherwise a random subset of exactly {@code f} elements
     */
    public String[] generateSubset(int f) {
        String[] elements = set.toArray(new String[0]);
        if (elements.length <= f) {
            return elements;
        }
        if (f <= 0) {
            // A negative fanout used to reach "new String[f]" and throw.
            return new String[0];
        }
        // Partial Fisher-Yates shuffle: after i steps the first i slots hold a
        // uniform random sample, and the loop always ends after f steps.
        for (int i = 0; i < f; i++) {
            int j = i + (int) (Math.random() * (elements.length - i));
            String picked = elements[j];
            elements[j] = elements[i];
            elements[i] = picked;
        }
        String[] output = new String[f];
        System.arraycopy(elements, 0, output, 0, f);
        return output;
    }

    /** Removes each of the given elements if present. */
    public void removeAll(String[] es) {
        for (String e : es) {
            set.remove(e);
        }
    }

    /** Removes the given element if present. */
    public void remove(String es) {
        set.remove(es);
    }

    /** Returns whether the given element is in the set. */
    public boolean contains(String e) {
        return set.contains(e);
    }

    /**
     * Evicts randomly chosen elements until the set is within its limit.
     *
     * @return the evicted elements, or {@code null} when the set was already
     *         within the limit and nothing was removed
     */
    public String[] respectLimit() {
        // Clamp so a negative limit empties the set instead of looping forever.
        int excess = set.size() - Math.max(0, maxLen);
        if (excess <= 0) {
            return null;
        }
        String[] evicted = generateSubset(excess);
        removeAll(evicted);
        return evicted;
    }

    /**
     * Adds an element.
     *
     * @return {@code true} if the element was not already present
     */
    public boolean add(String e) {
        return set.add(e);
    }

    /** Iterates over the elements in the backing set's order. */
    @Override
    public Iterator<String> iterator() {
        return set.iterator();
    }
}
/**
 * Plain record of an event that still has to be fetched: which id is wanted,
 * the round number recorded for it, and the sender recorded for it.
 */
class GossipElement {
    /** Identifier of the event that is wanted. */
    String wantedId;
    /** Round number stored with this element. */
    int round;
    /** Sender stored with this element. */
    String gossipSender;

    /**
     * Stores the three values unchanged in the corresponding fields.
     */
    public GossipElement(String id, int seenInRound, String advertisedBy) {
        wantedId = id;
        round = seenInRound;
        gossipSender = advertisedBy;
    }
}
//a class implementing the gossip-based multicast protocol lpbcast
//reference: https://dl.acm.org/doi/pdf/10.1145/945506.945507 ''Lightweight Probabilistic Broadcast''
public class ConnectionPort {
private final int gossipinterval = 500;//milliseconds slept between two gossip rounds
private final int firstpullround = 4;//after this many rounds, start to pull a missing tx from the node that advertised it
private final int pullinterval = firstpullround * gossipinterval;//milliseconds slept between two pull passes
//fanout: the number of neighbors gossiped to in every round.
//the paper's figure shows that increasing the fanout decreases the number of rounds necessary to infect all processes.
//for choosing the optimal fanout see: Probabilistic reliable dissemination in large-scale systems
private final int f;
private final int lastpullround = 8;//after this many rounds, start to pull a missing tx from random members of the partial view
private LimitedSet unSubs;//members found dead or reported as unsubscribed; piggybacked on every gossip message
private LimitedSet subs;//new subscriptions
private LimitedSet partialView;//(ip,port), the member list; also used as the lock guarding unSubs and subs
private LimitedSet events;//events to be gossiped in the next round (time,id,from,to,amount)
private HashMap<String,String> eventId;//digest: history of all event notifications (id -> event); also used as the lock guarding events and orderedeid
private final int maxBufferSize;//maximum eventId size
private PriorityQueue<String> orderedeid;//"time,id" entries ordered from older to newer, so the oldest digest entries can be removed first
private ConcurrentHashMap<String,GossipElement> retrieveBuf;//events that other nodes have received but this node has not, kept for a future pull
private ConcurrentHashMap<String,Integer> transactionsPool;//once added to this pool, a transaction counts as delivered
private PrintStream bandwidthLog;
private PrintStream delayLog;
private String myName;
private int myPort;
private String myAddr;
private Socket toserver;//connection to the service; also used to send block puzzles and verification requests
private PrintStream psServer; //writer on the service connection
private BufferedReader brServer;//reader on the service connection
private int currentRound;//NOTE(review): incremented by the gossip thread and read by other threads without synchronization
//fields for the block chain
private ConcurrentHashMap<Integer,Block> blockBuffer;//for blockchain, used to answer block requests
private ConcurrentHashMap<Integer,String> mergeInfoBuffer;//for blockchain, used to answer mergeInfo requests
private ConcurrentLinkedQueue<Block> blockChain;//for blockchain. read only, never written here. might be useless because blockBuffer was introduced
private ConcurrentHashMap<String,Integer> account;//for blockchain. read only, never written here. might be useless because mergeInfoBuffer was introduced
private BlockingQueue<String> solutions;
private BlockingQueue<String> verification;
private BlockingQueue<Block> blockfromothernodes;
/**
 * Returns the current wall-clock time in microseconds since the Unix epoch.
 *
 * @return epoch microseconds
 */
private long getMicroTime() {
    // Read the clock exactly once. Taking the seconds from one clock read and
    // the microsecond-of-second from another yields a value that is wrong by
    // about one second whenever a second boundary falls between the reads.
    java.time.Instant now = java.time.Instant.now();
    return now.getEpochSecond() * 1000000L + now.getNano() / 1000;
}
/**
 * Converts a decimal timestamp given in seconds to whole microseconds.
 *
 * @param time timestamp in seconds, in any format accepted by
 *             {@link Double#parseDouble(String)}
 * @return the timestamp in microseconds, rounded to the nearest microsecond
 * @throws NumberFormatException if {@code time} is not a parsable number
 */
private long getMicroTime(String time) {
    double seconds = Double.parseDouble(time);
    // Round instead of truncating: the product is frequently a hair below the
    // exact value (e.g. ...455.9999), and a plain cast would lose 1 microsecond.
    return Math.round(seconds * 1000000d);
}
/**
 * Opens a short-lived connection to {@code target}, writes {@code msg} and
 * closes the connection again. The number of characters sent is appended to
 * the bandwidth log.
 *
 * @param target peer address in the form {@code "host,port"}
 * @param msg    complete message; it is written as is, without an extra
 *               line terminator, because messages carry their own newlines
 * @throws IOException if the address is malformed, the peer cannot be
 *                     reached, or the message could not be written
 */
private void sendMsg(String target, String msg)
        throws IOException
{
    String[] infos = target.split(",");
    if (infos.length < 2) {
        // Report as IOException so callers treat it like any unreachable peer
        // instead of dying on an ArrayIndexOutOfBoundsException.
        throw new IOException("malformed target address: " + target);
    }
    int port;
    try {
        port = Integer.parseInt(infos[1]);
    } catch (NumberFormatException e) {
        throw new IOException("malformed target port: " + target, e);
    }
    if (port < 0 || port > 65535) {
        // new Socket(...) would throw an unchecked IllegalArgumentException.
        throw new IOException("target port out of range: " + target);
    }
    // try-with-resources closes the socket even when writing fails.
    try (Socket s = new Socket(infos[0], port);
         PrintStream ps = new PrintStream(s.getOutputStream())) {
        ps.print(msg);//not println, because msg has \n itself
        // PrintStream never throws on write errors; ask it explicitly.
        if (ps.checkError()) {
            throw new IOException("failed to write to " + target);
        }
        //log bandwidth
        bandwidthLog.println(getMicroTime() + "," + msg.length());
    }
}
/**
 * Pulls one item of the given type from {@code target}.
 *
 * <p>Request types:
 * <ul>
 *   <li>{@code RETR}: a transaction, identified by its id</li>
 *   <li>{@code REQB}: a block, identified by its height</li>
 *   <li>{@code REQM}: merge info, identified by its height</li>
 * </ul>
 *
 * <p>When the peer cannot be reached or closes the connection early it is
 * considered dead: it is removed from the partial view and from the pending
 * subscriptions and added to the unsubscriptions.
 *
 * @param target   peer address in the form {@code "host,port"}
 * @param wantedId identifier sent on the second line of the request
 * @param type     request keyword sent on the first line of the request
 * @return the payload line when the peer answered {@code OK}; {@code null}
 *         when the peer answered anything else, is unreachable, or the
 *         address is unusable
 */
private String retrieveTx(String target, String wantedId, String type)
{
    String[] targetInfo = (target == null) ? new String[0] : target.split(",");
    int port = -1;
    if (targetInfo.length >= 2) {
        try {
            port = Integer.parseInt(targetInfo[1]);
        } catch (NumberFormatException e) {
            port = -1;
        }
    }
    if (port < 0 || port > 65535) {
        // Not an address at all (for example the empty sender recorded when a
        // gossip message carried no subscription line). Forget it locally, but
        // do not advertise garbage as an unsubscription.
        synchronized(partialView) {
            partialView.remove(target);
            subs.remove(target);
        }
        return null;
    }
    String msg = type + "\n" + wantedId;
    // try-with-resources: previously the socket and streams were never closed.
    try (Socket s = new Socket(targetInfo[0], port);
         PrintStream ps = new PrintStream(s.getOutputStream());
         BufferedReader br = new BufferedReader(new InputStreamReader(s.getInputStream()))) {
        ps.println(msg);
        bandwidthLog.println(getMicroTime() + "," + msg.length());
        String respond = br.readLine();
        if (respond == null) {
            throw new IOException("connection closed before a reply arrived");
        }
        if (!respond.equals("OK")) {
            return null;
        }
        String tx = br.readLine();
        if (tx == null) {
            throw new IOException("connection closed before the payload arrived");
        }
        return tx;
    } catch (IOException e) {
        //target is dead
        System.out.println(target+" has dead");
        synchronized(partialView) {
            partialView.remove(target);
            subs.remove(target);
            unSubs.add(target);
        }
        return null;
    }
}
//about blockchain
/**
 * Returns the block buffer that is consulted when answering REQB requests.
 * The map itself is returned, not a copy.
 */
public ConcurrentHashMap<Integer, Block> getBlockBuffer() {
return blockBuffer;
}
/**
 * Returns the merge-info buffer that is consulted when answering REQM requests.
 * The map itself is returned, not a copy.
 */
public ConcurrentHashMap<Integer, String> getMergeInfoBuffer() {
return mergeInfoBuffer;
}
/**
 * Returns the pool of delivered transactions.
 * The map itself is returned, not a copy.
 */
public ConcurrentHashMap<String,Integer> getMemPool() {
return transactionsPool;
}
/**
 * Returns the account map. The map itself is returned, not a copy.
 * Might be removed in the future.
 */
public ConcurrentHashMap<String,Integer> getAccount() {
return account;
}
/**
 * Returns the block chain queue. The queue itself is returned, not a copy.
 * Might be removed in the future.
 */
public ConcurrentLinkedQueue<Block> getBlockChain() {
return blockChain;
}
//TODO
/**
 * Writes the given puzzle as one line on the connection to the service.
 *
 * @param puzzle text to send; written unchanged followed by a line terminator
 */
public void sendPuzzle(String puzzle) {
psServer.println(puzzle);
}
//TODO
/**
 * Blocks until a solution message is available and returns it.
 *
 * @return the next queued solution message, or {@code null} if the calling
 *         thread was interrupted while waiting
 */
public String getSolution() {
    try {
        return solutions.take();
    } catch (InterruptedException e) {
        // Restore the flag so the caller can still observe the interruption.
        Thread.currentThread().interrupt();
        e.printStackTrace();
        return null;
    }
}
//TODO
/**
 * Writes the given verification request as one line on the connection to the service.
 *
 * @param verify text to send; written unchanged followed by a line terminator
 */
public void sendVerify(String verify) {
psServer.println(verify);
}
//TODO
/**
 * Blocks until a verification message is available and returns it.
 *
 * @return the next queued verification message, or {@code null} if the
 *         calling thread was interrupted while waiting
 */
public String getVerified() {
    try {
        return verification.take();
    } catch (InterruptedException e) {
        // Restore the flag so the caller can still observe the interruption.
        Thread.currentThread().interrupt();
        e.printStackTrace();
        return null;
    }
}
//TODO
/**
 * Sends a block to up to {@code f} randomly chosen members of the partial
 * view and keeps retrying until at least one send succeeded.
 *
 * <p>Peers that cannot be reached are treated as dead: they are removed from
 * the partial view and the pending subscriptions and added to the
 * unsubscriptions.
 *
 * <p>Between unsuccessful attempts the method sleeps for one gossip interval.
 * If the calling thread is interrupted while waiting, the method gives up and
 * returns with the interrupt flag set.
 *
 * @param block block to send; serialized with {@code toString()}
 */
public void sendBlock(Block block) {
    String msg = "BLOCK\n"+block.toString()+"\n";
    int sent = 0;
    while (sent < 1) {//at least sent out one
        String[] targets;
        synchronized(partialView) {
            targets = partialView.generateSubset(f);
        }
        for (String target:targets) {
            try {
                sendMsg(target, msg);
                sent ++;
            } catch(IOException e) {
                //target is dead
                System.out.println(target+" has dead");
                synchronized(partialView) {
                    partialView.remove(target);
                    subs.remove(target);
                    unSubs.add(target);
                }
            }
        }
        if (sent < 1) {
            // Nothing went out (empty view or every target dead). Wait for the
            // membership to change instead of spinning at full speed while
            // repeatedly grabbing the partialView lock.
            try {
                Thread.sleep(gossipinterval);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }
}
//TODO
/**
 * Blocks until a block received from another node is available and returns it.
 *
 * @return the next queued block, or {@code null} if the calling thread was
 *         interrupted while waiting
 */
public Block getBlock() {
    try {
        return blockfromothernodes.take();
    } catch (InterruptedException e) {
        // Restore the flag so the caller can still observe the interruption.
        Thread.currentThread().interrupt();
        e.printStackTrace();
        return null;
    }
}
/**
 * Requests the block at the given height from one peer.
 *
 * <p>The reply is expected to be a single line of at least seven
 * {@code ';'}-separated fields, with the height in field 0 and the
 * space-separated transaction list in field 2.
 *
 * @param addr   peer address in the form {@code "host,port"}
 * @param height height of the wanted block
 * @return the block, or {@code null} if the peer did not deliver it or the
 *         reply could not be parsed
 */
public Block requestBlock(String addr, int height) {
    String blockinfo = retrieveTx(addr, String.valueOf(height), "REQB");
    if (blockinfo == null) {
        return null;
    }
    // Limit -1 keeps trailing empty fields, which a plain split would drop
    // and thereby shorten the array.
    String[] blockpart = blockinfo.split(";", -1);
    if (blockpart.length < 7) {
        System.out.println("malformed block from " + addr);
        return null;
    }
    int blockHeight;
    try {
        blockHeight = Integer.parseInt(blockpart[0]);
    } catch (NumberFormatException e) {
        System.out.println("malformed block from " + addr);
        return null;
    }
    String[] txlist = blockpart[2].split(" ");
    return new Block(blockHeight,blockpart[1],txlist,blockpart[3],blockpart[4],blockpart[5],blockpart[6]);
}
//TODO
/**
 * Asks one peer for the merge info at the given height using a REQM request.
 *
 * @param addr   peer address handed to {@code retrieveTx}
 * @param height height sent as the request identifier
 * @return whatever {@code retrieveTx} returns for the request
 */
public String requestMergeInfo(String addr, int height) {
    return retrieveTx(addr, Integer.toString(height), "REQM");
}
/**
 * Orders event strings by the numeric timestamp in their first
 * comma-separated field, oldest first.
 */
class EventComparator implements Comparator<String> {
    /**
     * Compares two events by timestamp.
     *
     * @throws NumberFormatException if either timestamp is not a parsable number
     */
    @Override
    public int compare(String a, String b) {
        // Double.compare gives a consistent total order, including for NaN.
        // The hand-written comparison returned -1 in both directions for NaN.
        return Double.compare(timestampOf(a), timestampOf(b));
    }

    /** Parses the text before the first comma as a double. */
    private double timestampOf(String event) {
        return Double.parseDouble(event.split(",")[0]);
    }
}
/**
 * Thread body that periodically broadcasts transactions and membership updates.
 *
 * <p>Once per gossip interval it builds one message and sends it to up to
 * {@code f} random members of the partial view. The message consists of five
 * lines:
 * <ol>
 *   <li>{@code GOSP}</li>
 *   <li>unsubscriptions</li>
 *   <li>subscriptions: this node's own address first, then the pending ones</li>
 *   <li>event notifications gathered since the last outgoing gossip message</li>
 *   <li>event notification identifiers (the digest of known event ids)</li>
 * </ol>
 *
 * <p>If the partial view is empty, the round simply sends nothing.
 * If the thread is interrupted while sleeping it stops gossiping.
 */
class PeriodicalGossip implements Runnable {
    @Override
    public void run() {
        while (true) {
            try {
                Thread.sleep(gossipinterval);
            } catch(InterruptedException e) {
                // Stop cleanly. Carrying on with the flag cleared would hide
                // the request to stop from whoever issued it.
                Thread.currentThread().interrupt();
                return;
            }
            currentRound ++;
            String[] targets;
            StringBuilder gossipMsg = new StringBuilder("GOSP\n");
            synchronized(partialView) {
                gossipMsg.append(unSubs.toString());
                // Own address always leads the subscription line.
                gossipMsg.append(myAddr).append(',').append(myPort);
                String subinfo = subs.toString();
                if (subinfo.length() > 1) {
                    // More than the bare newline: separate from the own address.
                    gossipMsg.append(' ');
                }
                gossipMsg.append(subinfo);
                targets = partialView.generateSubset(f);
            }
            synchronized(eventId) {
                gossipMsg.append(events.toString());
                events.clear();
                // Digest line: all known ids separated by spaces (may be empty).
                gossipMsg.append(String.join(" ", eventId.keySet())).append('\n');
            }
            String msg = gossipMsg.toString();
            for (String target:targets) {
                try {
                    sendMsg(target, msg);
                } catch(IOException e) {
                    //target is dead
                    System.out.println(target+" has dead");
                    synchronized(partialView) {
                        partialView.remove(target);
                        subs.remove(target);
                        unSubs.add(target);
                    }
                }
            }
        }
    }
}
/**
 * Thread body that periodically pulls missing transactions from other processes.
 *
 * <p>Every pull interval each entry of {@code retrieveBuf} is examined:
 * <ul>
 *   <li>older than {@code lastpullround} rounds: ask up to {@code f} random
 *       members of the partial view, stopping at the first one that delivers;</li>
 *   <li>otherwise, older than {@code firstpullround} rounds: ask the node that
 *       advertised the event.</li>
 * </ul>
 *
 * <p>If the thread is interrupted while sleeping it stops pulling.
 */
class PullEvent implements Runnable {
    @Override
    public void run() {
        while (true) {
            try {
                Thread.sleep(pullinterval);
            } catch(InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            for (GossipElement tx:retrieveBuf.values()) {
                int age = currentRound - tx.round;
                if (age > lastpullround) {
                    String[] targets;
                    synchronized(partialView) {
                        targets = partialView.generateSubset(f);
                    }
                    for (String target:targets) {
                        String respond = retrieveTx(target, tx.wantedId, "RETR");
                        if (respond != null) {
                            deliver(respond);
                            break;
                        }
                    }
                } else if (age > firstpullround) {
                    String res = retrieveTx(tx.gossipSender, tx.wantedId, "RETR");
                    if (res != null) {
                        deliver(res);
                    }
                }
            }
        }
    }

    /**
     * Records and delivers a pulled transaction (time,id,from,to,amount).
     * A reply without an id or with an unparsable timestamp is ignored.
     */
    private void deliver(String transaction) {
        String[] txpart = transaction.split(",");
        if (txpart.length < 2) {
            return;
        }
        long sentAt;
        try {
            sentAt = getMicroTime(txpart[0]);
        } catch (NumberFormatException e) {
            // A bad timestamp would otherwise kill this thread here or later
            // inside the timestamp-ordered queue.
            return;
        }
        synchronized(eventId) {
            if (eventId.put(txpart[1], transaction) == null) {//receive this event for the first time
                //will gossip it in the next gossip round
                events.add(transaction);
                //deliver it and log the delay
                delayLog.println(getMicroTime() + "," + sentAt);
                transactionsPool.put(transaction, 1);
                //add to orderedeventId
                orderedeid.add(txpart[0] + "," + txpart[1]);
            }
            // The event is known now, whether new or not, so stop pulling it.
            retrieveBuf.remove(txpart[1]);
        }
    }
}
/**
 * Handles one inbound connection from another node.
 *
 * <p>The first line of the request selects the handler:
 * <ul>
 *   <li>{@code GOSP}: gossip message (unsubscriptions, subscriptions, events, digest)</li>
 *   <li>{@code RETR}: request for a transaction by id</li>
 *   <li>{@code JOIN}: subscription of a new member</li>
 *   <li>{@code REQB}: request for a block by height</li>
 *   <li>{@code REQM}: request for merge info by height</li>
 *   <li>{@code BLOCK}: a block pushed by another node</li>
 * </ul>
 * Any other first line is ignored. The socket is always closed when the
 * handler finishes, including when it fails.
 */
class DealWithGossip implements Runnable {//handle message from other nodes
    Socket newaccept;

    public DealWithGossip(Socket s) {
        newaccept = s;
    }

    @Override
    public void run() {
        // try-with-resources closes the connection even if a handler throws.
        try (Socket connection = newaccept;
             BufferedReader br = new BufferedReader(new InputStreamReader(connection.getInputStream()))) {
            String type = br.readLine();
            if (type == null) {
                return;
            }
            switch (type) {
                case "GOSP":
                    handleGossip(br);
                    break;
                case "RETR":
                    handleRetrieve(br);
                    break;
                case "JOIN":
                    handleJoin(br);
                    break;
                case "REQB":
                    handleBlockRequest(br);
                    break;
                case "REQM":
                    handleMergeRequest(br);
                    break;
                case "BLOCK":
                    handleBlock(br);
                    break;
                default:
                    break;
            }
        } catch(IOException e) {
            e.printStackTrace();
        }
    }

    /** Applies a gossip message: membership first, then events and digest. */
    private void handleGossip(BufferedReader br) throws IOException {
        String unsubscription = br.readLine();
        String subscription = br.readLine();
        String transactions = br.readLine();
        String digest = br.readLine();
        String sender = "";
        synchronized(partialView) {
            //handle unsubscription
            if (unsubscription != null && !unsubscription.equals("")) {
                String[] unsubsInfo = unsubscription.split(" ");
                partialView.removeAll(unsubsInfo);
                subs.removeAll(unsubsInfo);
                for (String unsub:unsubsInfo) {
                    unSubs.add(unsub);
                }
                unSubs.respectLimit();
            }
            //handle subscription
            if (subscription != null && !subscription.equals("")) {
                String[] subsInfos = subscription.split(" ");
                if (subsInfos.length > 0) {
                    // A line of only spaces splits into an empty array.
                    sender = subsInfos[0];
                }
                String myinfo = myAddr+","+myPort;
                for (String sub:subsInfos) {
                    if (!sub.equals(myinfo) && partialView.add(sub)) {
                        subs.add(sub);
                    }
                }
                //if a node is dead, it won't restate its existence in sub and will gradually be removed from the network
                //because members are randomly removed from partialView and subs.
                String[] removed = partialView.respectLimit();
                if (removed != null) {
                    for (String remove:removed) {
                        subs.add(remove);
                    }
                }
                subs.respectLimit();
            }
        }
        synchronized(eventId) {
            //handle increment events
            if (transactions != null && !transactions.equals("")) {
                for (String tx:transactions.split(" ")) {
                    deliverIfNew(tx);
                }
            }
            //handle inventory events
            //those that have not been pushed to me need to be pulled
            if (digest != null && !digest.equals("")) {
                for (String transId:digest.split(" ")) {
                    if (!eventId.containsKey(transId)) {
                        retrieveBuf.putIfAbsent(transId, new GossipElement(transId, currentRound, sender));
                    }
                }
            }
            //eventid respects the maxlen limit (oldest events are removed first)
            while (eventId.size() > maxBufferSize) {
                String oldest = orderedeid.poll();
                if (oldest == null) {
                    // Queue and map are out of step; nothing left to evict by age.
                    break;
                }
                // Entries are "time,id"; take everything after the first comma.
                eventId.remove(oldest.substring(oldest.indexOf(',') + 1));
            }
            //events respect the maxlen limit
            events.respectLimit();
        }
    }

    /**
     * Delivers one gossiped transaction (time,id,from,to,amount) if it is seen
     * for the first time. Must be called while holding the eventId lock.
     * Entries without an id or with an unparsable timestamp are skipped.
     */
    private void deliverIfNew(String tx) {
        String[] txpart = tx.split(",");
        if (txpart.length < 2) {
            return;
        }
        long sentAt;
        try {
            sentAt = getMicroTime(txpart[0]);
        } catch (NumberFormatException e) {
            return;
        }
        if (eventId.put(txpart[1], tx) == null) {//receive this event for the first time
            //will gossip it in the next gossip round
            events.add(tx);
            //deliver it and log the delay
            delayLog.println(getMicroTime() + "," + sentAt);
            transactionsPool.put(tx, 1);
            //add to orderedeventId
            orderedeid.add(txpart[0] + "," + txpart[1]);
            //if it's in retrieveBuf, remove it
            retrieveBuf.remove(txpart[1]);
        }
    }

    /** Answers a RETR request with the stored transaction or FAIL. */
    private void handleRetrieve(BufferedReader br) throws IOException {
        String requestedId = br.readLine();
        if (requestedId == null) {
            return;
        }
        String tx;
        synchronized(eventId) {
            tx = eventId.get(requestedId);
        }
        reply(tx);
    }

    /** Answers a JOIN request with this node's view and records the new member. */
    private void handleJoin(BufferedReader br) throws IOException {
        String newmemberInfo = br.readLine();
        if (newmemberInfo == null) {
            return;
        }
        PrintStream ps = new PrintStream(newaccept.getOutputStream());
        synchronized(partialView) {
            if (partialView.size() == 0) {
                String torespond = myAddr + "," + myPort;
                ps.println(torespond);
                bandwidthLog.println(getMicroTime()+","+torespond.length());
            } else {
                // partialView.toString() already ends with a newline.
                String myrespond = myAddr+","+myPort+" "+partialView.toString();
                ps.print(myrespond);
                bandwidthLog.println(getMicroTime()+","+myrespond.length());
            }
            partialView.add(newmemberInfo);
            subs.add(newmemberInfo);
        }
    }

    /** Answers a REQB request with the buffered block or FAIL. */
    private void handleBlockRequest(BufferedReader br) throws IOException {
        String wantblock = br.readLine();
        if (wantblock == null) {
            return;
        }
        Block b = null;
        try {
            b = blockBuffer.get(Integer.parseInt(wantblock));
        } catch (NumberFormatException e) {
            // Not a height: answer FAIL instead of dying without a reply.
        }
        reply(b == null ? null : b.toString());
    }

    /** Answers a REQM request with the buffered merge info or FAIL. */
    private void handleMergeRequest(BufferedReader br) throws IOException {
        String wantmerge = br.readLine();
        if (wantmerge == null) {
            return;
        }
        String mergeinfo = null;
        try {
            mergeinfo = mergeInfoBuffer.get(Integer.parseInt(wantmerge));
        } catch (NumberFormatException e) {
            // Not a height: answer FAIL instead of dying without a reply.
        }
        reply(mergeinfo);
    }

    /** Parses a pushed block and queues it; malformed blocks are dropped. */
    private void handleBlock(BufferedReader br) throws IOException {
        String b = br.readLine();
        if (b == null) {
            return;
        }
        // Limit -1 keeps trailing empty fields.
        String[] blockpart = b.split(";", -1);
        if (blockpart.length < 7) {
            return;
        }
        int height;
        try {
            height = Integer.parseInt(blockpart[0]);
        } catch (NumberFormatException e) {
            return;
        }
        String[] txlist = blockpart[2].split(" ");
        Block newblock = new Block(height,blockpart[1],txlist,blockpart[3],blockpart[4],blockpart[5],blockpart[6]);
        blockfromothernodes.add(newblock);
    }

    /**
     * Writes "OK" plus the payload, or "FAIL" when the payload is null, and
     * records the reply size in the bandwidth log.
     */
    private void reply(String payload) throws IOException {
        PrintStream ps = new PrintStream(newaccept.getOutputStream());
        if (payload != null) {
            String reply = "OK\n"+payload;
            ps.println(reply);
            bandwidthLog.println(getMicroTime()+","+reply.length());
        } else {
            ps.println("FAIL");
            bandwidthLog.println(getMicroTime()+",4");
        }
    }
}
//The only way to really find out whether a remote side socket close is by reading
//(you'll get -1 as return value) or writing (an IOException (broken pipe) will be thrown)
// on the associated Input/OutputStreams.
/**
 * Accept loop for connections from other nodes: every accepted socket is
 * handed to a new DealWithGossip running on its own thread. If accepting
 * fails, the error is reported and the process exits with status 1.
 */
class ListenForConnection implements Runnable {//listen for connections from other nodes
    ServerSocket asServer;

    public ListenForConnection(ServerSocket s) {
        asServer = s;
    }

    @Override
    public void run() {
        try {
            for (;;) {
                Socket incoming = asServer.accept();
                Thread handler = new Thread(new DealWithGossip(incoming));
                handler.start();
            }
        } catch(IOException e) {
            e.printStackTrace();
            System.out.println("can't listen!");
            System.exit(1);
        }
    }
}
/**
 * Sends a subscription (JOIN) to one process that has already joined the
 * network and merges the member list it answers with into the partial view.
 *
 * <p>The reply is one line of space-separated {@code ip,port} entries.
 * Empty entries and this node's own address are not added.
 *
 * @param addr host of the process to subscribe to
 * @param port port of the process to subscribe to, as decimal text
 * @throws IOException if the port is not a valid port number, the process
 *                     cannot be reached, or it closes the connection without
 *                     answering
 */
private void subscription(String addr, String port)
        throws IOException
{
    int portNumber;
    try {
        portNumber = Integer.parseInt(port);
    } catch (NumberFormatException e) {
        // Callers only handle IOException; an unchecked exception would
        // terminate the thread that reads from the service.
        throw new IOException("invalid port: " + port, e);
    }
    if (portNumber < 0 || portNumber > 65535) {
        throw new IOException("port out of range: " + port);
    }
    // try-with-resources releases the connection on every path.
    try (Socket s = new Socket(addr, portNumber);
         PrintStream ps = new PrintStream(s.getOutputStream());
         BufferedReader br = new BufferedReader(new InputStreamReader(s.getInputStream()))) {
        String subscriptioninfo = "JOIN\n"+myAddr+","+myPort;
        ps.println(subscriptioninfo);
        bandwidthLog.println(getMicroTime()+","+subscriptioninfo.length());
        String members = br.readLine();
        if (members == null) {
            throw new IOException("peer closed the connection before sending its view");
        }
        //receive partialview from other nodes
        //format<ip,port ip,port ip,port>
        //update my partialView
        String myinfo = myAddr+","+myPort;
        synchronized(partialView) {
            for (String member:members.split(" ")) {
                if (!member.isEmpty() && !member.equals(myinfo)) {
                    partialView.add(member);
                }
            }
        }
    }
}
/**
 * Thread body for the connection from this node to the service.
 *
 * <p>It first introduces this node with a {@code CONNECT} line and then
 * processes the service's messages line by line:
 * <ul>
 *   <li>{@code INTRODUCE}: subscribe to the node whose address is in fields 2 and 3</li>
 *   <li>{@code TRANSACTION}: deliver the transaction in fields 1 to 5 and schedule it for gossip</li>
 *   <li>{@code DIE} / {@code QUIT}: exit the process with status 1</li>
 *   <li>{@code SOLVED} / {@code VERIFY}: queue the whole line for the block chain code</li>
 * </ul>
 * Lines that are too short for their command are reported and skipped.
 */
class HandleConnectionWithServer implements Runnable {//node to server
    @Override
    public void run() {
        try {
            String connectCommand = "CONNECT " + myName + " " + myAddr + " " + myPort;
            System.out.println(connectCommand);
            psServer.println(connectCommand);
            bandwidthLog.println(getMicroTime()+","+connectCommand.length());
            String messageFromServer;
            while ((messageFromServer=brServer.readLine())!=null) {
                String[] infos = messageFromServer.split(" ");
                if (infos.length == 0) {
                    // A line of only spaces splits into an empty array.
                    continue;
                }
                switch(infos[0]) {
                    case "INTRODUCE":
                        if (infos.length < 4) {
                            System.out.println("malformed message from service: " + messageFromServer);
                            break;
                        }
                        try {
                            subscription(infos[2],infos[3]);
                        } catch(IOException e) {
                            System.out.println("subscribe to "+infos[2]+","+infos[3]+" failed");
                        }
                        break;
                    case "TRANSACTION"://upon LPBCAST(transaction)
                        if (infos.length < 6) {
                            System.out.println("malformed message from service: " + messageFromServer);
                            break;
                        }
                        handleTransaction(infos);
                        break;
                    case "DIE":
                    case "QUIT":
                        System.exit(1);
                        //never reach break
                        break;
                    case "SOLVED":
                        solutions.add(messageFromServer);
                        break;
                    case "VERIFY":
                        verification.add(messageFromServer);
                        break;
                    default:
                        break;
                }
            }
        } catch (IOException e) {
            System.out.println("can't connect to service!");
            System.exit(1);
        }
    }

    /**
     * Delivers a transaction announced by the service, the same way a
     * gossiped one is handled: only the first sighting of an id is logged,
     * delivered and scheduled for gossip.
     */
    private void handleTransaction(String[] infos) {
        long sentAt;
        try {
            sentAt = getMicroTime(infos[1]);
        } catch (NumberFormatException e) {
            System.out.println("malformed transaction time from service: " + infos[1]);
            return;
        }
        String transaction = infos[1]+","+infos[2]+","+infos[3]+","+infos[4]+","+infos[5];
        synchronized(eventId) {
            if (eventId.put(infos[2], transaction) == null) {
                //deliver it and log the delay
                delayLog.println(getMicroTime()+","+sentAt);
                transactionsPool.put(transaction,1);
                //will gossip it in the next gossip round
                events.add(transaction);
                //add to orderedeventId
                orderedeid.add(infos[1]+","+infos[2]);
            }
            //no need to pull it any more
            retrieveBuf.remove(infos[2]);
        }
    }
}
/**
 * Creates a node, opens its log files, starts listening for other nodes,
 * connects to the service and starts the gossip and pull threads.
 *
 * <p>The fanout is derived from the expected network size as
 * {@code ceil(2 * (ln(0.5 * initialNode) + 2.4))}. Values of
 * {@code initialNode} below 1 are treated as 1.
 *
 * <p>No thread is started unless every resource could be opened. On failure
 * everything opened so far is closed again.
 *
 * @param serverAddress host of the service
 * @param serverPort    port of the service
 * @param myName        name of this node; also used in the log file names
 * @param myPort        port on which this node accepts connections
 * @param initialNode   expected number of nodes, used to size the fanout
 * @throws IOException if the local address cannot be determined, a log file
 *                     or the listening port cannot be opened, or the service
 *                     cannot be reached
 */
public ConnectionPort(String serverAddress, int serverPort, String myName, int myPort, int initialNode)
        throws IOException
{
    this.myName = myName;
    this.myPort = myPort;
    this.myAddr = InetAddress.getLocalHost().getHostAddress();
    // ln(0) is -Infinity and ln of a negative number is NaN; both would turn
    // into a meaningless fanout, so clamp the network size to at least 1.
    f = (int)Math.ceil(2*(Math.log(0.5*Math.max(1, initialNode)) + 2.4));
    currentRound = 0;
    partialView = new LimitedSet(f + 5);
    unSubs = new LimitedSet(10);
    subs = new LimitedSet(10);
    transactionsPool = new ConcurrentHashMap<>();
    retrieveBuf = new ConcurrentHashMap<>();
    orderedeid = new PriorityQueue<String>(new EventComparator());
    events = new LimitedSet(20);
    eventId = new HashMap<>();
    maxBufferSize = 10000;
    blockBuffer = new ConcurrentHashMap<>();//for blockchain, respond block request
    mergeInfoBuffer = new ConcurrentHashMap<>();//for blockchain, respond mergeInfo request
    blockChain = new ConcurrentLinkedQueue<>();//for blockchain. only read here
    account = new ConcurrentHashMap<>();//for blockchain. only read here
    solutions = new LinkedBlockingDeque<>();
    verification = new LinkedBlockingDeque<>();
    blockfromothernodes = new LinkedBlockingDeque<>();

    ServerSocket serv = null;
    try {
        // FileOutputStream creates the file when it does not exist yet.
        bandwidthLog = new PrintStream(new FileOutputStream(new File("bandwidth_"+myName+".csv")));
        delayLog = new PrintStream(new FileOutputStream(new File("delay_" + myName + ".csv")));
        // Bind before contacting the service so that peers introduced by the
        // service can connect as soon as it knows about this node.
        serv = new ServerSocket(myPort);
        toserver = new Socket(serverAddress,serverPort);
        psServer = new PrintStream(toserver.getOutputStream());
        brServer = new BufferedReader(new InputStreamReader(toserver.getInputStream()));
    } catch (IOException e) {
        // Starting the listener before this point left a non-daemon thread and
        // open sockets behind when the service was unreachable.
        closeQuietly(toserver);
        closeQuietly(serv);
        closeQuietly(delayLog);
        closeQuietly(bandwidthLog);
        throw e;
    }
    //start to listen to connections from other nodes
    new Thread(new ListenForConnection(serv)).start();
    //connect to service and join the membership
    new Thread(new HandleConnectionWithServer()).start();
    new Thread(new PeriodicalGossip()).start();
    new Thread(new PullEvent()).start();
}

/** Closes a resource if it was opened, ignoring failures during cleanup. */
private static void closeQuietly(java.io.Closeable resource) {
    if (resource == null) {
        return;
    }
    try {
        resource.close();
    } catch (IOException ignored) {
        // Cleanup after a failure: the original exception is the one to report.
    }
}
/**
 * Command-line entry point. Expects exactly five arguments:
 * service address, service port, node name, node port and the initial
 * number of nodes. With any other argument count it prints the usage line
 * and returns.
 */
public static void main(String[] args) throws Exception {
    if (args.length == 5) {
        int serverPort = Integer.parseInt(args[1]);
        int listenPort = Integer.parseInt(args[3]);
        int expectedNodes = Integer.parseInt(args[4]);
        new ConnectionPort(args[0], serverPort, args[2], listenPort, expectedNodes);
        return;
    }
    System.out.println("usage: ConnectionPort serveraddr serverport myname myport initialnode");
}
}