Skip to content
Snippets Groups Projects
MP2.py 9.6 KiB
Newer Older
  • Learn to ignore specific revisions
  • shunhan2's avatar
    shunhan2 committed
    import socket
    import threading
    import time
    import thread
    import os
    import random
    import math
    import sys
    
    server_ip = "172.22.94.198"
    server_port = 4444
    
    if len(sys.argv) != 5:
        print("Usage: MP2.py <CONNECT> <node number> <ip> <port>")
        sys.exit(1)
    
    op = str(sys.argv[1])
    nodeNum = str(sys.argv[2])
    ip = str(sys.argv[3])
    port = str(sys.argv[4])
    
    service_bd = 0
    socket_bd = 0
    
    class Node:
        def __init__(self, host, nodeNum, ip, port):
            self.host = host
            self.nodeNum = nodeNum
            self.port = int(port)
            self.ip = ip
            self.connection = {} # conncetion[node1] = ['ip', 'port']
            self.transaction = {}
            self.lost = []
            self.query = " ".join(["QUERY", nodeNum, ip, port])+"\n"
    
        def gossip_trans(self, msg):
            # while True:
            #     time.sleep(1)
            count = 0
            neighbor_set = list(set(self.connection.keys()) - set(self.lost))
            if neighbor_set:
                loops = int(2 * math.log(len(neighbor_set), 2))
                # print ("I'm still sending gossips!!" + msg)
                while count <= loops:
                    intro_nums = min(len(neighbor_set), 2)
                    intros = random.sample(neighbor_set, intro_nums)
                    for i in intros:
                        if i not in self.lost:
                            self.socket_send(i, self.connection[i][0], self.connection[i][1], msg)
                            time.sleep(0.1)
                    count += 1
            return 0
    
        def socket_send(self, node, dest, port, msg): 
        	'''
        	 method for client socket
        	'''
            s1 = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            
            try:
                s1.connect((dest, int(port)))
            except socket.error, e:
                print (node + " is down!")
                print ('Strange error:%s' %e)
                self.lost.append(node)
                s1.close()
                return -1
    
            try:
                s1.sendall(msg.encode())
            except socket.error, e:
                print (node + " is down!")
                print ('Strange error:%s' %e)
                self.lost.append(node)
                s1.close()
                return -1
    
            s1.close()
            return 0
    
        def socket_rcv(self):
            global socket_bd
            s2 = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            s2.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            s2.bind((self.host, self.port))
            s2.listen(1000)
            while True:
                conn, addr = s2.accept()
                while True:
                    rcv = conn.recv(1024)
                    if not rcv:
                        # print ("target addr:" + str(addr) + "is down!\n")
                        break
                    
                    socket_bd += 1
                    
                    rcv = rcv.split("\n")
    
                    for rcv_data in rcv:
    
                        rcv_data = rcv_data.split(" ")
    
                        if not rcv_data:
                            continue
    
                        if rcv_data[0] == "TRANSACTION":
                        	
                            if len(rcv_data) != 6:
                                print (rcv_data)
                                continue
    
                            if rcv_data[2] not in self.transaction:
    
                                self.transaction[rcv_data[2]] = " ".join(rcv_data)
                                mes = "From nodes: " + self.transaction[rcv_data[2]] + "-----" + str("%.6f" % time.time()) + "\n"
                                with open ('transaction_'+self.nodeNum+'.log', 'a') as f1:
                                    f1.write(mes)
                                threading.Thread(target = self.gossip_trans, args = (self.transaction[rcv_data[2]], )).start()
                            	
                        if rcv_data[0] == "QUERY":
                            print (rcv_data)
                            if rcv_data[1] not in self.connection and rcv_data[1] != self.nodeNum:
                                self.connection[rcv_data[1]] = [rcv_data[2], rcv_data[3]]
                                mes = "Join: " + str(rcv_data[1]) + " Total: " + str(self.connection) + "-----" + str("%.6f" % time.time()) + "\n"
                                with open('connection_'+self.nodeNum+'.log', 'a') as f2:
                                    f2.write(mes)
                            if rcv_data[1] not in self.lost:
                                threading.Thread(target = self.send_update, args = (rcv_data[1], rcv_data[2], rcv_data[3])).start()
    
                        if rcv_data[0] == "UPDATE":
                            print (rcv_data)
                            if rcv_data[1] not in self.connection and rcv_data[1] != self.nodeNum:
                                self.connection[rcv_data[1]] = [rcv_data[2], rcv_data[3]]
                                mes = "Join: " + str(rcv_data[1]) + " Total: " + str(self.connection) + "-----" + str("%.6f" % time.time()) + "\n"
                                with open('connection_'+self.nodeNum+'.log', 'a') as f3:
                                    f3.write(mes)
    
                # conn.close()
    
        def service_rcv(self, s):
            global service_bd
            while True:
                rcv = s.recv(1024)
    
                if not rcv:
                    print ("service node down")
                    break
    
                rcv = rcv.split("\n")
                
                service_bd += 1
    
                for rcv_data in rcv:
    
                    rcv_data = rcv_data.split(" ")
    
                    if not rcv_data:
                            continue
    
                    if rcv_data[0] == "INTRODUCE":
                            print (rcv_data)
                            if rcv_data[1] in self.connection:
                                continue
                            else:
                                if rcv_data[1] != self.nodeNum:
                                    self.connection[rcv_data[1]] = [rcv_data[2], rcv_data[3]]
                                    mes = "Join: " + str(rcv_data[1]) + " Total: " + str(self.connection) + "-----" + str("%.6f" % time.time()) + "\n"
                                    with open('connection_'+self.nodeNum+'.log', 'a') as f4:
                                        f4.write(mes)
                                    self.socket_send(rcv_data[1], rcv_data[2], rcv_data[3], self.query) # send query
    
                    if rcv_data[0] == "TRANSACTION":
    
                        if len(rcv_data) != 6:
                            print (rcv_data)
                            continue
    
                        if rcv_data[2] not in self.transaction:
                            
                            self.transaction[rcv_data[2]] = " ".join(rcv_data)
                            mes = "From service: " + self.transaction[rcv_data[2]] + "-----" + str("%.6f" % time.time()) + "\n"
                            with open ('transaction_'+self.nodeNum+'.log', 'a') as f5:
                                f5.write(mes)
                            threading.Thread(target = self.gossip_trans, args = (self.transaction[rcv_data[2]], )).start()
    
                    if rcv_data[0] == "DIE" or rcv_data[0] == "QUIT":
                        mes = "DIE at " + str("%.6f" % time.time()) + "\n"
                        with open('connection_'+self.nodeNum+'.log', 'a') as f6:
                            f6.write(mes)
                        os._exit(0) # kill the process
    
        def query_neighbor(self):
            while True:
                time.sleep(1)
                if self.connection:
                      neighbor_set = set(self.connection.keys()) - set(self.lost)
                      if neighbor_set:
                        neighbor = (random.sample(list(neighbor_set), 1))[0]
                        self.socket_send(neighbor, self.connection[neighbor][0], self.connection[neighbor][1], self.query) # send query
    
        def send_update(self, node, dest, port):
    
            s1 = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    
            try:
                s1.connect((dest, int(port)))
            except socket.error, e:
                print (node + " is down! UPDATE")
                print ('Strange error:%s' %e)
                self.lost.append(node)
                s1.close()
                return -1
            intros = random.sample(self.connection.keys(), len(self.connection.keys())/2)
            for i in intros:
                time.sleep(0.5)
                msg = " ".join(["UPDATE", i, self.connection[i][0], self.connection[i][1]])+"\n"
                try:
                    s1.sendall(msg.encode())
                except socket.error, e:
                	print (node + " is down! UPDATE")
                	print ('Strange error:%s' %e)
                    self.lost.append(node)
                    s1.close()
                    return -1
            return 0
    
    
    if __name__ == '__main__':
    
        ''' 
        For original input 
        '''
        # sentence = raw_input()
        # op, nodeNum, ip, port = sentence.split(" ")
    
        sentence = " ".join([op, nodeNum, ip, port])
        host = socket.gethostname()
        node = Node(host, nodeNum, ip, port)
    
        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    
        try:
            s.connect((server_ip, int(server_port)))
        except:
            print ("service node is down")
            s.close()
    
        t1 = threading.Thread(target = node.query_neighbor)
        t2 = threading.Thread(target = node.socket_rcv)
        t3 = threading.Thread(target = node.service_rcv, args = (s,))
    
        t1.daemon = True
        t2.daemon = True
        t3.daemon = True
    
        mes = "Connected at: " + str("%.6f" % time.time()) + "\n"
        with open ('transaction_'+nodeNum+'.log', 'a') as f0:
            f0.write(mes)
        
        t2.start()
        t3.start()
        
        try:
            s.sendall((sentence+"\n").encode())
        except:
            print ("service node is down")
            s.close()
        
        t1.start()
    
        while True:
           start = time.time()
           time.sleep(1)
           with open('bandwidth_'+nodeNum+'.log', 'a') as f:
               duration = time.time()-start
               sens = str(float((socket_bd + service_bd)/duration))+"-----"+str(duration)+"\n"
               f.write(sens)
           socket_bd = 0
           service_bd = 0