Skip to content
Snippets Groups Projects
MP2.py 9.6 KiB
Newer Older
shunhan2's avatar
shunhan2 committed
import socket
import threading
import time
import thread
import os
import random
import math
import sys

server_ip = "172.22.94.198"
server_port = 4444

if len(sys.argv) != 5:
    print("Usage: MP2.py <CONNECT> <node number> <ip> <port>")
    sys.exit(1)

op = str(sys.argv[1])
nodeNum = str(sys.argv[2])
ip = str(sys.argv[3])
port = str(sys.argv[4])

service_bd = 0
socket_bd = 0

class Node:
    def __init__(self, host, nodeNum, ip, port):
        self.host = host
        self.nodeNum = nodeNum
        self.port = int(port)
        self.ip = ip
        self.connection = {} # conncetion[node1] = ['ip', 'port']
        self.transaction = {}
        self.lost = []
        self.query = " ".join(["QUERY", nodeNum, ip, port])+"\n"

    def gossip_trans(self, msg):
        # while True:
        #     time.sleep(1)
        count = 0
        neighbor_set = list(set(self.connection.keys()) - set(self.lost))
        if neighbor_set:
            loops = int(2 * math.log(len(neighbor_set), 2))
            # print ("I'm still sending gossips!!" + msg)
            while count <= loops:
                intro_nums = min(len(neighbor_set), 2)
                intros = random.sample(neighbor_set, intro_nums)
                for i in intros:
                    if i not in self.lost:
                        self.socket_send(i, self.connection[i][0], self.connection[i][1], msg)
                        time.sleep(0.1)
                count += 1
        return 0

    def socket_send(self, node, dest, port, msg): 
    	'''
    	 method for client socket
    	'''
        s1 = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        
        try:
            s1.connect((dest, int(port)))
        except socket.error, e:
            print (node + " is down!")
            print ('Strange error:%s' %e)
            self.lost.append(node)
            s1.close()
            return -1

        try:
            s1.sendall(msg.encode())
        except socket.error, e:
            print (node + " is down!")
            print ('Strange error:%s' %e)
            self.lost.append(node)
            s1.close()
            return -1

        s1.close()
        return 0

    def socket_rcv(self):
        global socket_bd
        s2 = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        s2.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        s2.bind((self.host, self.port))
        s2.listen(1000)
        while True:
            conn, addr = s2.accept()
            while True:
                rcv = conn.recv(1024)
                if not rcv:
                    # print ("target addr:" + str(addr) + "is down!\n")
                    break
                
                socket_bd += 1
                
                rcv = rcv.split("\n")

                for rcv_data in rcv:

                    rcv_data = rcv_data.split(" ")

                    if not rcv_data:
                        continue

                    if rcv_data[0] == "TRANSACTION":
                    	
                        if len(rcv_data) != 6:
                            print (rcv_data)
                            continue

                        if rcv_data[2] not in self.transaction:

                            self.transaction[rcv_data[2]] = " ".join(rcv_data)
                            mes = "From nodes: " + self.transaction[rcv_data[2]] + "-----" + str("%.6f" % time.time()) + "\n"
                            with open ('transaction_'+self.nodeNum+'.log', 'a') as f1:
                                f1.write(mes)
                            threading.Thread(target = self.gossip_trans, args = (self.transaction[rcv_data[2]], )).start()
                        	
                    if rcv_data[0] == "QUERY":
                        print (rcv_data)
                        if rcv_data[1] not in self.connection and rcv_data[1] != self.nodeNum:
                            self.connection[rcv_data[1]] = [rcv_data[2], rcv_data[3]]
                            mes = "Join: " + str(rcv_data[1]) + " Total: " + str(self.connection) + "-----" + str("%.6f" % time.time()) + "\n"
                            with open('connection_'+self.nodeNum+'.log', 'a') as f2:
                                f2.write(mes)
                        if rcv_data[1] not in self.lost:
                            threading.Thread(target = self.send_update, args = (rcv_data[1], rcv_data[2], rcv_data[3])).start()

                    if rcv_data[0] == "UPDATE":
                        print (rcv_data)
                        if rcv_data[1] not in self.connection and rcv_data[1] != self.nodeNum:
                            self.connection[rcv_data[1]] = [rcv_data[2], rcv_data[3]]
                            mes = "Join: " + str(rcv_data[1]) + " Total: " + str(self.connection) + "-----" + str("%.6f" % time.time()) + "\n"
                            with open('connection_'+self.nodeNum+'.log', 'a') as f3:
                                f3.write(mes)

            # conn.close()

    def service_rcv(self, s):
        global service_bd
        while True:
            rcv = s.recv(1024)

            if not rcv:
                print ("service node down")
                break

            rcv = rcv.split("\n")
            
            service_bd += 1

            for rcv_data in rcv:

                rcv_data = rcv_data.split(" ")

                if not rcv_data:
                        continue

                if rcv_data[0] == "INTRODUCE":
                        print (rcv_data)
                        if rcv_data[1] in self.connection:
                            continue
                        else:
                            if rcv_data[1] != self.nodeNum:
                                self.connection[rcv_data[1]] = [rcv_data[2], rcv_data[3]]
                                mes = "Join: " + str(rcv_data[1]) + " Total: " + str(self.connection) + "-----" + str("%.6f" % time.time()) + "\n"
                                with open('connection_'+self.nodeNum+'.log', 'a') as f4:
                                    f4.write(mes)
                                self.socket_send(rcv_data[1], rcv_data[2], rcv_data[3], self.query) # send query

                if rcv_data[0] == "TRANSACTION":

                    if len(rcv_data) != 6:
                        print (rcv_data)
                        continue

                    if rcv_data[2] not in self.transaction:
                        
                        self.transaction[rcv_data[2]] = " ".join(rcv_data)
                        mes = "From service: " + self.transaction[rcv_data[2]] + "-----" + str("%.6f" % time.time()) + "\n"
                        with open ('transaction_'+self.nodeNum+'.log', 'a') as f5:
                            f5.write(mes)
                        threading.Thread(target = self.gossip_trans, args = (self.transaction[rcv_data[2]], )).start()

                if rcv_data[0] == "DIE" or rcv_data[0] == "QUIT":
                    mes = "DIE at " + str("%.6f" % time.time()) + "\n"
                    with open('connection_'+self.nodeNum+'.log', 'a') as f6:
                        f6.write(mes)
                    os._exit(0) # kill the process

    def query_neighbor(self):
        while True:
            time.sleep(1)
            if self.connection:
                  neighbor_set = set(self.connection.keys()) - set(self.lost)
                  if neighbor_set:
                    neighbor = (random.sample(list(neighbor_set), 1))[0]
                    self.socket_send(neighbor, self.connection[neighbor][0], self.connection[neighbor][1], self.query) # send query

    def send_update(self, node, dest, port):

        s1 = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

        try:
            s1.connect((dest, int(port)))
        except socket.error, e:
            print (node + " is down! UPDATE")
            print ('Strange error:%s' %e)
            self.lost.append(node)
            s1.close()
            return -1
        intros = random.sample(self.connection.keys(), len(self.connection.keys())/2)
        for i in intros:
            time.sleep(0.5)
            msg = " ".join(["UPDATE", i, self.connection[i][0], self.connection[i][1]])+"\n"
            try:
                s1.sendall(msg.encode())
            except socket.error, e:
            	print (node + " is down! UPDATE")
            	print ('Strange error:%s' %e)
                self.lost.append(node)
                s1.close()
                return -1
        return 0


if __name__ == '__main__':

    ''' 
    For original input 
    '''
    # sentence = raw_input()
    # op, nodeNum, ip, port = sentence.split(" ")

    sentence = " ".join([op, nodeNum, ip, port])
    host = socket.gethostname()
    node = Node(host, nodeNum, ip, port)

    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

    try:
        s.connect((server_ip, int(server_port)))
    except:
        print ("service node is down")
        s.close()

    t1 = threading.Thread(target = node.query_neighbor)
    t2 = threading.Thread(target = node.socket_rcv)
    t3 = threading.Thread(target = node.service_rcv, args = (s,))

    t1.daemon = True
    t2.daemon = True
    t3.daemon = True

    mes = "Connected at: " + str("%.6f" % time.time()) + "\n"
    with open ('transaction_'+nodeNum+'.log', 'a') as f0:
        f0.write(mes)
    
    t2.start()
    t3.start()
    
    try:
        s.sendall((sentence+"\n").encode())
    except:
        print ("service node is down")
        s.close()
    
    t1.start()

    while True:
       start = time.time()
       time.sleep(1)
       with open('bandwidth_'+nodeNum+'.log', 'a') as f:
           duration = time.time()-start
           sens = str(float((socket_bd + service_bd)/duration))+"-----"+str(duration)+"\n"
           f.write(sens)
       socket_bd = 0
       service_bd = 0