diff --git a/mp1/constants.py b/mp1/constants.py index 56c7595ddc8334be98947abe36979edf843d3752..2caf3f10716d69ad4e1aaa9c8c4de8fffda17603 100644 --- a/mp1/constants.py +++ b/mp1/constants.py @@ -11,3 +11,8 @@ CGREEN = '\033[92m' CEND = '\033[0m' RECONNECT_TIME = 1 # 1s + +# INITIAL_MSG_LENGTH = 64 +# REQUEST_DISCONNECT_MSG = 'REQUEST_DISCONNECT!' +ENCODING_FORMAT = 'utf-8' + diff --git a/mp1/message.py b/mp1/message.py index 156b0116c4bc3e4c09ecdf37a0f8e3c767b58723..6efe39f42aa6b7e13f85e19591fc40b9c105f0f7 100644 --- a/mp1/message.py +++ b/mp1/message.py @@ -1,8 +1,23 @@ from transaction import Transaction +import time class Message: def __init__(self, transaction: Transaction, priority: int, node_num: int, is_final_msg: bool): self.transaction = transaction - self.priority = str.format(f"{priority}-{node_num}") + # the format changed to priority.node_num + self.priority = str.format(f"{priority}.{node_num}") self.is_final_msg = is_final_msg + # self.send_time = -1 # will be set when send the message + self.node_num = node_num + + + # for the order of priority queue + + def __lt__(self, other): + temp1 = float(self.priority) + temp2 = float(other.priority) + return temp1 < temp2 + + + diff --git a/mp1/multicast.py b/mp1/multicast.py new file mode 100644 index 0000000000000000000000000000000000000000..2a460002a076720f8979d95b38ec3e5a4441c7ae --- /dev/null +++ b/mp1/multicast.py @@ -0,0 +1,64 @@ +# import socket +# import message +# +# class SockError(RuntimeError): +# def __init__(self, arg): +# self.sock = arg +# +# class R_multicast: +# def __init__(self, group, my_id, message, application_deliver): +# self.group = group +# self.id = my_id +# self.deliver = application_deliver +# self.message = message +# def multicast_send(self, message): +# err_conn = [] +# for member in self.group: +# try: +# # class unicast: +# # @classmethod +# # def send(cls, msg, socket_touse): # send the message to the specified member (use TCP +# # socket_touse.sendall(msg) +# # @classmethod +# # def register_deliver(cls, b_multicast_deliver): +# # cls.b_multicast_deliver = b_multicast_deliver +# # @classmethod +# # def deliver(cls, msg): +# # cls.b_multicast_deliver(msg) +# # +# # +# # class B_multicast: +# # def __init__(self, group, my_id, r_multicast_deliver): +# # self.group = group +# # self.id = my_id +# # self.deliver = r_multicast_deliver +# # unicast.register_deliver(self.b_multicast_deliver) +# # self.send_amount = 0 +# # +# # def send(self, msg): +# # err_conn = [] +# # for member in self.group: +# # try: +# # unicast.send(msg, member) +# # self.send_amount += 1 +# # except socket.error: +# # err_conn.append(member) +# # return err_conn +# # +# # def b_multicast_deliver(self, msg): +# # self.deliver(msg) +# # +# # class R_multicast: +# # def __init__(self, group, my_id, app_deliver): +# # self.group = group +# # self.id = my_id +# # self.deliver = app_deliver # this is application's deliver +# # self.b_multicast = B_multicast(group, my_id, self.r_multicast_deliver) +# # self.messages = set() +# # +# # def send(self, msg): +# # return self.b_multicast.send(msg) +# # +# # def r_multicast_deliver(self, msg): +# # final_message = Message() +# # final_message = msg diff --git a/mp1/node.py b/mp1/node.py index cea692b8cd6e282a028c4a3f4a1c8d8e898be4b1..52c428e250a7e9f2b786d59cffbc25ad9785de71 100644 --- a/mp1/node.py +++ b/mp1/node.py @@ -1,10 +1,17 @@ +import collections +import heapq import sys import time import threading -from constants import NODE_ADDRESS, NODE_IDENTIFIER, NODE_PORT, CRED, CEND, RECONNECT_TIME, CGREEN +from constants import NODE_ADDRESS, NODE_IDENTIFIER, NODE_PORT, CRED, CEND, RECONNECT_TIME, CGREEN, ENCODING_FORMAT from utils import parse_config import socket import copy +from message import Message +from queue import PriorityQueue +import collections +from transaction import Transaction + # import traceback SERVER = socket.gethostbyname(socket.gethostname()) @@ -12,6 +19,64 @@ NODE_ID_TO_IP_MAPPING = {} IP_TO_NODE_ID_MAPPING = {} +class SelfNode: + def __init__(self, my_id): + self.balance = collections.defaultdict(int) # {name, amount} the balances collection in this node + self.proposed_id = 1 # for proposed id from num 1 start + self.priority_queue = [Message] # priority_queue for the message + self.my_id = my_id # my node id + self.group_socket_node = collections.defaultdict(int) + self.err_node = {} + self.start_time = -1 + self.lock = threading.Lock() + self.transactions = collections.defaultdict(list) + self.received_messages = set() # æ ¼å¼ transaction-priority-node_num-is_final_msg-transaction_time 组æˆçš„string + + def update_priority(self, observed_priority=None): + result = 0 + self.lock.acquire() + if not observed_priority: + result = self.proposed_id + proposed_id = self.proposed_id + 1 + else: + self.proposed_id = max(self.proposed_id, observed_priority + 1) + self.lock.release() + return result + + +def deliver(self_node: SelfNode): # 需è¦ä¸€ä¸ªçº¿ç¨‹ä¸€ç›´ç”¨ä»– + while True: + while len(self_node.priority_queue) != 0: + deliverable_msg = None + self_node.lock.acquire() + if self_node.priority_queue[0].is_final_msg: # 有å¯èƒ½ä¼šé”™ isfinalmsg ä¸çŸ¥é“èµ·ä¸èµ·ä½œç”¨ + deliverable_msg = self_node.priority_queue[0] + self_node.priority_queue.pop(0) + # ä¸ç”¨è¿›è¡Œå†æ¬¡æŽ’åºï¼Œå› 为能进入deliver 说明都是已ç»sortè¿‡äº†ï¼Œå› ä¸ºæ²¡æ¬¡å¯¹äºŽqueueçš„æ·»åŠ å’Œä¿®æ”¹priority éƒ½ä¼šé‡æ–°å¯¹queue è¿›è¡ŒæŽ’åº + self_node.lock.release() + if not (deliverable_msg is None): + calculate_transaction(deliverable_msg, self_node) + + +def calculate_transaction(msg: Message, self_node: SelfNode): + present_transaction = msg.transaction + if present_transaction.type == "DEPOSIT": + if present_transaction.account in self_node.balance: + self_node.balance[present_transaction.account] += present_transaction.fund + else: + self_node.balance[present_transaction.account] = present_transaction.fund + else: # transfer + if self_node.balance[present_transaction.from_account] >= present_transaction.fund: + self_node.balance[present_transaction.from_account] -= present_transaction.fund + self_node.balance[present_transaction.to_account] += present_transaction.fund + print_balance(self_node) + + +def print_balance(self_node: SelfNode): + print('BALANCES', end=' ') + print(' '.join('{}:{}'.format(a, b) for a, b in sorted(self_node.balance.items()))) + + def connect_with_all_nodes(nodes: dict, current_node_id: str, return_values: dict): nodes.pop(current_node_id, None) connect_info = {} @@ -26,7 +91,7 @@ def connect_with_all_nodes(nodes: dict, current_node_id: str, return_values: dic s.connect(address) connect_info[node_id] = s print(CGREEN + f"[Connect] - " - f"Connecting to {node[NODE_IDENTIFIER]}: {host_ip_address}:{node[NODE_PORT]} success!" + CEND) + f"Connecting to {node[NODE_IDENTIFIER]}: {host_ip_address}:{node[NODE_PORT]} success!" + CEND) except Exception: print(CRED + f"[Connect] - Connection to " f"{node[NODE_IDENTIFIER]}: {host_ip_address}:{node[NODE_PORT]} failed. " @@ -61,10 +126,10 @@ def bind_with_all_nodes(nodes: dict, current_node_id: str, return_values: dict): def establish_bidirectional_connection(nodes, current_node_id): - return_values = {} - thread1 = threading.Thread(target=connect_with_all_nodes, args=[copy.deepcopy(nodes), current_node_id, return_values]) + thread1 = threading.Thread(target=connect_with_all_nodes, + args=[copy.deepcopy(nodes), current_node_id, return_values]) thread2 = threading.Thread(target=bind_with_all_nodes, args=[copy.deepcopy(nodes), current_node_id, return_values]) thread1.start() @@ -81,11 +146,112 @@ def map_hostname_to_ip(nodes): IP_TO_NODE_ID_MAPPING[NODE_ID_TO_IP_MAPPING[node_id]] = node_id +def msg_to_msg_id(msg: Message): + msg_id = msg.transaction.transaction_id + "-" + msg.priority + "-" + str(msg.is_final_msg) + return msg_id + + +def receive(self_node: SelfNode, msg: Message): + # 这里逻辑我ä¸çŸ¥é“æ€Žä¹ˆå†™ï¼Œå› ä¸ºè§‰å¾—éœ€è¦çº¿ç¨‹ä¸€ç›´ç›‘å¬ç€ + # receive 到msg之åŽï¼Œ + msg_id = msg_to_msg_id(msg) + if msg_id in self_node.received_messages: + pass + # å·²ç»æ”¶åˆ°è¿‡è¿™æ¡msg,ä¸ç”¨å†å¹¿æ’了,并且pass? + else: + # 需è¦è¿›è¡Œå¹¿æ’ + # 广æ’ï¼æ²¡å†™ + # è¿›è¡Œå¤„ç† + self_node.received_messages.put(msg_id) # 放入received_messages + if msg.is_final_msg: + # 带ç€argeementçš„,更新在priority queue ä¸çš„ä½ç½® + self_node.lock.acquire() + for temp_msg in self_node.priority_queue: + if temp_msg.transaction.transaction_id == msg.transaction.transaction_id: + temp_msg.is_final_msg = True + temp_msg.node_num = msg.node_num + temp_msg.priority = msg.priority + self_node.priority_queue.sort() + self_node.lock.release() + else: + # msg带的自己的transaction,那就是别的node带ç€propose priorityæ¥äº† + if msg.transaction.transaction_id in self_node.transactions.keys(): + # æ›´æ–°transaction收到的propose çš„ node listçš„ + self_node.transactions[msg.transaction.transaction_id].append(msg.node_num) + # 没写 这里 收到的propose çš„ node list 是å¦å’Œ 现有的连接的nodeä¸€æ ·çš„ + agree = False # 暂且设为没有全 + if agree: + self_node.lock.acquire() + for temp_msg in self_node.priority_queue: + if temp_msg.transaction.transaction_id == msg.transaction.transaction_id: + temp_msg.is_final_msg = True + if float(temp_msg.priority) < float(msg.priority): + temp_msg.priority = msg.priority + self_node.priority_queue.sort() + self_node.lock.release() + else: + self_node.lock.acquire() + for temp_msg in self_node.priority_queue: + if temp_msg.transaction.transaction_id == msg.transaction.transaction_id: + temp_msg.is_final_msg = False + if float(temp_msg.priority) < float(msg.priority): + temp_msg.priority = msg.priority + self_node.priority_queue.sort() + self_node.lock.release() + + +def multicast(self_node: SelfNode, msg: Message): + # first put into the received message set + msg_str = msg_to_msg_id(msg) + # å°†è¦å‘出的message 放到已接收消æ¯çš„set ä¸ + self_node.received_messages(msg) + # 对group ä¸çš„ç‚¹è¿›è¡Œå¹¿æ’ + for each_socket in self_node.group_socket_node.values(): + each_socket.send(msg.encode(ENCODING_FORMAT)) # ??这里需è¦å¹¿æ’我ä¸ä¼šäº† + + +def read_gentx(self_node: SelfNode): + while True: + data = input() + # print('!!!!'+data) + temp_transaction = Transaction(data, self_node.my_id) + priority = self_node.update_priority() # ç»™priority id 有问题的,我还è¦å†æƒ³æƒ³ + + msg = Message(temp_transaction, priority, self_node.my_id, + False) # generate a object could be put in the priority queue + + # è¦è¿›è¡Œä¸€ä¸ªå¹¿æ’ 但是个还没有写 ?? + muticast() + + self_node.lock.acquire() + self_node.priority_queue.append(msg) # put the msg into priority_queue + self_node.priority_queue.sort() + self_node.lock.release() + + self_node.transactions[temp_transaction.transaction_id].append(self_node.my_id) # 放到transaction ä¸çš„列表 + + # while not self_node.priority_queue.empty(): # when the priority queue not empty + # temp = self_node.priority_queue.get() + # if self_node.priority_queue.get().is_final_msg: + # deliver(self_node.priority_queue.get()) #deliver 还没写呢 + # else: + # self_node.priority_queue.put(temp) + # break; + + if __name__ == '__main__': current_node_id = sys.argv[1] config_path = sys.argv[2] nodes = parse_config(config_path) map_hostname_to_ip(nodes) + self_node = SelfNode(current_node_id) + + return_values = establish_bidirectional_connection(nodes, current_node_id) + # å°†group 进行一个åˆå§‹åŒ– + for key, value in return_values.items(): + for node, sok in value.items(): + self_node.group_socket_node[node] = sok + thread_deliver = threading.Thread(target=deliver(self_node)) ''' return_values = { @@ -105,5 +271,5 @@ if __name__ == '__main__': }, } ''' - return_values = establish_bidirectional_connection(nodes, current_node_id) + # print(return_values) diff --git a/mp1/test.py b/mp1/test.py new file mode 100644 index 0000000000000000000000000000000000000000..c7f954e6272d4bdbbf3a33066322c4b7e0536596 --- /dev/null +++ b/mp1/test.py @@ -0,0 +1,19 @@ + + +class Message: + def __init__(self, priority: int, node_num: int): + + self.priority = str.format(f"{priority}.{node_num}") + + def __lt__(self, other): + temp1 = float(self.priority) + temp2 = float(other.priority) + return temp1<temp2 +if __name__ == "__main__": + msg1 = Message(2,1) + msg3 = Message(3,2) + msg2 = Message(2, 3) + temp = [msg1,msg2,msg3] + temp.sort() + for msg in temp: + print(msg.priority) \ No newline at end of file diff --git a/mp1/transaction.py b/mp1/transaction.py index 86ad135a558193b2cecb1c17d363b21df4f3f556..fe2559abad86ad9f00f00bbbc9ec82d29feda30a 100644 --- a/mp1/transaction.py +++ b/mp1/transaction.py @@ -1,9 +1,14 @@ +import time + from constants import DEPOSIT, TRANSFER class Transaction: - def __init__(self, transaction_str: str): + def __init__(self, transaction_str: str,my_id:int): + self.transaction_origin = my_id self.transaction_str = transaction_str + self.transaction_time = time.time() + self.transaction_id = str(my_id)+transaction_str + "_" + self.transaction_time str_segments = transaction_str.split(" ") if transaction_str.startswith(DEPOSIT): diff --git a/mp1/utils.py b/mp1/utils.py index c320baf3261de5431ffc7847f4113990cae1c2fb..7b1cdb372a069ac16f72e8aea526e16dfe5a80c7 100644 --- a/mp1/utils.py +++ b/mp1/utils.py @@ -1,7 +1,10 @@ +import collections + from constants import NODE_IDENTIFIER, NODE_ADDRESS, NODE_PORT, CEND, CRED from multiprocessing.pool import ThreadPool import multiprocessing as mp import traceback +from transaction import Transaction MAX_THREAD_COUNT = mp.cpu_count() THREAD_POOL = ThreadPool(MAX_THREAD_COUNT) @@ -38,3 +41,12 @@ def run_in_threads(task_list, **kwargs): .format(e.__class__.__name__, e, traceback.format_exc()) + CEND) results.append(value) return results + + +def deliverable(transaction: Transaction, transactions_proposednode: collections.defaultdict, group: dict): + transaction_id = transaction.transaction_str + "!" + transaction.transaction_time + nodes_list = transactions_proposednode[transaction_id] + for node in group: + if node not in nodes_list: + return False + return True