Skip to content
Snippets Groups Projects
MP2.py 9.87 KiB
import socket
import threading
import time
import thread
import os
import random
import math
import sys

server_ip = "172.22.94.198" # you may change this server ip
server_port = 4444 # you may change this server port

if len(sys.argv) != 5:
    print("Usage: MP2.py <CONNECT> <node number> <ip> <port>") # Plz type python MP2.py CONNECT {node number} {ip} {port} to start
    sys.exit(1)

op = str(sys.argv[1])
nodeNum = str(sys.argv[2])
ip = str(sys.argv[3])
port = str(sys.argv[4])

service_bd = 0 # record bandwidth
socket_bd = 0 # record bandwidth

class Node:
    def __init__(self, host, nodeNum, ip, port):
        self.host = host
        self.nodeNum = nodeNum
        self.port = int(port)
        self.ip = ip
        self.connection = {} # conncetion[node1] = ['ip', 'port']
        self.transaction = {}
        self.lost = []
        self.query = " ".join(["QUERY", nodeNum, ip, port])+"\n"

    def gossip_trans(self, msg):
        '''
        Used for Gossip Trans
        '''
        count = 0
        neighbor_set = list(set(self.connection.keys()) - set(self.lost))
        if neighbor_set:
            loops = int(2 * math.log(len(neighbor_set), 2))
            while count <= loops:
                intro_nums = min(len(neighbor_set), 2)
                intros = random.sample(neighbor_set, intro_nums)
                for i in intros:
                    if i not in self.lost:
                        self.socket_send(i, self.connection[i][0], self.connection[i][1], msg)
                        time.sleep(0.1)
                count += 1
        return 0

    def socket_send(self, node, dest, port, msg): 
    	'''
    	 Method for client socket
    	'''
        s1 = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        
        try:
            s1.connect((dest, int(port)))
        except socket.error, e:
            print (node + " is down!")
            print ('Strange error:%s' %e)
            self.lost.append(node)
            s1.close()
            return -1

        try:
            s1.sendall(msg.encode())
        except socket.error, e:
            print (node + " is down!")
            print ('Strange error:%s' %e)
            self.lost.append(node)
            s1.close()
            return -1

        s1.close()
        return 0

    def socket_rcv(self):
        '''
        Server for other nodes
        '''
        global socket_bd
        s2 = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        s2.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        s2.bind((self.host, self.port))
        s2.listen(1000)
        while True:
            conn, addr = s2.accept()
            while True:
                rcv = conn.recv(1024)
                if not rcv:
                    break
                
                socket_bd += 1
                
                rcv = rcv.split("\n")

                for rcv_data in rcv:

                    rcv_data = rcv_data.split(" ")

                    if not rcv_data:
                        continue

                    if rcv_data[0] == "TRANSACTION":
                    	
                        if len(rcv_data) != 6:
                            print (rcv_data)
                            continue

                        if rcv_data[2] not in self.transaction:

                            self.transaction[rcv_data[2]] = " ".join(rcv_data)
                            mes = "From nodes: " + self.transaction[rcv_data[2]] + "-----" + str("%.6f" % time.time()) + "\n"
                            with open ('transaction_'+self.nodeNum+'.log', 'a') as f1:
                                f1.write(mes)
                            threading.Thread(target = self.gossip_trans, args = (self.transaction[rcv_data[2]], )).start()
                        	
                    if rcv_data[0] == "QUERY":
                        print (rcv_data)
                        if rcv_data[1] not in self.connection and rcv_data[1] != self.nodeNum:
                            self.connection[rcv_data[1]] = [rcv_data[2], rcv_data[3]]
                            mes = "Join: " + str(rcv_data[1]) + " Total: " + str(self.connection) + "-----" + str("%.6f" % time.time()) + "\n"
                            with open('connection_'+self.nodeNum+'.log', 'a') as f2:
                                f2.write(mes)
                        if rcv_data[1] not in self.lost:
                            threading.Thread(target = self.send_update, args = (rcv_data[1], rcv_data[2], rcv_data[3])).start()

                    if rcv_data[0] == "UPDATE":
                        print (rcv_data)
                        if rcv_data[1] not in self.connection and rcv_data[1] != self.nodeNum:
                            self.connection[rcv_data[1]] = [rcv_data[2], rcv_data[3]]
                            mes = "Join: " + str(rcv_data[1]) + " Total: " + str(self.connection) + "-----" + str("%.6f" % time.time()) + "\n"
                            with open('connection_'+self.nodeNum+'.log', 'a') as f3:
                                f3.write(mes)

    def service_rcv(self, s):
        '''
        Server for service node
        '''
        global service_bd
        while True:
            rcv = s.recv(1024)

            if not rcv:
                print ("service node down")
                break

            rcv = rcv.split("\n")
            
            service_bd += 1

            for rcv_data in rcv:

                rcv_data = rcv_data.split(" ")

                if not rcv_data:
                        continue

                if rcv_data[0] == "INTRODUCE":
                        print (rcv_data)
                        if rcv_data[1] in self.connection:
                            continue
                        else:
                            if rcv_data[1] != self.nodeNum:
                                self.connection[rcv_data[1]] = [rcv_data[2], rcv_data[3]]
                                mes = "Join: " + str(rcv_data[1]) + " Total: " + str(self.connection) + "-----" + str("%.6f" % time.time()) + "\n"
                                with open('connection_'+self.nodeNum+'.log', 'a') as f4:
                                    f4.write(mes)
                                self.socket_send(rcv_data[1], rcv_data[2], rcv_data[3], self.query) # send query

                if rcv_data[0] == "TRANSACTION":

                    if len(rcv_data) != 6:
                        print (rcv_data)
                        continue

                    if rcv_data[2] not in self.transaction:
                        
                        self.transaction[rcv_data[2]] = " ".join(rcv_data)
                        mes = "From service: " + self.transaction[rcv_data[2]] + "-----" + str("%.6f" % time.time()) + "\n"
                        with open ('transaction_'+self.nodeNum+'.log', 'a') as f5:
                            f5.write(mes)
                        threading.Thread(target = self.gossip_trans, args = (self.transaction[rcv_data[2]], )).start()

                if rcv_data[0] == "DIE" or rcv_data[0] == "QUIT":
                    mes = "DIE at " + str("%.6f" % time.time()) + "\n"
                    with open('connection_'+self.nodeNum+'.log', 'a') as f6:
                        f6.write(mes)
                    os._exit(0) # kill the process

    def query_neighbor(self):
        '''
        Discover Mechanism: query neighbor once a sec
        '''
        while True:
            time.sleep(1)
            if self.connection:
                  neighbor_set = set(self.connection.keys()) - set(self.lost)
                  if neighbor_set:
                    neighbor = (random.sample(list(neighbor_set), 1))[0]
                    self.socket_send(neighbor, self.connection[neighbor][0], self.connection[neighbor][1], self.query) # send query

    def send_update(self, node, dest, port):
        '''
        Send update Message to the queried node
        '''
        s1 = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

        try:
            s1.connect((dest, int(port)))
        except socket.error, e:
            print (node + " is down! UPDATE")
            print ('Strange error:%s' %e)
            self.lost.append(node)
            s1.close()
            return -1
        intros = random.sample(self.connection.keys(), len(self.connection.keys())/2)
        for i in intros:
            time.sleep(0.5)
            msg = " ".join(["UPDATE", i, self.connection[i][0], self.connection[i][1]])+"\n"
            try:
                s1.sendall(msg.encode())
            except socket.error, e:
            	print (node + " is down! UPDATE")
            	print ('Strange error:%s' %e)
                self.lost.append(node)
                s1.close()
                return -1
        return 0


if __name__ == '__main__':

    ''' 
    For original input 
    '''
    # sentence = raw_input()
    # op, nodeNum, ip, port = sentence.split(" ")

    sentence = " ".join([op, nodeNum, ip, port])
    host = socket.gethostname()
    node = Node(host, nodeNum, ip, port)

    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

    try:
        s.connect((server_ip, int(server_port)))
    except:
        print ("service node is down")
        s.close()

    t1 = threading.Thread(target = node.query_neighbor)
    t2 = threading.Thread(target = node.socket_rcv)
    t3 = threading.Thread(target = node.service_rcv, args = (s,))

    t1.daemon = True
    t2.daemon = True
    t3.daemon = True

    mes = "Connected at: " + str("%.6f" % time.time()) + "\n"
    with open ('transaction_'+nodeNum+'.log', 'a') as f0:
        f0.write(mes)
    
    t2.start()
    t3.start()
    
    try:
        s.sendall((sentence+"\n").encode())
    except:
        print ("service node is down")
        s.close()
    
    t1.start()

    while True:
       start = time.time()
       time.sleep(1)
       with open('bandwidth_'+nodeNum+'.log', 'a') as f:
           duration = time.time()-start
           sens = str(float((socket_bd + service_bd)/duration))+"-----"+str(duration)+"\n"
           f.write(sens)
       socket_bd = 0
       service_bd = 0