//ConnectionPort.java
import java.io.BufferedReader;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintStream;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.temporal.ChronoField;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.PriorityQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
//helper classes defined in this file:
//1. LimitedSet: a set with a limited maximum size
//2. GossipElement: stores the information needed to retrieve a missed event later
import java.util.concurrent.LinkedBlockingDeque;


/**
 * A string set with a soft upper bound on its size.
 *
 * <p>The bound is not enforced on insertion; callers invoke {@link #respectLimit()}
 * to evict random surplus elements. The class does no locking of its own.
 */
class LimitedSet implements Iterable<String>{
    int maxLen;
    HashSet<String> set;

    /**
     * @param len the size that {@link #respectLimit()} shrinks the set down to
     */
    public LimitedSet(int len) {
        this.maxLen = len;
        this.set = new HashSet<String>();
    }

    /** @return the current number of elements */
    public int size() {
        return set.size();
    }

    /** Removes every element. */
    public void clear() {
        set.clear();
    }

    /**
     * Renders the elements on one line, separated by single spaces and
     * terminated by a newline. An empty set renders as a bare newline.
     */
    @Override
    public String toString() {
        return String.join(" ", set) + "\n";
    }

    /**
     * Picks up to {@code f} distinct elements uniformly at random.
     *
     * @param f the wanted number of elements
     * @return a fresh array holding every element when the set has at most
     *         {@code f} of them, otherwise {@code f} randomly chosen ones
     */
    public String[] generateSubset(int f) {
        String[] pool = set.toArray(new String[0]);
        if (pool.length <= f) {
            return pool;
        }
        String[] picked = new String[f];
        boolean[] taken = new boolean[pool.length];
        int filled = 0;
        //rejection sampling: redraw whenever an already taken slot comes up
        while (filled < f) {
            int candidate = (int) (Math.random() * pool.length);
            if (taken[candidate]) {
                continue;
            }
            taken[candidate] = true;
            picked[filled] = pool[candidate];
            filled++;
        }
        return picked;
    }

    /** Removes each of the given elements that is present. */
    public void removeAll(String[] es) {
        for (int i = 0; i < es.length; i++) {
            set.remove(es[i]);
        }
    }

    /** Removes the given element if it is present. */
    public void remove(String es) {
        set.remove(es);
    }

    /** @return whether the element is currently in the set */
    public boolean contains(String e) {
        return set.contains(e);
    }

    /**
     * Shrinks the set back to {@code maxLen} elements by evicting randomly
     * chosen ones.
     *
     * @return the evicted elements, or {@code null} when the set was already
     *         within its limit
     */
    public String[] respectLimit() {
        int before = set.size();
        if (before <= maxLen) {
            return null;
        }
        int surplus = before - maxLen;
        String[] evicted = new String[surplus];
        String[] snapshot = set.toArray(new String[0]);
        boolean[] marked = new boolean[before];
        //mark `surplus` distinct random positions for eviction
        for (int count = 0; count < surplus; ) {
            int pick = (int) (Math.random() * snapshot.length);
            if (!marked[pick]) {
                marked[pick] = true;
                count++;
            }
        }
        //rebuild the set from the survivors and collect the evicted ones
        HashSet<String> survivors = new HashSet<>();
        int next = 0;
        for (int i = 0; i < before; i++) {
            if (marked[i]) {
                evicted[next] = snapshot[i];
                next++;
            } else {
                survivors.add(snapshot[i]);
            }
        }
        set = survivors;
        return evicted;
    }

    /**
     * Adds an element.
     *
     * @return {@code true} if the element was not already present
     */
    public boolean add(String e) {
        return set.add(e);
    }

    /** @return an iterator over the backing set */
    @Override
    public Iterator<String> iterator() {
        return set.iterator();
    }

}

/**
 * Record of an event that a peer advertised but that this node has not
 * received yet, kept so the event can be pulled later.
 */
class GossipElement {
    //id of the missing event
    String wantedId;
    //round value passed in when the record was created
    int round;
    //sender value passed in when the record was created
    String gossipSender;

    /**
     * @param id          id of the missing event
     * @param seenRound   round to store with the record
     * @param advertiser  sender to store with the record
     */
    public GossipElement(String id, int seenRound, String advertiser) {
        wantedId = id;
        round = seenRound;
        gossipSender = advertiser;
    }
}


//A class implementing the gossip-based multicast protocol lpbcast.
//Reference: "Lightweight Probabilistic Broadcast", https://dl.acm.org/doi/pdf/10.1145/945506.945507
public class ConnectionPort {
    private final int gossipinterval = 500;//milliseconds slept between two gossip rounds
    private final int firstpullround = 4;//once a missing tx is older than this many rounds, pull it from the node that advertised it
    private final int pullinterval = firstpullround * gossipinterval;//milliseconds slept between two pull passes
    //fanout: how many neighbors are gossiped to in every round.
    //The lpbcast paper shows that increasing the fanout decreases the number of rounds necessary to infect all processes.
    //For choosing an optimal fanout see "Probabilistic reliable dissemination in large-scale systems".
    private final int f;
    private final int lastpullround = 8;//once a missing tx is older than this many rounds, pull it from random members of the view
    //unSubs, subs and partialView are accessed while holding the monitor of partialView
    private LimitedSet unSubs;
    private LimitedSet subs;//new subscriptions
    private LimitedSet partialView;//(ip,port), the member list maintained by this node
    //events, eventId and orderedeid are accessed while holding the monitor of eventId
    private LimitedSet events;//events to be gossiped in the next round (time,id,from,to,amount)
    private HashMap<String,String> eventId;//digest: history of all event notifications (id -> event)
    private final int maxBufferSize;//maximum eventId size
    private PriorityQueue<String> orderedeid;//"time,id" entries ordered from older to newer so that the oldest digest entries can be removed
    private ConcurrentHashMap<String,GossipElement> retrieveBuf;//events that other nodes have received but this node has not, kept for a future pull
    private ConcurrentHashMap<String,Integer> transactionsPool;//once an event is added to this pool it counts as delivered
    
    private PrintStream bandwidthLog;
    private PrintStream delayLog;

    private String myName;
    private int myPort;
    private String myAddr;
    private Socket toserver;//connection to the service; also used to send puzzles and verification requests
    private PrintStream psServer; //writer side of toserver
    private BufferedReader brServer; //reader side of toserver
    //Incremented by the PeriodicalGossip thread and read by the PullEvent and
    //DealWithGossip threads without a common lock, hence volatile so that the
    //readers always observe the latest round.
    private volatile int currentRound;
   
    
    //fields for the block chain
    private ConcurrentHashMap<Integer,Block> blockBuffer;//for blockchain, used to answer block requests
    private ConcurrentHashMap<Integer,String> mergeInfoBuffer;//for blockchain, used to answer mergeInfo requests
   
    private ConcurrentLinkedQueue<Block> blockChain;//for blockchain. only read, never written here. might be useless because we introduced blockBuffer
    private ConcurrentHashMap<String,Integer> account;//for blockchain. only read, never written here. might be useless because we introduced mergeInfoBuffer
    private BlockingQueue<String> solutions;
    private BlockingQueue<String> verification;
    private BlockingQueue<Block> blockfromothernodes;

    //function to get current time in microsecond
    /**
     * Returns the current wall-clock time in microseconds since the Unix epoch.
     *
     * <p>The clock is sampled exactly once. Combining the whole seconds of one
     * clock reading with the sub-second part of a second reading yields a value
     * that is about one second off whenever the second rolls over between the
     * two reads.
     *
     * @return microseconds since 1970-01-01T00:00:00Z
     */
    private long getMicroTime() {
        Instant now = Instant.now();
        return now.getEpochSecond() * 1000000L + now.getNano() / 1000;
    }

    //function to transfer time in second to microsecond
    /**
     * Converts a decimal timestamp given in seconds into microseconds.
     *
     * @param time the timestamp in seconds, as decimal text
     * @return the timestamp in microseconds, truncated towards zero
     * @throws NumberFormatException if {@code time} is not a parsable number
     */
    private long getMicroTime(String time) {
        return (long) (Double.parseDouble(time) * 1000000);
    }


    /**
     * Opens a connection to {@code target}, writes {@code msg} and closes the
     * connection again.
     *
     * @param target the destination in {@code ip,port} form
     * @param msg    the message; written as is, so it must carry its own newline
     * @throws IOException if the target is malformed, cannot be reached, or the
     *                     write fails
     */
    private void sendMsg(String target, String msg) 
    throws IOException
    {
        String[] infos = target.split(",");
        if (infos.length < 2) {
            //addresses come from peers, so report a bad one the same way as an unreachable one
            throw new IOException("malformed target address: " + target);
        }
        //try-with-resources closes stream and socket even when sending fails
        try (Socket s = new Socket(infos[0], Integer.parseInt(infos[1]));
             PrintStream ps = new PrintStream(s.getOutputStream())) {
            ps.print(msg);//not println, because msg has \n itself
            //PrintStream never throws on write errors, it only records them
            if (ps.checkError()) {
                throw new IOException("failed to send message to " + target);
            }
            //log bandwidth
            bandwidthLog.println(getMicroTime()+","+msg.length());
        } catch (IllegalArgumentException e) {
            //non-numeric port (NumberFormatException) or port out of range
            throw new IOException("malformed target address: " + target, e);
        }
    }

    //function to pull message of a specific type from target, if request fail, return null
    //type1:RETR,transaction
    //type2:REQB,block
    //type3:REQM,mergeinfo
    /**
     * Sends a request of the given type to {@code target} and returns the
     * payload line of its answer.
     *
     * <p>A target that is malformed, unreachable or closes the connection
     * early is treated as dead: it is removed from {@code partialView} and
     * {@code subs} and added to {@code unSubs}.
     *
     * @param target   the peer to ask, in {@code ip,port} form
     * @param wantedId the id (or height) of the wanted item
     * @param type     the request type: {@code RETR}, {@code REQB} or {@code REQM}
     * @return the line following an {@code OK} answer, or {@code null} when the
     *         peer answered anything else or the request failed
     */
    private String retrieveTx(String target, String wantedId, String type) 
    {
        String[] targetInfo = target.split(",");
        try {
            if (targetInfo.length < 2) {
                throw new IOException("malformed target address: " + target);
            }
            //try-with-resources releases the socket on every path; nothing closes it otherwise
            try (Socket s = new Socket(targetInfo[0],Integer.parseInt(targetInfo[1]));
                 PrintStream ps = new PrintStream(s.getOutputStream());
                 BufferedReader br = new BufferedReader(new InputStreamReader(s.getInputStream()))) {
                String msg = type + "\n" +wantedId;
                ps.println(msg);
                bandwidthLog.println(getMicroTime()+","+msg.length());
                String respond = br.readLine();
                if (respond == null) {
                    throw new IOException("connection closed by " + target);
                }
                if (!respond.equals("OK")) {
                    return null;
                }
                String tx = br.readLine();
                if (tx == null) {
                    throw new IOException("connection closed by " + target);
                }
                return tx;
            }
        } catch (IOException | IllegalArgumentException e) {
            //target is dead (IllegalArgumentException covers a non-numeric or out-of-range port)
            System.out.println(target+" has dead");
            synchronized(partialView) {
                partialView.remove(target);
                subs.remove(target);
                unSubs.add(target);
            }
            return null;
        }
    }





    //about blockchain
    /**
     * @return the live block buffer (not a copy) that is consulted when
     *         answering {@code REQB} requests
     */
    public ConcurrentHashMap<Integer, Block> getBlockBuffer() {
        return this.blockBuffer;
    }

    /**
     * @return the live merge-info buffer (not a copy) that is consulted when
     *         answering {@code REQM} requests
     */
    public ConcurrentHashMap<Integer, String> getMergeInfoBuffer() {
        return this.mergeInfoBuffer;
    }

    /**
     * @return the live pool of delivered transactions (not a copy)
     */
    public ConcurrentHashMap<String,Integer> getMemPool() {
        return this.transactionsPool;
    }
    //might be removed in the future
    /**
     * @return the live account map (not a copy)
     */
    public ConcurrentHashMap<String,Integer> getAccount() {
        return this.account;
    }
    //might be removed in the future
    /**
     * @return the live block chain queue (not a copy)
     */
    public ConcurrentLinkedQueue<Block> getBlockChain() {
        return this.blockChain;
    }

    
    //TODO
    /**
     * Writes a puzzle line to the service connection.
     *
     * @param puzzle the line to send
     */
    public void sendPuzzle(String puzzle) {
        this.psServer.println(puzzle);
    }

    //TODO
    /**
     * Blocks until a {@code SOLVED} line from the service is available.
     *
     * @return the next queued line, or {@code null} if the waiting thread was
     *         interrupted (its interrupt flag is set again in that case)
     */
    public String getSolution() {
        try {
            return solutions.take();
        } catch (InterruptedException e) {
            //restore the flag so the caller can still see the interruption
            Thread.currentThread().interrupt();
            e.printStackTrace();
            return null;
        }
    }

    //TODO
    /**
     * Writes a verification request line to the service connection.
     *
     * @param verify the line to send
     */
    public void sendVerify(String verify) {
        this.psServer.println(verify);
    }

    //TODO
    /**
     * Blocks until a {@code VERIFY} line from the service is available.
     *
     * @return the next queued line, or {@code null} if the waiting thread was
     *         interrupted (its interrupt flag is set again in that case)
     */
    public String getVerified() {
        try {
            return verification.take();
        } catch (InterruptedException e) {
            //restore the flag so the caller can still see the interruption
            Thread.currentThread().interrupt();
            e.printStackTrace();
            return null;
        }
    }

    //TODO
    /**
     * Sends a block to a random subset of the partial view and keeps retrying
     * until at least one peer has been reached.
     *
     * <p>When a pass reaches nobody (empty view or only dead targets) the
     * method sleeps for one gossip interval before trying again instead of
     * spinning. It gives up if the calling thread is interrupted.
     *
     * @param block the block to broadcast
     */
    public void sendBlock(Block block) {
        String msg = "BLOCK\n"+block.toString()+"\n";
        
        int sent = 0;
        while (sent < 1) {//at least sent out one
            String[] targets;
            synchronized(partialView) {
                targets = partialView.generateSubset(f);
            }
            for (String target:targets) {
                try {
                    sendMsg(target,msg );
                    sent ++;
                } catch (IOException | IllegalArgumentException | ArrayIndexOutOfBoundsException e) {
                    //target is dead; the unchecked cases are malformed addresses learned from peers
                    System.out.println(target+" has dead");
                    synchronized(partialView) {
                        partialView.remove(target);
                        subs.remove(target);
                        unSubs.add(target);
                    }
                }
            }
            if (sent < 1) {
                //nobody reachable right now: wait for the view to change
                try {
                    Thread.sleep(gossipinterval);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }
    }

    //TODO
    /**
     * Blocks until a block received from another node is available.
     *
     * @return the next queued block, or {@code null} if the waiting thread was
     *         interrupted (its interrupt flag is set again in that case)
     */
    public Block getBlock() {
        try {
            return blockfromothernodes.take();
        } catch (InterruptedException e) {
            //restore the flag so the caller can still see the interruption
            Thread.currentThread().interrupt();
            e.printStackTrace();
            return null;
        }
    }

    //TODO if get a target block from addr, return the block, otherwise, return null
    /**
     * Asks {@code addr} for the block at the given height.
     *
     * @param addr   the peer to ask, in {@code ip,port} form
     * @param height the height of the wanted block
     * @return the block, or {@code null} when the peer has no such block, the
     *         request failed, or the answer is not a well-formed block line
     */
    public Block requestBlock(String addr, int height) {
        String blockinfo = retrieveTx(addr, String.valueOf(height), "REQB");
        if (blockinfo == null) {
            return null;
        }
        //a block line has seven ';'-separated fields; the third is a space-separated tx list
        String[] blockpart = blockinfo.split(";");
        if (blockpart.length < 7) {
            return null;
        }
        int blockHeight;
        try {
            blockHeight = Integer.parseInt(blockpart[0]);
        } catch (NumberFormatException e) {
            //the answer comes from the network, so treat garbage as "no block"
            return null;
        }
        String[] txlist = blockpart[2].split(" ");
        return new Block(blockHeight,blockpart[1],txlist,blockpart[3],blockpart[4],blockpart[5],blockpart[6]);
    }

    //TODO
    /**
     * Asks {@code addr} for the merge info at the given height.
     *
     * @param addr   the peer to ask, in {@code ip,port} form
     * @param height the height the merge info belongs to
     * @return the merge info line, or {@code null} when the request did not
     *         yield one
     */
    public String requestMergeInfo(String addr, int height) {
        return retrieveTx(addr, String.valueOf(height), "REQM");
    }
    

    /**
     * Orders comma-separated entries by their first field, read as a decimal
     * timestamp, from older to newer.
     */
    class EventComparator implements Comparator<String> {
        /**
         * @throws NumberFormatException if the first field of either argument
         *                               is not a parsable number
         */
        @Override
        public int compare(String a,String b) {
            double atime = Double.parseDouble(a.split(",")[0]);
            double btime = Double.parseDouble(b.split(",")[0]);
            //Double.compare is a total order; a hand-written >/== chain answers
            //"less than" in both directions as soon as a NaN is involved
            return Double.compare(atime, btime);
        }
    }
    
    //thread to broadcast tx and membership updates periodically.
    //Every gossip message, besides notifying events, also piggybacks a set
    //of process identifiers which are used to update the views.
    //In detail, a gossip message serves four purposes:
    //1.Event notifications: piggybacks the event notifications received (for the first time) since the last outgoing gossip message
    //2.Event notification identifiers: a digest (inventory) of the events known to the sender
    //3.Subscriptions
    //4.Unsubscriptions
    //NOTE: what if partialView is empty? Then this gossip round simply doesn't gossip anything.
    class PeriodicalGossip implements Runnable {
        /**
         * Once per gossip interval: advances the round counter, builds one
         * GOSP message and sends it to a random subset of the partial view.
         *
         * <p>Message layout, one item per line after the {@code GOSP} header:
         * unsubscriptions, own address followed by new subscriptions, the
         * events collected since the previous round, and the ids of all known
         * events. Targets that cannot be reached are dropped from the view.
         * The loop ends when the thread is interrupted.
         */
        public void run() {
            while (true) {
                try {
                    Thread.sleep(gossipinterval);
                } catch (InterruptedException e) {
                    //keep the interruption visible and stop; looping on would just throw again
                    Thread.currentThread().interrupt();
                    return;
                }
                currentRound ++;
                String[] targets;
                StringBuilder gossipMsg = new StringBuilder();
                gossipMsg.append("GOSP\n");
                synchronized(partialView) {
                    gossipMsg.append(unSubs.toString());
                    String subinfo = subs.toString();
                    //a length of 1 means subs rendered as a bare newline, i.e. it is empty
                    if (subinfo.length() == 1) {
                        gossipMsg.append(myAddr+","+myPort+"\n");
                    } else {
                        gossipMsg.append(myAddr+","+myPort+" "+subinfo);
                    }
                    targets = partialView.generateSubset(f);
                }
                synchronized(eventId) {
                    gossipMsg.append(events.toString());
                    events.clear();
                    //digest line: all known event ids separated by spaces (empty line when none)
                    gossipMsg.append(String.join(" ", eventId.keySet())).append("\n");
                }
                String msg = gossipMsg.toString();
                for (String target:targets) {
                    try {
                        sendMsg(target, msg);
                    } catch (IOException | IllegalArgumentException | ArrayIndexOutOfBoundsException e) {
                        //target is dead; the unchecked cases are malformed addresses
                        //learned from peers, which must not kill the gossip thread
                        System.out.println(target+" has dead");
                        synchronized(partialView) {
                            partialView.remove(target);
                            subs.remove(target);
                            unSubs.add(target);
                        }
                    }
                }
            }
        }

    }

    //thread periodically pull missing transactions from other processes
    class PullEvent implements Runnable {
        /**
         * Once per pull interval walks over the events that peers advertised
         * but that never arrived here. Entries older than
         * {@code lastpullround} rounds are requested from random members of
         * the view until one answers; entries older than
         * {@code firstpullround} rounds are requested from the peer that
         * advertised them. The loop ends when the thread is interrupted.
         */
        public void run() {
            while (true) {
                try {
                    Thread.sleep(pullinterval);
                } catch (InterruptedException e) {
                    //keep the interruption visible and stop; looping on would just throw again
                    Thread.currentThread().interrupt();
                    return;
                }
                for (GossipElement tx:retrieveBuf.values()) {
                    int age = currentRound - tx.round;
                    if (age > lastpullround) {
                        String[] targets;
                        synchronized(partialView) {
                            targets = partialView.generateSubset(f);
                        }
                        for (String target:targets) {
                            String respond = tryRetrieve(target, tx.wantedId);
                            if (respond != null) {
                                deliver(respond);
                                break;
                            }
                        }
                    } else if (age > firstpullround) {
                        String res = tryRetrieve(tx.gossipSender, tx.wantedId);
                        if (res != null) {
                            deliver(res);
                        }
                    }
                }
            }
        }

        //Requests one event from one peer. An empty or malformed address yields
        //null instead of an unchecked exception that would end this thread.
        private String tryRetrieve(String target, String wantedId) {
            if (target == null || target.isEmpty()) {
                //the advertising gossip message carried no sender address
                return null;
            }
            try {
                return retrieveTx(target, wantedId, "RETR");
            } catch (IllegalArgumentException | ArrayIndexOutOfBoundsException e) {
                System.out.println("skip malformed address "+target);
                return null;
            }
        }

        //Records a pulled event (time,id,from,to,amount). If it is new it is
        //queued for the next gossip round, delivered and logged.
        private void deliver(String event) {
            String[] txpart = event.split(",");
            if (txpart.length < 2) {
                return;//malformed answer: no id field
            }
            long createdAt;
            try {
                //validate the timestamp before touching any shared state
                createdAt = getMicroTime(txpart[0]);
            } catch (NumberFormatException e) {
                return;
            }
            synchronized(eventId) {
                if (eventId.put(txpart[1],event) == null) {//receive this event for the first time
                    //will gossip it in the next gossip round
                    events.add(event);
                    //deliver it and log the delay
                    delayLog.println(getMicroTime()+","+createdAt);
                    transactionsPool.put(event,1);
                    //add to orderedeventId
                    orderedeid.add(txpart[0]+","+txpart[1]);
                    //if it's in retrieveBuf, remove it
                    retrieveBuf.remove(txpart[1]);
                }
            }
        }
    }
    

    class DealWithGossip implements Runnable {//handle message from other nodes
        Socket newaccept;

        public DealWithGossip(Socket s) {
            newaccept = s;
        }

        /**
         * Reads the message type from the first line of the connection and
         * dispatches to the matching handler. Unknown types are ignored. The
         * socket is closed on every exit path, including failures, so the
         * peer never waits on a connection nobody serves any more.
         */
        public void run() {
            try (Socket connection = newaccept;
                 BufferedReader br = new BufferedReader(new InputStreamReader(connection.getInputStream()))) {
                String type = br.readLine();
                if (type == null) {
                    return;
                }
                switch (type) {
                    case "GOSP":
                        handleGossip(br);
                        break;
                    case "RETR":
                        handleRetrieve(br);
                        break;
                    case "JOIN":
                        handleJoin(br);
                        break;
                    case "REQB":
                        handleBlockRequest(br);
                        break;
                    case "REQM":
                        handleMergeInfoRequest(br);
                        break;
                    case "BLOCK":
                        handleBlock(br);
                        break;
                    default:
                        break;
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

        //GOSP: four lines follow - unsubscriptions, subscriptions, events, digest.
        private void handleGossip(BufferedReader br) throws IOException {
            String unsubscription = br.readLine();
            String subscription = br.readLine();
            String transactions = br.readLine();
            String digest = br.readLine();
            String sender = "";
            synchronized(partialView) {
                //handle unsubscription
                if (unsubscription != null && !unsubscription.equals("")) {
                    String[] unsubsInfo = unsubscription.split(" ");
                    partialView.removeAll(unsubsInfo);
                    subs.removeAll(unsubsInfo);
                    for (String unsub:unsubsInfo) {
                        unSubs.add(unsub);
                    }
                    unSubs.respectLimit();
                }
                //handle subscription
                if (subscription != null && !subscription.equals("")) {
                    String[] subsInfos = subscription.split(" ");
                    //the gossiping node puts its own address first; a line of only blanks splits to nothing
                    if (subsInfos.length > 0) {
                        sender = subsInfos[0];
                    }
                    String myinfo = myAddr+","+myPort;
                    for (String sub:subsInfos) {
                        if (!sub.equals(myinfo) && partialView.add(sub)) {
                            subs.add(sub);
                        }
                    }
                    //if the node is dead, it won't restate its existence in sub, will be gradually removed from network
                    //because of randomly removing members out of partialView and subs.
                    String[] removed = partialView.respectLimit();
                    if (removed != null) {
                        for (String remove:removed) {
                            subs.add(remove);
                        }
                    }
                    subs.respectLimit();
                }
            }
            synchronized(eventId) {
                //handle increment events
                if (transactions != null && !transactions.equals("")) {
                    for (String tx:transactions.split(" ")) {
                        //transinfo format(time,id,from,to,amount)
                        String[] txpart = tx.split(",");
                        if (txpart.length < 2) {
                            continue;//malformed entry: no id field
                        }
                        long createdAt;
                        try {
                            //validate the timestamp before touching any shared state
                            createdAt = getMicroTime(txpart[0]);
                        } catch (NumberFormatException e) {
                            continue;
                        }
                        if (eventId.put(txpart[1],tx) == null) {//receive this event for the first time
                            //will gossip it in the next gossip round
                            events.add(tx);
                            //deliver it and log the delay
                            delayLog.println(getMicroTime()+","+createdAt);
                            transactionsPool.put(tx,1);
                            //add to orderedeventId
                            orderedeid.add(txpart[0]+","+txpart[1]);
                            //if it's in retrieveBuf, remove it
                            retrieveBuf.remove(txpart[1]);
                        }
                    }
                }
                //handle inventory events
                //those that has not been pushed to me, need to pull initiatively
                if (digest != null && !digest.equals("")) {
                    for (String transId:digest.split(" ")) {
                        if (!eventId.containsKey(transId)) {
                            retrieveBuf.putIfAbsent(transId,new GossipElement(transId,currentRound,sender));
                        }
                    }
                }
                //eventid respect maxlen limit(removed oldest events)
                int toremove = eventId.size() - maxBufferSize;
                while (toremove > 0) {
                    String oldest = orderedeid.poll();
                    if (oldest == null) {
                        break;//ordering queue exhausted, nothing left to evict by age
                    }
                    eventId.remove(oldest.split(",")[1]);
                    toremove --;
                }
                //events respect maxlen limit
                events.respectLimit();
            }
        }

        //RETR: the next line is an event id; answers with the event or FAIL.
        private void handleRetrieve(BufferedReader br) throws IOException {
            String requestedId = br.readLine();
            if (requestedId == null) {
                return;
            }
            String tx;
            //only the lookup needs the lock; the socket write happens outside of it
            synchronized(eventId) {
                tx = eventId.get(requestedId);
            }
            sendReply(tx);
        }

        //JOIN: the next line is the newcomer's address; answers with my own
        //address followed by my current view and then records the newcomer.
        private void handleJoin(BufferedReader br) throws IOException {
            String newmemberInfo = br.readLine();
            if (newmemberInfo == null) {
                return;
            }
            PrintStream ps = new PrintStream(newaccept.getOutputStream());
            String answer;
            boolean viewEmpty;
            synchronized(partialView) {
                viewEmpty = partialView.size() == 0;
                if (viewEmpty) {
                    answer = myAddr + "," + myPort;
                } else {
                    //partialView.toString() already ends with a newline
                    answer = myAddr+","+myPort+" "+partialView.toString();
                }
                partialView.add(newmemberInfo);
                subs.add(newmemberInfo);
            }
            if (viewEmpty) {
                ps.println(answer);
            } else {
                ps.print(answer);
            }
            ps.flush();
            bandwidthLog.println(getMicroTime()+","+answer.length());
        }

        //REQB: the next line is a block height; answers with the block or FAIL.
        private void handleBlockRequest(BufferedReader br) throws IOException {
            String wantblock = br.readLine();
            if (wantblock == null) {
                return;
            }
            Block b = null;
            try {
                b = blockBuffer.get(Integer.parseInt(wantblock));
            } catch (NumberFormatException e) {
                //not a height: answered with FAIL below
            }
            sendReply(b != null ? b.toString() : null);
        }

        //REQM: the next line is a block height; answers with the merge info or FAIL.
        private void handleMergeInfoRequest(BufferedReader br) throws IOException {
            String wantmerge = br.readLine();
            if (wantmerge == null) {
                return;
            }
            String mergeinfo = null;
            try {
                mergeinfo = mergeInfoBuffer.get(Integer.parseInt(wantmerge));
            } catch (NumberFormatException e) {
                //not a height: answered with FAIL below
            }
            sendReply(mergeinfo);
        }

        //BLOCK: the next line is a block with seven ';'-separated fields, the
        //third being a space-separated tx list. Malformed lines are dropped.
        private void handleBlock(BufferedReader br) throws IOException {
            String b = br.readLine();
            if (b == null) {
                return;
            }
            String[] blockpart = b.split(";");
            if (blockpart.length < 7) {
                return;
            }
            int height;
            try {
                height = Integer.parseInt(blockpart[0]);
            } catch (NumberFormatException e) {
                return;
            }
            String[] txlist = blockpart[2].split(" ");
            Block newblock = new Block(height,blockpart[1],txlist,blockpart[3],blockpart[4],blockpart[5],blockpart[6]);
            blockfromothernodes.add(newblock);
        }

        //Writes "OK" plus the payload line, or "FAIL" when payload is null,
        //and logs the number of characters sent.
        private void sendReply(String payload) throws IOException {
            PrintStream ps = new PrintStream(newaccept.getOutputStream());
            if (payload != null) {
                String reply = "OK\n"+payload;
                ps.println(reply);
                bandwidthLog.println(getMicroTime()+","+reply.length());
            } else {
                ps.println("FAIL");
                bandwidthLog.println(getMicroTime()+",4");
            }
            ps.flush();
        }
    }
    //The only way to really find out whether a remote side socket close is by reading 
    //(you'll get -1 as return value) or writing (an IOException (broken pipe) will be thrown)
    // on the associated Input/OutputStreams.
    class ListenForConnection implements Runnable {//listen connecton from other nodes
        ServerSocket asServer;

        public ListenForConnection(ServerSocket s) {
            this.asServer = s;
        }

        /**
         * Accepts connections forever and hands each one to a new
         * DealWithGossip thread. A failing accept terminates the process.
         */
        public void run() {
            try {
                for (;;) {
                    Socket peer = asServer.accept();
                    Thread handler = new Thread(new DealWithGossip(peer));
                    handler.start();
                }
            } catch (IOException e) {
                e.printStackTrace();
                System.out.println("can't listen!");
                System.exit(1);
            }
        }
    }

    //send subscription to one process that has joined the network
    /**
     * Sends a JOIN request to a node that is already part of the network and
     * adds the members it answers with to the partial view.
     *
     * @param addr the ip of the node to contact
     * @param port the port of the node to contact, as decimal text
     * @throws IOException if the port is invalid, the node cannot be reached,
     *                     or it closes the connection without answering
     */
    private void subscription(String addr, String port) 
    throws IOException
    {
        //try-with-resources releases the connection on every path, including a failing read
        try (Socket s = new Socket(addr, Integer.parseInt(port));
             PrintStream ps = new PrintStream(s.getOutputStream());
             BufferedReader br = new BufferedReader(new InputStreamReader(s.getInputStream()))) {
            String subscriptioninfo = "JOIN\n"+myAddr+","+myPort;
            ps.println(subscriptioninfo);
            bandwidthLog.println(getMicroTime()+","+subscriptioninfo.length());
            String members = br.readLine();
            if (members == null) {
                throw new IOException("no member list received from " + addr + "," + port);
            }
            //receive partialview from other nodes
            //format<ip,port ip,port ip,port>
            //update my partialView
            String myinfo = myAddr+","+myPort;
            synchronized(partialView) {
                for (String member:members.split(" ")) {
                    //a node never lists itself as a neighbor, same rule as for gossiped subscriptions
                    if (!member.isEmpty() && !member.equals(myinfo)) {
                        partialView.add(member);
                    }
                }
            }
        } catch (IllegalArgumentException e) {
            //non-numeric port (NumberFormatException) or port out of range
            throw new IOException("invalid address " + addr + "," + port, e);
        }
    }

    class HandleConnectionWithServer implements Runnable {//node to server
        /**
         * Announces this node to the service with a CONNECT line and then
         * processes the service's lines until the connection ends:
         * INTRODUCE subscribes to the named node, TRANSACTION records and
         * delivers a new event, DIE and QUIT terminate the process, SOLVED
         * and VERIFY lines are queued for their consumers.
         */
        public void run() {
            try {
                String connectCommand = "CONNECT " + myName + " " + myAddr + " " + myPort;
                System.out.println(connectCommand);
                psServer.println(connectCommand);
                bandwidthLog.println(getMicroTime()+","+connectCommand.length());
                for (String line = brServer.readLine(); line != null; line = brServer.readLine()) {
                    String[] infos = line.split(" ");
                    switch (infos[0]) {
                        case "INTRODUCE": {
                            String peerAddr = infos[2];
                            String peerPort = infos[3];
                            try {
                                subscription(peerAddr, peerPort);
                            } catch (IOException e) {
                                System.out.println("subscribe to "+peerAddr+","+peerPort+" failed");
                            }
                            break;
                        }
                        case "TRANSACTION": {//upon LPBCAST(transaction)
                            //event format: time,id,from,to,amount
                            String transaction = String.join(",", infos[1], infos[2], infos[3], infos[4], infos[5]);
                            synchronized(eventId) {
                                //deliver it and log the delay
                                delayLog.println(getMicroTime()+","+getMicroTime(infos[1]));
                                transactionsPool.put(transaction,1);
                                //record it in the digest
                                eventId.put(infos[2],transaction);
                                //gossip it in the next round
                                events.add(transaction);
                                //remember its age for digest eviction
                                orderedeid.add(infos[1]+","+infos[2]);
                            }
                            break;
                        }
                        case "DIE":
                        case "QUIT":
                            System.exit(1);
                            //never reach break
                            break;
                        case "SOLVED":
                            solutions.add(line);
                            break;
                        case "VERIFY":
                            verification.add(line);
                            break;
                    }
                }
            } catch (IOException e) {
                System.out.println("can't connect to service!");
                System.exit(1);
            }
        }

    }

    /**
     * Creates a node, opens its log files, starts listening for peers,
     * connects to the service and starts the gossip and pull threads.
     *
     * @param serverAddress host of the service
     * @param serverPort    port of the service
     * @param myName        name of this node; also used in the log file names
     * @param myPort        port this node listens on for other nodes
     * @param initialNode   node count that the gossip fanout is derived from
     * @throws IOException if a log file, the listening socket or the service
     *                     connection cannot be opened
     */
    public ConnectionPort(String serverAddress, int serverPort, String myName, int myPort, int initialNode)
    throws IOException
    {
        this.myName = myName;
        this.myPort = myPort;
        this.myAddr = InetAddress.getLocalHost().getHostAddress();
        //Math.log yields -Infinity or NaN for initialNode <= 0, which the int cast
        //turns into Integer.MIN_VALUE or 0; keep the fanout at one neighbor at least
        f = Math.max(1, (int)Math.ceil(2*(Math.log(0.5*initialNode) + 2.4)));
        currentRound = 0;
        partialView = new LimitedSet(f + 5);
        unSubs = new LimitedSet(10);
        subs = new LimitedSet(10);
        transactionsPool = new ConcurrentHashMap<>();
        retrieveBuf = new ConcurrentHashMap<>();
        orderedeid = new PriorityQueue<String>(new EventComparator());
        events = new LimitedSet(20);
        eventId = new HashMap<>();
        maxBufferSize = 10000;
        blockBuffer = new ConcurrentHashMap<>();//for blockchain, respond block request
        mergeInfoBuffer = new ConcurrentHashMap<>();//for blockchain, respond mergeInfo request
        blockChain = new ConcurrentLinkedQueue<>();//for blockchain.only to read, no write. might be useless because we introduce blockBuffer
        account = new ConcurrentHashMap<>();//for blockchain. only to read, no write.might be useless because we introduce mergeInfoBuffer
        solutions = new LinkedBlockingDeque<>();
        verification = new LinkedBlockingDeque<>();
        blockfromothernodes = new LinkedBlockingDeque<>();
        //one csv per node for sent bytes, one for delivery delays
        File bandwidth = new File("bandwidth_"+myName+".csv");
        bandwidth.createNewFile();
        bandwidthLog = new PrintStream(new FileOutputStream(bandwidth));
        File delay = new File("delay_" + myName + ".csv");
        delay.createNewFile();
        delayLog = new PrintStream(new FileOutputStream(delay));
        //start to listen to connection from other nodes
        //join the membership
        ServerSocket serv = new ServerSocket(myPort);
        new Thread(new ListenForConnection(serv)).start();
        toserver = new Socket(serverAddress,serverPort);
        psServer = new PrintStream(toserver.getOutputStream());
        brServer = new BufferedReader(new InputStreamReader(toserver.getInputStream()));
        //connect to service
        new Thread(new HandleConnectionWithServer()).start();
        new Thread(new PeriodicalGossip()).start();
        new Thread(new PullEvent()).start();
    }

    /**
     * Command-line entry point: starts one node from five arguments, or
     * prints the usage line when the argument count is wrong.
     *
     * @param args serveraddr serverport myname myport initialnode
     */
    public static void main(String[] args) throws Exception {
        if (args.length == 5) {
            int serverPort = Integer.parseInt(args[1]);
            int listenPort = Integer.parseInt(args[3]);
            int initialNode = Integer.parseInt(args[4]);
            //the constructor starts all worker threads; the instance itself is not needed here
            new ConnectionPort(args[0], serverPort, args[2], listenPort, initialNode);
        } else {
            System.out.println("usage: ConnectionPort serveraddr serverport myname myport initialnode");
        }
    }
    
}