diff --git a/.flake8 b/.flake8 index b344cedd36ab876b443c80fc823b759083341405..97dacf936959b922f29a17606b7581780dd96ce3 100644 --- a/.flake8 +++ b/.flake8 @@ -8,4 +8,6 @@ exclude = ./sample_test/src/*.py per-file-ignores = ./mp1/node.py:W291 + ./mp3/server.py:F841 + diff --git a/mp3/client.py b/mp3/client.py index b600bb9e445e72869b8ac8d85ca64d01a49dd025..c4a159cb17be3b07d54db4340626eb382a72e176 100644 --- a/mp3/client.py +++ b/mp3/client.py @@ -1,6 +1,6 @@ import random from constants import SERVER_IDENTIFIER, SERVER_ADDRESS, SERVER_PORT, ENCODING_FORMAT, MSG_LENGTH -from utils import gprint, pprint +# from utils import gprint, pprint import os import sys import socket @@ -37,9 +37,9 @@ def connect(coordinator_info: tuple): coordinator_port_number = coordinator_info[1][SERVER_PORT] client = socket.socket(socket.AF_INET, socket.SOCK_STREAM) address = (coordinator_ip_address, coordinator_port_number) - pprint(f'[connect] - client connecting to address {address}') + # pprint(f'[connect] - client connecting to address {address}') client.connect(address) - pprint(f'[connect] - client connecting to address {address} success!') + # pprint(f'[connect] - client connecting to address {address} success!') return client @@ -48,17 +48,17 @@ def bind(): server.bind(ADDR) server.listen() conn, addr = server.accept() - gprint(f"[Bind] - Bind to {addr} success!") + # gprint(f"[Bind] - Bind to {addr} success!") return conn def send_operation(operation_str, socket): - msg = operation_str.upper().encode(ENCODING_FORMAT) # todo: upper case + msg = operation_str.encode(ENCODING_FORMAT) # todo: upper case msg_length = len(msg) # send_length = str(msg_length).encode(ENCODING_FORMAT) msg += b' ' * (MSG_LENGTH - msg_length) # pprint(f'[send_operation] - {locals()}') - pprint(f'[send_operation] - msg.length : {len(msg)}') + # pprint(f'[send_operation] - msg.length : {len(msg)}') socket.sendall(msg) @@ -77,10 +77,10 @@ if __name__ == '__main__': coordinator_info = pick_coordinator(servers) client_to_coordinator_socket = connect(coordinator_info) - pprint('[client_to_coordinator_socket] CONNECT SUCCESS!') + # pprint('[client_to_coordinator_socket] CONNECT SUCCESS!') # coordinator_to_client_socket = bind() # pprint('[bind] BIND SUCCESS!') - print(coordinator_info) + # print(coordinator_info) try: while True: operation = input() @@ -95,9 +95,9 @@ if __name__ == '__main__': break # todo: add end condition except EOFError: - gprint("[Client] - All commands in .txt file executed. Client Exits..") + # gprint("[Client] - All commands in .txt file executed. Client Exits..") os._exit(0) finally: client_to_coordinator_socket.close() - gprint("[Client] - client_to_coordinator_socket CLOSED..") + # gprint("[Client] - client_to_coordinator_socket CLOSED..") # coordinator_to_client_socket.close() diff --git a/mp3/server.py b/mp3/server.py index cd78eee06ad27c4ae83300eda067d7a26a1717f7..4fb23369da8b5129b68261447aaafbd3a25b8827 100644 --- a/mp3/server.py +++ b/mp3/server.py @@ -3,7 +3,8 @@ import json import sys import time import traceback -from utils import gprint, rprint, pprint +from utils import rprint +# from utils import gprint, rprint, pprint from constants import SERVER_IDENTIFIER, SERVER_ADDRESS, SERVER_PORT, RECONNECT_TIME, MSG_LENGTH, ENCODING_FORMAT, \ ACCEPTING_PEER_PORT import socket @@ -43,8 +44,9 @@ def commit_ok_transaction(timestamp: str, self_node: SelfNode): if not self_node.lock.locked(): self_node.lock.acquire() try: - pprint(f'[commit_ok_transaction] - {timestamp} - ENTER CHECKPOINT') + # pprint(f'[commit_ok_transaction] - {timestamp} - ENTER CHECKPOINT') # timestamp 是肯定å¯ä»¥commit + has_account_modified = False for cur_account in accounts_map.values(): # åˆ é™¤è¿™ä¸ªaccountRTS 䏿‰€æœ‰çš„timestamp cur_account.RTS = list(filter(lambda x: x != timestamp, cur_account.RTS)) @@ -58,10 +60,14 @@ def commit_ok_transaction(timestamp: str, self_node: SelfNode): # __import__('pudb').set_trace() cur_account.balance += cur_account.TW[timestamp].operation_value cur_account.TW.pop(timestamp) - break + has_account_modified = True + # break # self_node.lock.release() - pprint(f'[commit_ok_transaction] - {timestamp} - EXIT CHECKPOINT') + # pprint(f'[commit_ok_transaction] - {timestamp} - EXIT CHECKPOINT') + if has_account_modified: + print_all_available_accounts_balance() except Exception as e: + # pass rprint(f"[ERROR] commit_ok_transaction error:{e.__class__.__name__}.{e}.{traceback.format_exc()}") finally: if self_node.lock.locked(): @@ -71,7 +77,7 @@ def commit_ok_transaction(timestamp: str, self_node: SelfNode): # 收到commit 的消æ¯ï¼Œè¦åŽ»check def commit_check_transaction(timestamp: str, self_node: SelfNode) -> bool: - pprint(f'[commit_check_transaction] - {timestamp} - ENTER CHECKPOINT') + # pprint(f'[commit_check_transaction] - {timestamp} - ENTER CHECKPOINT') if not self_node.lock.locked(): self_node.lock.acquire() try: @@ -91,7 +97,7 @@ def commit_check_transaction(timestamp: str, self_node: SelfNode) -> bool: if timestamp in TW.keys(): flag = True while flag: - pprint('[commit_check_transaction] - sleeping check 1...') + # pprint('[commit_check_transaction] - sleeping check 1...') min_write_timestamp = timestamp for temp in TW.keys(): if temp < min_write_timestamp: @@ -107,12 +113,12 @@ def commit_check_transaction(timestamp: str, self_node: SelfNode) -> bool: flag = True while flag: # __import__('pudb').set_trace() - pprint('[commit_check_transaction] - sleeping check 2...') + # pprint('[commit_check_transaction] - sleeping check 2...') # 这个时候还是有é”çš„ï¼Œå› ä¸ºæ˜¯ä»Žflag == false过æ¥çš„ # min_read_timestamp = min(RTS) # min_read_timestamp = min(list(filter(lambda x: x != 0, RTS))) # DELETE 0 IN THIS CASE # __import__('pudb').set_trace() - pprint(f'[commit_check_transaction] - accounts_map[_key].RTS: {accounts_map[_key].RTS}') + # pprint(f'[commit_check_transaction] - accounts_map[_key].RTS: {accounts_map[_key].RTS}') min_read_timestamp = min(list(filter(lambda x: x != 0, accounts_map[_key].RTS))) if min_read_timestamp == timestamp: flag = False @@ -136,13 +142,14 @@ def commit_check_transaction(timestamp: str, self_node: SelfNode) -> bool: continue # self_node.lock.release() - pprint(f'[commit_check_transaction] - {timestamp} - EXIT CHECKPOINT') + # pprint(f'[commit_check_transaction] - {timestamp} - EXIT CHECKPOINT') # 如果transaction和这个server䏿‰€æœ‰çš„account都没有关系的è¯, 那么还是返回true # 如果说有关系, é‚£è¿åconsistency, 那就ä¸è¡Œ # 如果ä¸è¿å, åªæ˜¯å‰é¢çš„transaction没有commit, 那就是ç‰ç€ return commit_ok except Exception as e: - rprint(f"[ERROR] commit_check_transaction error:{e.__class__.__name__}.{e}.{traceback.format_exc()}") + pass + # rprint(f"[ERROR] commit_check_transaction error:{e.__class__.__name__}.{e}.{traceback.format_exc()}") finally: if self_node.lock.locked(): self_node.lock.release() @@ -155,7 +162,7 @@ def tentative_write(self_node: SelfNode, timestamp: str, rawOperation: str): self_node.lock.acquire() try: - pprint(f'[tentative_write] - {timestamp} - {rawOperation} - ENTER CHECKPOINT') + # pprint(f'[tentative_write] - {timestamp} - {rawOperation} - ENTER CHECKPOINT') # __import__('pudb').set_trace() # raw operiation # deposit å’Œwithdraw还ä¸ä¸€æ · account_name = rawOperation.split()[1] # DEPOSIT A.foo 50 @@ -173,25 +180,25 @@ def tentative_write(self_node: SelfNode, timestamp: str, rawOperation: str): # æ›´æ–°TW cur_account.TW[timestamp].operation_value += operation_value # self_node.lock.release() - pprint(f'[tentative_write] - {timestamp} - {rawOperation} - EXIT CHECKPOINT 1') + # pprint(f'[tentative_write] - {timestamp} - {rawOperation} - EXIT CHECKPOINT 1') return True else: new_tentative_write = WriteEntry(timestamp, operation_value) cur_account.TW[timestamp] = new_tentative_write # !!!用TW的时候需è¦å¯¹TW è¿›è¡ŒæŽ’åº # self_node.lock.release() - pprint(f'[tentative_write] - {timestamp} - {rawOperation} - EXIT CHECKPOINT 2') + # pprint(f'[tentative_write] - {timestamp} - {rawOperation} - EXIT CHECKPOINT 2') return True else: self_node.lock.release() - pprint(f'[tentative_write] - {timestamp} - {rawOperation} - EXIT CHECKPOINT 3') + # pprint(f'[tentative_write] - {timestamp} - {rawOperation} - EXIT CHECKPOINT 3') return None # 需è¦abort else: # 如果deposit 一个没有å˜åœ¨çš„账户,是å¯ä»¥çš„ new_account = Account(account_name, timestamp) new_account.TW[timestamp] = WriteEntry(timestamp, operation_value) accounts_map[account_name] = new_account # todo: to be checked with Yan # self_node.lock.release() - pprint(f'[tentative_write] - {timestamp} - {rawOperation} - EXIT CHECKPOINT 4') + # pprint(f'[tentative_write] - {timestamp} - {rawOperation} - EXIT CHECKPOINT 4') return True # else: # == "WITHDRAW" 是read å’Œwriteçš„ç»“åˆ @@ -200,20 +207,21 @@ def tentative_write(self_node: SelfNode, timestamp: str, rawOperation: str): read_result = read(self_node, timestamp, readOperation) if read_result is None: - pprint(f'[tentative_write] - {timestamp} - {rawOperation} - EXIT CHECKPOINT 4.5') + # pprint(f'[tentative_write] - {timestamp} - {rawOperation} - EXIT CHECKPOINT 4.5') return None elif read_result == 'NOT FOUND, ABORTED': # 需è¦abort了 并且是 NOT FOUND, ABORTED # self_node.lock.release() - pprint(f'[tentative_write] - {timestamp} - {rawOperation} - EXIT CHECKPOINT 5') + # pprint(f'[tentative_write] - {timestamp} - {rawOperation} - EXIT CHECKPOINT 5') return 'NOT FOUND, ABORTED' else: # è¿™é‡Œé»˜è®¤ä¼ å…¥çš„operation_value 是有æ£è´Ÿä¹‹åˆ†çš„ readOperation = "DEPOSIT" + " " + account_name + " " + str(operation_value * -1) - pprint(f'[tentative_write] - {timestamp} - {rawOperation} - EXIT CHECKPOINT 6') + # pprint(f'[tentative_write] - {timestamp} - {rawOperation} - EXIT CHECKPOINT 6') return tentative_write(self_node, timestamp, readOperation) # todo: one more params? except Exception as e: - rprint(f"[ERROR] tentative_write error:{e.__class__.__name__}.{e}.{traceback.format_exc()}") + pass + # rprint(f"[ERROR] tentative_write error:{e.__class__.__name__}.{e}.{traceback.format_exc()}") finally: if self_node.lock.locked(): self_node.lock.release() @@ -252,7 +260,7 @@ def tentative_write(self_node: SelfNode, timestamp: str, rawOperation: str): # BALANCE COME HERE def read(self_node: SelfNode, timestamp: str, rawOperation: str): - pprint(f'[read] - {timestamp} - {rawOperation} - ENTER CHECKPOINT') + # pprint(f'[read] - {timestamp} - {rawOperation} - ENTER CHECKPOINT') if not self_node.lock.locked(): self_node.lock.acquire() @@ -263,13 +271,13 @@ def read(self_node: SelfNode, timestamp: str, rawOperation: str): # self_node.lock.acquire() if account_name not in accounts_map.keys(): # self_node.lock.release() - pprint(f'[read] - {timestamp} - {rawOperation} - EXIT CHECKPOINT 1') + # pprint(f'[read] - {timestamp} - {rawOperation} - EXIT CHECKPOINT 1') return 'NOT FOUND, ABORTED' # 表示没有读到 需è¦abort while flag: # self_node.lock.acquire() if account_name not in accounts_map.keys(): # self_node.lock.release() - pprint(f'[read] - {timestamp} - {rawOperation} - EXIT CHECKPOINT 2') + # pprint(f'[read] - {timestamp} - {rawOperation} - EXIT CHECKPOINT 2') return 'NOT FOUND, ABORTED' # 表示没有读到 需è¦abort cur_account: Account = accounts_map[account_name] @@ -315,10 +323,10 @@ def read(self_node: SelfNode, timestamp: str, rawOperation: str): if timestamp in cur_account.TW.keys(): new_temp_balance_this_timestamp = cur_balance + cur_account.TW[timestamp].operation_value # self_node.lock.release() - pprint(f'[read] - {timestamp} - {rawOperation} - EXIT CHECKPOINT 4.4.1') + # pprint(f'[read] - {timestamp} - {rawOperation} - EXIT CHECKPOINT 4.4.1') return new_temp_balance_this_timestamp else: - pprint(f'[read] - {timestamp} - {rawOperation} - EXIT CHECKPOINT 4.4.2') + # pprint(f'[read] - {timestamp} - {rawOperation} - EXIT CHECKPOINT 4.4.2') return cur_balance cur_account.RTS.append(timestamp) @@ -326,10 +334,10 @@ def read(self_node: SelfNode, timestamp: str, rawOperation: str): if timestamp in cur_account.TW.keys(): new_temp_balance_this_timestamp = cur_balance + cur_account.TW[timestamp].operation_value # self_node.lock.release() - pprint(f'[read] - {timestamp} - {rawOperation} - EXIT CHECKPOINT 4.5.1') + # pprint(f'[read] - {timestamp} - {rawOperation} - EXIT CHECKPOINT 4.5.1') return new_temp_balance_this_timestamp else: - pprint(f'[read] - {timestamp} - {rawOperation} - EXIT CHECKPOINT 4.5.2') + # pprint(f'[read] - {timestamp} - {rawOperation} - EXIT CHECKPOINT 4.5.2') return cur_balance else: if Ds_written_by == timestamp: @@ -338,24 +346,25 @@ def read(self_node: SelfNode, timestamp: str, rawOperation: str): new_temp_balance_this_timestamp = cur_balance + cur_account.TW[timestamp].operation_value flag = False # self_node.lock.release() - pprint(f'[read] - {timestamp} - {rawOperation} - EXIT CHECKPOINT 5.1') + # pprint(f'[read] - {timestamp} - {rawOperation} - EXIT CHECKPOINT 5.1') return new_temp_balance_this_timestamp # ä¼ å›ž 应该有的值, ä¼ å›žcoordinator的在其他函数写 else: - pprint(f'[read] - {timestamp} - {rawOperation} - EXIT CHECKPOINT 5.2') + # pprint(f'[read] - {timestamp} - {rawOperation} - EXIT CHECKPOINT 5.2') return cur_balance else: - pprint(f'[read] - {timestamp} - {rawOperation} - SLEEPING...') + # pprint(f'[read] - {timestamp} - {rawOperation} - SLEEPING...') self_node.lock.release() time.sleep(0.01) self_node.lock.acquire() else: # abort(没写), ä¼ è¾“ç»™coordinator 没写 # self_node.lock.release() - pprint(f'[read] - {timestamp} - {rawOperation} - EXIT CHECKPOINT 6') + # pprint(f'[read] - {timestamp} - {rawOperation} - EXIT CHECKPOINT 6') return None except Exception as e: - rprint(f"[ERROR] read error:{e.__class__.__name__}.{e}.{traceback.format_exc()}") + pass + # rprint(f"[ERROR] read error:{e.__class__.__name__}.{e}.{traceback.format_exc()}") finally: if self_node.lock.locked(): self_node.lock.release() @@ -363,7 +372,7 @@ def read(self_node: SelfNode, timestamp: str, rawOperation: str): # abort 的本身 def abort(self_node: SelfNode, timestamp: str): - pprint(f'[abort] - {timestamp} - ENTER CHECKPOINT') + # pprint(f'[abort] - {timestamp} - ENTER CHECKPOINT') # #对当å‰server 所有的account çš„TW å’ŒRTS ä¸è¿›è¡ŒæŸ¥æ‰¾çœ‹çœ‹è¿™ä¸ªtimestamp 的东西 # self_node.lock.acquire() @@ -402,9 +411,10 @@ def abort(self_node: SelfNode, timestamp: str): if timestamp in list(cur_account.TW.keys()): del cur_account.TW[timestamp] # self_node.lock.release() - pprint(f'[abort] - {timestamp} - EXIT CHECKPOINT') + # pprint(f'[abort] - {timestamp} - EXIT CHECKPOINT') except Exception as e: - rprint(f"[ERROR] abort error:{e.__class__.__name__}.{e}.{traceback.format_exc()}") + pass + # rprint(f"[ERROR] abort error:{e.__class__.__name__}.{e}.{traceback.format_exc()}") finally: self_node.lock.release() @@ -462,22 +472,22 @@ def connect_with_all_servers(servers: dict, current_server_id: str, return_value host_ip_address = SERVER_ID_TO_IP_MAPPING[server[SERVER_IDENTIFIER]] # server[SERVER_PORT] = SERVERS[CURRENT_SERVER_ID][SERVER_PORT]+1 # todo: to be deleted address = (host_ip_address, server[ACCEPTING_PEER_PORT]) - pprint(f"[connect_with_all_servers] - {CURRENT_SERVER_ID} is trying to " - f"connect {(host_ip_address, server[ACCEPTING_PEER_PORT])}") + # pprint(f"[connect_with_all_servers] - {CURRENT_SERVER_ID} is trying to " + # f"connect {(host_ip_address, server[ACCEPTING_PEER_PORT])}") s.connect(address) connect_info[server_id] = s - gprint(f"[Connect] - " - f"Connecting to {server[SERVER_IDENTIFIER]}: {host_ip_address}:" - f"{server[ACCEPTING_PEER_PORT]} success!") + # gprint(f"[Connect] - " + # f"Connecting to {server[SERVER_IDENTIFIER]}: {host_ip_address}:" + # f"{server[ACCEPTING_PEER_PORT]} success!") except Exception: - rprint(f"[Connect] - Connection to " - f"{server[SERVER_IDENTIFIER]} failed. " - f"Reconnecting in {RECONNECT_TIME}s..") + # rprint(f"[Connect] - Connection to " + # f"{server[SERVER_IDENTIFIER]} failed. " + # f"Reconnecting in {RECONNECT_TIME}s..") remaining_nodes[server_id] = server servers = remaining_nodes time.sleep(RECONNECT_TIME) return_values['connect_info'] = connect_info - gprint("[Connect] - Connect to all servers success!") + # gprint("[Connect] - Connect to all servers success!") # def bind_with_all_servers(servers: dict, current_server_id: str, return_values: dict): @@ -515,7 +525,8 @@ def extract_target_server(operation): try: return operation.split(' ')[1].split('.')[0] except Exception as e: - rprint(f"[ERROR] extract_target_server error:{e.__class__.__name__}.{e}.{traceback.format_exc()}") + pass + # rprint(f"[ERROR] extract_target_server error:{e.__class__.__name__}.{e}.{traceback.format_exc()}") def send_msg_to_socket(msg, socket): @@ -523,7 +534,7 @@ def send_msg_to_socket(msg, socket): msg_length = len(msg) # send_length = str(msg_length).encode(ENCODING_FORMAT) msg += b' ' * (MSG_LENGTH - msg_length) - pprint(f'[send_msg_to_socket] - msg.length : [{len(msg)}], msg: [{msg.strip()}]') + # pprint(f'[send_msg_to_socket] - msg.length : [{len(msg)}], msg: [{msg.strip()}]') socket.sendall(msg) @@ -540,20 +551,20 @@ def operation_handler(operation, timestamp): def broadcast_msg(operation, connect_info): res = [] for node_id, each_socket in connect_info.items(): - gprint(f'[{CURRENT_SERVER_ID}] - Multicasting [{operation}] to [{node_id}] NOW!') + # gprint(f'[{CURRENT_SERVER_ID}] - Multicasting [{operation}] to [{node_id}] NOW!') send_msg_to_socket(operation, each_socket) - gprint(f'[{CURRENT_SERVER_ID}] - Multicasting [{operation}] to [{node_id}] DONE!') + # gprint(f'[{CURRENT_SERVER_ID}] - Multicasting [{operation}] to [{node_id}] DONE!') if 'PREPARE COMMIT' in operation: for node_id, each_socket in connect_info.items(): # gprint(f'[{CURRENT_SERVER_ID}] - Multicasting [{operation}] to [{node_id}] NOW!') # send_msg_to_socket(operation, each_socket) prepare_res = each_socket.recv(MSG_LENGTH).decode(ENCODING_FORMAT).strip() - gprint(f'[{CURRENT_SERVER_ID} PREPARE COMMIT] - RECEIVING from [{node_id}]: {prepare_res}') + # gprint(f'[{CURRENT_SERVER_ID} PREPARE COMMIT] - RECEIVING from [{node_id}]: {prepare_res}') res.append(eval(prepare_res)) # gprint(f'[{CURRENT_SERVER_ID}] - Multicasting [{operation}] to [{node_id}] DONE!') - pprint(f'[broadcast_msg] - res: {res}') + # pprint(f'[broadcast_msg] - res: {res}') return res @@ -566,23 +577,23 @@ def format_operation_and_timestamp_json(operation, timestamp): def client_handler(conn, SELF_NODE): - pprint('[client_handler] CHECKPOINT -2') + # pprint('[client_handler] CHECKPOINT -2') return_values = establish_bidirectional_connection(SERVERS, CURRENT_SERVER_ID) - pprint('[client_handler] CHECKPOINT -1') + # pprint('[client_handler] CHECKPOINT -1') connect_info = return_values.get('connect_info', {}) # bind_info = return_values.get('bind_info', {}) timestamp = time.time() - pprint('[client_handler] CHECKPOINT 0') - pprint(f'[client_handler] - current customer timestamp {timestamp}') + # pprint('[client_handler] CHECKPOINT 0') + # pprint(f'[client_handler] - current customer timestamp {timestamp}') try: while True: - pprint('---------------------------------------------------------------') - pprint('[client_handler] CHECKPOINT 1') + # pprint('---------------------------------------------------------------') + # pprint('[client_handler] CHECKPOINT 1') operation = conn.recv(MSG_LENGTH).decode(ENCODING_FORMAT).strip() - pprint('[client_handler] CHECKPOINT 2') + # pprint('[client_handler] CHECKPOINT 2') operation_json = format_operation_and_timestamp_json(operation, timestamp) - gprint(f'[client_handler] - operation [{operation}] received success!') + # gprint(f'[client_handler] - operation [{operation}] received success!') if not operation: break @@ -593,14 +604,14 @@ def client_handler(conn, SELF_NODE): target_server_connect_socket = connect_info.get(target_server_id) # target_server_bind_socket = bind_info.get(target_server_id) is_self = target_server_id == CURRENT_SERVER_ID - gprint(f'[client_handler] - operation [{operation}] SENDING TO {target_server_id}') + # gprint(f'[client_handler] - operation [{operation}] SENDING TO {target_server_id}') ret_value = None if operation.startswith('DEPOSIT') or operation.startswith('WITHDRAW'): if is_self: ret_value = tentative_write(self_node=SELF_NODE, timestamp=timestamp, rawOperation=operation) - gprint( - f'[server_handler] accounts_map: ' - f'\n{json.dumps(accounts_map, default=lambda x: x.__dict__)}') + # gprint( + # f'[server_handler] accounts_map: ' + # f'\n{json.dumps(accounts_map, default=lambda x: x.__dict__)}') else: send_msg_to_socket(operation_json, target_server_connect_socket) # ret_value = target_server_bind_socket.recv(MSG_LENGTH).decode(ENCODING_FORMAT) @@ -608,9 +619,9 @@ def client_handler(conn, SELF_NODE): elif operation.startswith('BALANCE'): if is_self: ret_value = read(self_node=SELF_NODE, timestamp=timestamp, rawOperation=operation) - gprint( - f'[server_handler] accounts_map: ' - f'\n{json.dumps(accounts_map, default=lambda x: x.__dict__)}') + # gprint( + # f'[server_handler] accounts_map: ' + # f'\n{json.dumps(accounts_map, default=lambda x: x.__dict__)}') else: send_msg_to_socket(operation_json, target_server_connect_socket) # ret_value = target_server_bind_socket.recv(MSG_LENGTH).decode(ENCODING_FORMAT) @@ -630,16 +641,16 @@ def client_handler(conn, SELF_NODE): broadcast_msg(operation_json, connect_info) commit_ok_transaction(timestamp, SELF_NODE) ret_value = 'COMMIT OK' # todo - gprint(f'[server_handler] accounts_map: \n' - f'{json.dumps(accounts_map, default=lambda x: x.__dict__)}') + # gprint(f'[server_handler] accounts_map: \n' + # f'{json.dumps(accounts_map, default=lambda x: x.__dict__)}') elif operation.startswith('ABORT'): # __import__('pudb').set_trace() # broadcast_msg(operation_json, connect_info) # abort(SELF_NODE, timestamp) ret_value = 'ABORTED' # todo - gprint(f'[server_handler] accounts_map: ' - f'\n{json.dumps(accounts_map, default=lambda x: x.__dict__)}') + # gprint(f'[server_handler] accounts_map: ' + # f'\n{json.dumps(accounts_map, default=lambda x: x.__dict__)}') ret_value = reformat_return_value(ret_value, operation) if 'ABORT' in ret_value: @@ -648,9 +659,10 @@ def client_handler(conn, SELF_NODE): send_msg_to_socket(ret_value, conn) except Exception as e: - rprint(f"[ERROR] client_handler error:{e.__class__.__name__}.{e}.{traceback.format_exc()}") + pass + # rprint(f"[ERROR] client_handler error:{e.__class__.__name__}.{e}.{traceback.format_exc()}") finally: - pprint("[client_handler] CONNECTION CLOSED...") + # pprint("[client_handler] CONNECTION CLOSED...") conn.close() # client_recv_socket.close() # client_send_socket.close() @@ -676,10 +688,18 @@ def reformat_return_value(ret_value, operation): raise TypeError(f'ret_value should be None or a bool, not {ret_value.__class__.__name__}') +def print_all_available_accounts_balance(): + balances_str = ', '.join('{}:{}'.format(k, v.balance) if v.balance != 0 + else '' for k, v in sorted(accounts_map.items())) + if balances_str: + print('BALANCES', end=' ') + print(balances_str) + + def server_handler(conn, SELF_NODE): try: while True: - pprint('---------------------------------------------------------------') + # pprint('---------------------------------------------------------------') operation_dict = json.loads(conn.recv(MSG_LENGTH).decode(ENCODING_FORMAT).strip()) operation = operation_dict.get('operation', '') timestamp = operation_dict.get('timestamp', '') @@ -692,17 +712,19 @@ def server_handler(conn, SELF_NODE): ret_value = commit_check_transaction(timestamp, SELF_NODE) elif operation.startswith('COMMIT'): ret_value = commit_ok_transaction(timestamp, SELF_NODE) + # print_all_available_accounts_balance() elif operation.startswith('ABORT'): abort(SELF_NODE, timestamp) - gprint(f'[server_handler] accounts_map: \n{json.dumps(accounts_map, default=lambda x: x.__dict__)}') + # gprint(f'[server_handler] accounts_map: \n{json.dumps(accounts_map, default=lambda x: x.__dict__)}') break ret_value = reformat_return_value(ret_value, operation) send_msg_to_socket(ret_value, conn) - gprint(f'[server_handler] accounts_map: \n{json.dumps(accounts_map, default=lambda x: x.__dict__)}') + # gprint(f'[server_handler] accounts_map: \n{json.dumps(accounts_map, default=lambda x: x.__dict__)}') except Exception as e: - rprint(f"[ERROR] server_handler error:{e.__class__.__name__}.{e}.{traceback.format_exc()}") + pass + # rprint(f"[ERROR] server_handler error:{e.__class__.__name__}.{e}.{traceback.format_exc()}") finally: - pprint('[server_handler] - ABORTED - connection with the current client CLOSED!') + # pprint('[server_handler] - ABORTED - connection with the current client CLOSED!') conn.close() # peer_server_send_socket.close() @@ -716,7 +738,7 @@ def accepting_clients(SELF_NODE): # print(f"[LISTENING] Server is listening on {ADDR}") # DEBUG while True: conn, addr = server.accept() - gprint(f'[accepting_clients] - client [{addr}] connect to coordinator [{CURRENT_SERVER_ID}] success!') + # gprint(f'[accepting_clients] - client [{addr}] connect to coordinator [{CURRENT_SERVER_ID}] success!') # client_send_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # client_send_socket.connect((addr[0], 5678)) # gprint(f'[accepting_clients] - coordinator [{CURRENT_SERVER_ID}] connect to client [({addr[0]}, {5678})] success!') @@ -724,7 +746,8 @@ def accepting_clients(SELF_NODE): thread = threading.Thread(target=client_handler, args=[conn, SELF_NODE]) thread.start() except Exception as e: - rprint(f"[ERROR] accepting_clients error:{e.__class__.__name__}.{e}.{traceback.format_exc()}") + pass + # rprint(f"[ERROR] accepting_clients error:{e.__class__.__name__}.{e}.{traceback.format_exc()}") finally: server.close() @@ -733,19 +756,20 @@ def accepting_peer_servers(SELF_NODE): try: server = socket.socket(socket.AF_INET, socket.SOCK_STREAM) server.bind((CURRENT_SERVER_IP, SERVERS[CURRENT_SERVER_ID][ACCEPTING_PEER_PORT])) - pprint(f'[accepting_peer_servers] - {CURRENT_SERVER_ID} current ' - f'bind at {(CURRENT_SERVER_IP, SERVERS[CURRENT_SERVER_ID][ACCEPTING_PEER_PORT])}') + # pprint(f'[accepting_peer_servers] - {CURRENT_SERVER_ID} current ' + # f'bind at {(CURRENT_SERVER_IP, SERVERS[CURRENT_SERVER_ID][ACCEPTING_PEER_PORT])}') server.listen() # print(f"[LISTENING] Server is listening on {ADDR}") # DEBUG while True: conn, addr = server.accept() - gprint(f'[accepting_peer_servers] - peer server [{addr}] connect to server [{CURRENT_SERVER_ID}] success!') + # gprint(f'[accepting_peer_servers] - peer server [{addr}] connect to server [{CURRENT_SERVER_ID}] success!') # client_send_socket = server.connect(addr) # gprint(f'[accepting_peer_servers] - server [{CURRENT_SERVER_ID}] connect to peer server [{addr}] success!') thread = threading.Thread(target=server_handler, args=[conn, SELF_NODE]) thread.start() except Exception as e: - rprint(f"[ERROR] accepting_peer_servers error:{e.__class__.__name__}.{e}.{traceback.format_exc()}") + pass + # rprint(f"[ERROR] accepting_peer_servers error:{e.__class__.__name__}.{e}.{traceback.format_exc()}") finally: server.close() diff --git a/sample_test/src/client.py b/sample_test/src/client.py index 6bef4dadc0142ff6c84bfd0f51f707495c2c8b31..c4a159cb17be3b07d54db4340626eb382a72e176 100644 --- a/sample_test/src/client.py +++ b/sample_test/src/client.py @@ -1,6 +1,6 @@ import random from constants import SERVER_IDENTIFIER, SERVER_ADDRESS, SERVER_PORT, ENCODING_FORMAT, MSG_LENGTH -from utils import gprint, pprint +# from utils import gprint, pprint import os import sys import socket @@ -37,9 +37,9 @@ def connect(coordinator_info: tuple): coordinator_port_number = coordinator_info[1][SERVER_PORT] client = socket.socket(socket.AF_INET, socket.SOCK_STREAM) address = (coordinator_ip_address, coordinator_port_number) - pprint(f'[connect] - client connecting to address {address}') + # pprint(f'[connect] - client connecting to address {address}') client.connect(address) - pprint(f'[connect] - client connecting to address {address} success!') + # pprint(f'[connect] - client connecting to address {address} success!') return client @@ -48,17 +48,17 @@ def bind(): server.bind(ADDR) server.listen() conn, addr = server.accept() - gprint(f"[Bind] - Bind to {addr} success!") + # gprint(f"[Bind] - Bind to {addr} success!") return conn def send_operation(operation_str, socket): - msg = operation_str.encode(ENCODING_FORMAT) + msg = operation_str.encode(ENCODING_FORMAT) # todo: upper case msg_length = len(msg) # send_length = str(msg_length).encode(ENCODING_FORMAT) msg += b' ' * (MSG_LENGTH - msg_length) # pprint(f'[send_operation] - {locals()}') - pprint(f'[send_operation] - msg.length : {len(msg)}') + # pprint(f'[send_operation] - msg.length : {len(msg)}') socket.sendall(msg) @@ -77,10 +77,10 @@ if __name__ == '__main__': coordinator_info = pick_coordinator(servers) client_to_coordinator_socket = connect(coordinator_info) - pprint('[client_to_coordinator_socket] CONNECT SUCCESS!') + # pprint('[client_to_coordinator_socket] CONNECT SUCCESS!') # coordinator_to_client_socket = bind() # pprint('[bind] BIND SUCCESS!') - print(coordinator_info) + # print(coordinator_info) try: while True: operation = input() @@ -95,9 +95,9 @@ if __name__ == '__main__': break # todo: add end condition except EOFError: - gprint("[Client] - All commands in .txt file executed. Client Exits..") + # gprint("[Client] - All commands in .txt file executed. Client Exits..") os._exit(0) finally: client_to_coordinator_socket.close() - gprint("[Client] - client_to_coordinator_socket CLOSED..") + # gprint("[Client] - client_to_coordinator_socket CLOSED..") # coordinator_to_client_socket.close() diff --git a/sample_test/src/config.txt b/sample_test/src/config.txt index 25c842b6f44715759f5edc72a3ba1c0694865000..37673ba2e3345597265fb75d8eca422cffbaa73b 100644 --- a/sample_test/src/config.txt +++ b/sample_test/src/config.txt @@ -1,5 +1,5 @@ -A 127.0.0.1 10001 -B 127.0.0.1 10002 -C 127.0.0.1 10003 -D 127.0.0.1 10004 -E 127.0.0.1 10005 \ No newline at end of file +A sp22-cs425-g68-01.cs.illinois.edu 1234 +B sp22-cs425-g68-02.cs.illinois.edu 1234 +C sp22-cs425-g68-03.cs.illinois.edu 1234 +D sp22-cs425-g68-04.cs.illinois.edu 1234 +E sp22-cs425-g68-05.cs.illinois.edu 1234 diff --git a/sample_test/src/server.py b/sample_test/src/server.py index cd78eee06ad27c4ae83300eda067d7a26a1717f7..4fb23369da8b5129b68261447aaafbd3a25b8827 100644 --- a/sample_test/src/server.py +++ b/sample_test/src/server.py @@ -3,7 +3,8 @@ import json import sys import time import traceback -from utils import gprint, rprint, pprint +from utils import rprint +# from utils import gprint, rprint, pprint from constants import SERVER_IDENTIFIER, SERVER_ADDRESS, SERVER_PORT, RECONNECT_TIME, MSG_LENGTH, ENCODING_FORMAT, \ ACCEPTING_PEER_PORT import socket @@ -43,8 +44,9 @@ def commit_ok_transaction(timestamp: str, self_node: SelfNode): if not self_node.lock.locked(): self_node.lock.acquire() try: - pprint(f'[commit_ok_transaction] - {timestamp} - ENTER CHECKPOINT') + # pprint(f'[commit_ok_transaction] - {timestamp} - ENTER CHECKPOINT') # timestamp 是肯定å¯ä»¥commit + has_account_modified = False for cur_account in accounts_map.values(): # åˆ é™¤è¿™ä¸ªaccountRTS 䏿‰€æœ‰çš„timestamp cur_account.RTS = list(filter(lambda x: x != timestamp, cur_account.RTS)) @@ -58,10 +60,14 @@ def commit_ok_transaction(timestamp: str, self_node: SelfNode): # __import__('pudb').set_trace() cur_account.balance += cur_account.TW[timestamp].operation_value cur_account.TW.pop(timestamp) - break + has_account_modified = True + # break # self_node.lock.release() - pprint(f'[commit_ok_transaction] - {timestamp} - EXIT CHECKPOINT') + # pprint(f'[commit_ok_transaction] - {timestamp} - EXIT CHECKPOINT') + if has_account_modified: + print_all_available_accounts_balance() except Exception as e: + # pass rprint(f"[ERROR] commit_ok_transaction error:{e.__class__.__name__}.{e}.{traceback.format_exc()}") finally: if self_node.lock.locked(): @@ -71,7 +77,7 @@ def commit_ok_transaction(timestamp: str, self_node: SelfNode): # 收到commit 的消æ¯ï¼Œè¦åŽ»check def commit_check_transaction(timestamp: str, self_node: SelfNode) -> bool: - pprint(f'[commit_check_transaction] - {timestamp} - ENTER CHECKPOINT') + # pprint(f'[commit_check_transaction] - {timestamp} - ENTER CHECKPOINT') if not self_node.lock.locked(): self_node.lock.acquire() try: @@ -91,7 +97,7 @@ def commit_check_transaction(timestamp: str, self_node: SelfNode) -> bool: if timestamp in TW.keys(): flag = True while flag: - pprint('[commit_check_transaction] - sleeping check 1...') + # pprint('[commit_check_transaction] - sleeping check 1...') min_write_timestamp = timestamp for temp in TW.keys(): if temp < min_write_timestamp: @@ -107,12 +113,12 @@ def commit_check_transaction(timestamp: str, self_node: SelfNode) -> bool: flag = True while flag: # __import__('pudb').set_trace() - pprint('[commit_check_transaction] - sleeping check 2...') + # pprint('[commit_check_transaction] - sleeping check 2...') # 这个时候还是有é”çš„ï¼Œå› ä¸ºæ˜¯ä»Žflag == false过æ¥çš„ # min_read_timestamp = min(RTS) # min_read_timestamp = min(list(filter(lambda x: x != 0, RTS))) # DELETE 0 IN THIS CASE # __import__('pudb').set_trace() - pprint(f'[commit_check_transaction] - accounts_map[_key].RTS: {accounts_map[_key].RTS}') + # pprint(f'[commit_check_transaction] - accounts_map[_key].RTS: {accounts_map[_key].RTS}') min_read_timestamp = min(list(filter(lambda x: x != 0, accounts_map[_key].RTS))) if min_read_timestamp == timestamp: flag = False @@ -136,13 +142,14 @@ def commit_check_transaction(timestamp: str, self_node: SelfNode) -> bool: continue # self_node.lock.release() - pprint(f'[commit_check_transaction] - {timestamp} - EXIT CHECKPOINT') + # pprint(f'[commit_check_transaction] - {timestamp} - EXIT CHECKPOINT') # 如果transaction和这个server䏿‰€æœ‰çš„account都没有关系的è¯, 那么还是返回true # 如果说有关系, é‚£è¿åconsistency, 那就ä¸è¡Œ # 如果ä¸è¿å, åªæ˜¯å‰é¢çš„transaction没有commit, 那就是ç‰ç€ return commit_ok except Exception as e: - rprint(f"[ERROR] commit_check_transaction error:{e.__class__.__name__}.{e}.{traceback.format_exc()}") + pass + # rprint(f"[ERROR] commit_check_transaction error:{e.__class__.__name__}.{e}.{traceback.format_exc()}") finally: if self_node.lock.locked(): self_node.lock.release() @@ -155,7 +162,7 @@ def tentative_write(self_node: SelfNode, timestamp: str, rawOperation: str): self_node.lock.acquire() try: - pprint(f'[tentative_write] - {timestamp} - {rawOperation} - ENTER CHECKPOINT') + # pprint(f'[tentative_write] - {timestamp} - {rawOperation} - ENTER CHECKPOINT') # __import__('pudb').set_trace() # raw operiation # deposit å’Œwithdraw还ä¸ä¸€æ · account_name = rawOperation.split()[1] # DEPOSIT A.foo 50 @@ -173,25 +180,25 @@ def tentative_write(self_node: SelfNode, timestamp: str, rawOperation: str): # æ›´æ–°TW cur_account.TW[timestamp].operation_value += operation_value # self_node.lock.release() - pprint(f'[tentative_write] - {timestamp} - {rawOperation} - EXIT CHECKPOINT 1') + # pprint(f'[tentative_write] - {timestamp} - {rawOperation} - EXIT CHECKPOINT 1') return True else: new_tentative_write = WriteEntry(timestamp, operation_value) cur_account.TW[timestamp] = new_tentative_write # !!!用TW的时候需è¦å¯¹TW è¿›è¡ŒæŽ’åº # self_node.lock.release() - pprint(f'[tentative_write] - {timestamp} - {rawOperation} - EXIT CHECKPOINT 2') + # pprint(f'[tentative_write] - {timestamp} - {rawOperation} - EXIT CHECKPOINT 2') return True else: self_node.lock.release() - pprint(f'[tentative_write] - {timestamp} - {rawOperation} - EXIT CHECKPOINT 3') + # pprint(f'[tentative_write] - {timestamp} - {rawOperation} - EXIT CHECKPOINT 3') return None # 需è¦abort else: # 如果deposit 一个没有å˜åœ¨çš„账户,是å¯ä»¥çš„ new_account = Account(account_name, timestamp) new_account.TW[timestamp] = WriteEntry(timestamp, operation_value) accounts_map[account_name] = new_account # todo: to be checked with Yan # self_node.lock.release() - pprint(f'[tentative_write] - {timestamp} - {rawOperation} - EXIT CHECKPOINT 4') + # pprint(f'[tentative_write] - {timestamp} - {rawOperation} - EXIT CHECKPOINT 4') return True # else: # == "WITHDRAW" 是read å’Œwriteçš„ç»“åˆ @@ -200,20 +207,21 @@ def tentative_write(self_node: SelfNode, timestamp: str, rawOperation: str): read_result = read(self_node, timestamp, readOperation) if read_result is None: - pprint(f'[tentative_write] - {timestamp} - {rawOperation} - EXIT CHECKPOINT 4.5') + # pprint(f'[tentative_write] - {timestamp} - {rawOperation} - EXIT CHECKPOINT 4.5') return None elif read_result == 'NOT FOUND, ABORTED': # 需è¦abort了 并且是 NOT FOUND, ABORTED # self_node.lock.release() - pprint(f'[tentative_write] - {timestamp} - {rawOperation} - EXIT CHECKPOINT 5') + # pprint(f'[tentative_write] - {timestamp} - {rawOperation} - EXIT CHECKPOINT 5') return 'NOT FOUND, ABORTED' else: # è¿™é‡Œé»˜è®¤ä¼ å…¥çš„operation_value 是有æ£è´Ÿä¹‹åˆ†çš„ readOperation = "DEPOSIT" + " " + account_name + " " + str(operation_value * -1) - pprint(f'[tentative_write] - {timestamp} - {rawOperation} - EXIT CHECKPOINT 6') + # pprint(f'[tentative_write] - {timestamp} - {rawOperation} - EXIT CHECKPOINT 6') return tentative_write(self_node, timestamp, readOperation) # todo: one more params? except Exception as e: - rprint(f"[ERROR] tentative_write error:{e.__class__.__name__}.{e}.{traceback.format_exc()}") + pass + # rprint(f"[ERROR] tentative_write error:{e.__class__.__name__}.{e}.{traceback.format_exc()}") finally: if self_node.lock.locked(): self_node.lock.release() @@ -252,7 +260,7 @@ def tentative_write(self_node: SelfNode, timestamp: str, rawOperation: str): # BALANCE COME HERE def read(self_node: SelfNode, timestamp: str, rawOperation: str): - pprint(f'[read] - {timestamp} - {rawOperation} - ENTER CHECKPOINT') + # pprint(f'[read] - {timestamp} - {rawOperation} - ENTER CHECKPOINT') if not self_node.lock.locked(): self_node.lock.acquire() @@ -263,13 +271,13 @@ def read(self_node: SelfNode, timestamp: str, rawOperation: str): # self_node.lock.acquire() if account_name not in accounts_map.keys(): # self_node.lock.release() - pprint(f'[read] - {timestamp} - {rawOperation} - EXIT CHECKPOINT 1') + # pprint(f'[read] - {timestamp} - {rawOperation} - EXIT CHECKPOINT 1') return 'NOT FOUND, ABORTED' # 表示没有读到 需è¦abort while flag: # self_node.lock.acquire() if account_name not in accounts_map.keys(): # self_node.lock.release() - pprint(f'[read] - {timestamp} - {rawOperation} - EXIT CHECKPOINT 2') + # pprint(f'[read] - {timestamp} - {rawOperation} - EXIT CHECKPOINT 2') return 'NOT FOUND, ABORTED' # 表示没有读到 需è¦abort cur_account: Account = accounts_map[account_name] @@ -315,10 +323,10 @@ def read(self_node: SelfNode, timestamp: str, rawOperation: str): if timestamp in cur_account.TW.keys(): new_temp_balance_this_timestamp = cur_balance + cur_account.TW[timestamp].operation_value # self_node.lock.release() - pprint(f'[read] - {timestamp} - {rawOperation} - EXIT CHECKPOINT 4.4.1') + # pprint(f'[read] - {timestamp} - {rawOperation} - EXIT CHECKPOINT 4.4.1') return new_temp_balance_this_timestamp else: - pprint(f'[read] - {timestamp} - {rawOperation} - EXIT CHECKPOINT 4.4.2') + # pprint(f'[read] - {timestamp} - {rawOperation} - EXIT CHECKPOINT 4.4.2') return cur_balance cur_account.RTS.append(timestamp) @@ -326,10 +334,10 @@ def read(self_node: SelfNode, timestamp: str, rawOperation: str): if timestamp in cur_account.TW.keys(): new_temp_balance_this_timestamp = cur_balance + cur_account.TW[timestamp].operation_value # self_node.lock.release() - pprint(f'[read] - {timestamp} - {rawOperation} - EXIT CHECKPOINT 4.5.1') + # pprint(f'[read] - {timestamp} - {rawOperation} - EXIT CHECKPOINT 4.5.1') return new_temp_balance_this_timestamp else: - pprint(f'[read] - {timestamp} - {rawOperation} - EXIT CHECKPOINT 4.5.2') + # pprint(f'[read] - {timestamp} - {rawOperation} - EXIT CHECKPOINT 4.5.2') return cur_balance else: if Ds_written_by == timestamp: @@ -338,24 +346,25 @@ def read(self_node: SelfNode, timestamp: str, rawOperation: str): new_temp_balance_this_timestamp = cur_balance + cur_account.TW[timestamp].operation_value flag = False # self_node.lock.release() - pprint(f'[read] - {timestamp} - {rawOperation} - EXIT CHECKPOINT 5.1') + # pprint(f'[read] - {timestamp} - {rawOperation} - EXIT CHECKPOINT 5.1') return new_temp_balance_this_timestamp # ä¼ å›ž 应该有的值, ä¼ å›žcoordinator的在其他函数写 else: - pprint(f'[read] - {timestamp} - {rawOperation} - EXIT CHECKPOINT 5.2') + # pprint(f'[read] - {timestamp} - {rawOperation} - EXIT CHECKPOINT 5.2') return cur_balance else: - pprint(f'[read] - {timestamp} - {rawOperation} - SLEEPING...') + # pprint(f'[read] - {timestamp} - {rawOperation} - SLEEPING...') self_node.lock.release() time.sleep(0.01) self_node.lock.acquire() else: # abort(没写), ä¼ è¾“ç»™coordinator 没写 # self_node.lock.release() - pprint(f'[read] - {timestamp} - {rawOperation} - EXIT CHECKPOINT 6') + # pprint(f'[read] - {timestamp} - {rawOperation} - EXIT CHECKPOINT 6') return None except Exception as e: - rprint(f"[ERROR] read error:{e.__class__.__name__}.{e}.{traceback.format_exc()}") + pass + # rprint(f"[ERROR] read error:{e.__class__.__name__}.{e}.{traceback.format_exc()}") finally: if self_node.lock.locked(): self_node.lock.release() @@ -363,7 +372,7 @@ def read(self_node: SelfNode, timestamp: str, rawOperation: str): # abort 的本身 def abort(self_node: SelfNode, timestamp: str): - pprint(f'[abort] - {timestamp} - ENTER CHECKPOINT') + # pprint(f'[abort] - {timestamp} - ENTER CHECKPOINT') # #对当å‰server 所有的account çš„TW å’ŒRTS ä¸è¿›è¡ŒæŸ¥æ‰¾çœ‹çœ‹è¿™ä¸ªtimestamp 的东西 # self_node.lock.acquire() @@ -402,9 +411,10 @@ def abort(self_node: SelfNode, timestamp: str): if timestamp in list(cur_account.TW.keys()): del cur_account.TW[timestamp] # self_node.lock.release() - pprint(f'[abort] - {timestamp} - EXIT CHECKPOINT') + # pprint(f'[abort] - {timestamp} - EXIT CHECKPOINT') except Exception as e: - rprint(f"[ERROR] abort error:{e.__class__.__name__}.{e}.{traceback.format_exc()}") + pass + # rprint(f"[ERROR] abort error:{e.__class__.__name__}.{e}.{traceback.format_exc()}") finally: self_node.lock.release() @@ -462,22 +472,22 @@ def connect_with_all_servers(servers: dict, current_server_id: str, return_value host_ip_address = SERVER_ID_TO_IP_MAPPING[server[SERVER_IDENTIFIER]] # server[SERVER_PORT] = SERVERS[CURRENT_SERVER_ID][SERVER_PORT]+1 # todo: to be deleted address = (host_ip_address, server[ACCEPTING_PEER_PORT]) - pprint(f"[connect_with_all_servers] - {CURRENT_SERVER_ID} is trying to " - f"connect {(host_ip_address, server[ACCEPTING_PEER_PORT])}") + # pprint(f"[connect_with_all_servers] - {CURRENT_SERVER_ID} is trying to " + # f"connect {(host_ip_address, server[ACCEPTING_PEER_PORT])}") s.connect(address) connect_info[server_id] = s - gprint(f"[Connect] - " - f"Connecting to {server[SERVER_IDENTIFIER]}: {host_ip_address}:" - f"{server[ACCEPTING_PEER_PORT]} success!") + # gprint(f"[Connect] - " + # f"Connecting to {server[SERVER_IDENTIFIER]}: {host_ip_address}:" + # f"{server[ACCEPTING_PEER_PORT]} success!") except Exception: - rprint(f"[Connect] - Connection to " - f"{server[SERVER_IDENTIFIER]} failed. " - f"Reconnecting in {RECONNECT_TIME}s..") + # rprint(f"[Connect] - Connection to " + # f"{server[SERVER_IDENTIFIER]} failed. " + # f"Reconnecting in {RECONNECT_TIME}s..") remaining_nodes[server_id] = server servers = remaining_nodes time.sleep(RECONNECT_TIME) return_values['connect_info'] = connect_info - gprint("[Connect] - Connect to all servers success!") + # gprint("[Connect] - Connect to all servers success!") # def bind_with_all_servers(servers: dict, current_server_id: str, return_values: dict): @@ -515,7 +525,8 @@ def extract_target_server(operation): try: return operation.split(' ')[1].split('.')[0] except Exception as e: - rprint(f"[ERROR] extract_target_server error:{e.__class__.__name__}.{e}.{traceback.format_exc()}") + pass + # rprint(f"[ERROR] extract_target_server error:{e.__class__.__name__}.{e}.{traceback.format_exc()}") def send_msg_to_socket(msg, socket): @@ -523,7 +534,7 @@ def send_msg_to_socket(msg, socket): msg_length = len(msg) # send_length = str(msg_length).encode(ENCODING_FORMAT) msg += b' ' * (MSG_LENGTH - msg_length) - pprint(f'[send_msg_to_socket] - msg.length : [{len(msg)}], msg: [{msg.strip()}]') + # pprint(f'[send_msg_to_socket] - msg.length : [{len(msg)}], msg: [{msg.strip()}]') socket.sendall(msg) @@ -540,20 +551,20 @@ def operation_handler(operation, timestamp): def broadcast_msg(operation, connect_info): res = [] for node_id, each_socket in connect_info.items(): - gprint(f'[{CURRENT_SERVER_ID}] - Multicasting [{operation}] to [{node_id}] NOW!') + # gprint(f'[{CURRENT_SERVER_ID}] - Multicasting [{operation}] to [{node_id}] NOW!') send_msg_to_socket(operation, each_socket) - gprint(f'[{CURRENT_SERVER_ID}] - Multicasting [{operation}] to [{node_id}] DONE!') + # gprint(f'[{CURRENT_SERVER_ID}] - Multicasting [{operation}] to [{node_id}] DONE!') if 'PREPARE COMMIT' in operation: for node_id, each_socket in connect_info.items(): # gprint(f'[{CURRENT_SERVER_ID}] - Multicasting [{operation}] to [{node_id}] NOW!') # send_msg_to_socket(operation, each_socket) prepare_res = each_socket.recv(MSG_LENGTH).decode(ENCODING_FORMAT).strip() - gprint(f'[{CURRENT_SERVER_ID} PREPARE COMMIT] - RECEIVING from [{node_id}]: {prepare_res}') + # gprint(f'[{CURRENT_SERVER_ID} PREPARE COMMIT] - RECEIVING from [{node_id}]: {prepare_res}') res.append(eval(prepare_res)) # gprint(f'[{CURRENT_SERVER_ID}] - Multicasting [{operation}] to [{node_id}] DONE!') - pprint(f'[broadcast_msg] - res: {res}') + # pprint(f'[broadcast_msg] - res: {res}') return res @@ -566,23 +577,23 @@ def format_operation_and_timestamp_json(operation, timestamp): def client_handler(conn, SELF_NODE): - pprint('[client_handler] CHECKPOINT -2') + # pprint('[client_handler] CHECKPOINT -2') return_values = establish_bidirectional_connection(SERVERS, CURRENT_SERVER_ID) - pprint('[client_handler] CHECKPOINT -1') + # pprint('[client_handler] CHECKPOINT -1') connect_info = return_values.get('connect_info', {}) # bind_info = return_values.get('bind_info', {}) timestamp = time.time() - pprint('[client_handler] CHECKPOINT 0') - pprint(f'[client_handler] - current customer timestamp {timestamp}') + # pprint('[client_handler] CHECKPOINT 0') + # pprint(f'[client_handler] - current customer timestamp {timestamp}') try: while True: - pprint('---------------------------------------------------------------') - pprint('[client_handler] CHECKPOINT 1') + # pprint('---------------------------------------------------------------') + # pprint('[client_handler] CHECKPOINT 1') operation = conn.recv(MSG_LENGTH).decode(ENCODING_FORMAT).strip() - pprint('[client_handler] CHECKPOINT 2') + # pprint('[client_handler] CHECKPOINT 2') operation_json = format_operation_and_timestamp_json(operation, timestamp) - gprint(f'[client_handler] - operation [{operation}] received success!') + # gprint(f'[client_handler] - operation [{operation}] received success!') if not operation: break @@ -593,14 +604,14 @@ def client_handler(conn, SELF_NODE): target_server_connect_socket = connect_info.get(target_server_id) # target_server_bind_socket = bind_info.get(target_server_id) is_self = target_server_id == CURRENT_SERVER_ID - gprint(f'[client_handler] - operation [{operation}] SENDING TO {target_server_id}') + # gprint(f'[client_handler] - operation [{operation}] SENDING TO {target_server_id}') ret_value = None if operation.startswith('DEPOSIT') or operation.startswith('WITHDRAW'): if is_self: ret_value = tentative_write(self_node=SELF_NODE, timestamp=timestamp, rawOperation=operation) - gprint( - f'[server_handler] accounts_map: ' - f'\n{json.dumps(accounts_map, default=lambda x: x.__dict__)}') + # gprint( + # f'[server_handler] accounts_map: ' + # f'\n{json.dumps(accounts_map, default=lambda x: x.__dict__)}') else: send_msg_to_socket(operation_json, target_server_connect_socket) # ret_value = target_server_bind_socket.recv(MSG_LENGTH).decode(ENCODING_FORMAT) @@ -608,9 +619,9 @@ def client_handler(conn, SELF_NODE): elif operation.startswith('BALANCE'): if is_self: ret_value = read(self_node=SELF_NODE, timestamp=timestamp, rawOperation=operation) - gprint( - f'[server_handler] accounts_map: ' - f'\n{json.dumps(accounts_map, default=lambda x: x.__dict__)}') + # gprint( + # f'[server_handler] accounts_map: ' + # f'\n{json.dumps(accounts_map, default=lambda x: x.__dict__)}') else: send_msg_to_socket(operation_json, target_server_connect_socket) # ret_value = target_server_bind_socket.recv(MSG_LENGTH).decode(ENCODING_FORMAT) @@ -630,16 +641,16 @@ def client_handler(conn, SELF_NODE): broadcast_msg(operation_json, connect_info) commit_ok_transaction(timestamp, SELF_NODE) ret_value = 'COMMIT OK' # todo - gprint(f'[server_handler] accounts_map: \n' - f'{json.dumps(accounts_map, default=lambda x: x.__dict__)}') + # gprint(f'[server_handler] accounts_map: \n' + # f'{json.dumps(accounts_map, default=lambda x: x.__dict__)}') elif operation.startswith('ABORT'): # __import__('pudb').set_trace() # broadcast_msg(operation_json, connect_info) # abort(SELF_NODE, timestamp) ret_value = 'ABORTED' # todo - gprint(f'[server_handler] accounts_map: ' - f'\n{json.dumps(accounts_map, default=lambda x: x.__dict__)}') + # gprint(f'[server_handler] accounts_map: ' + # f'\n{json.dumps(accounts_map, default=lambda x: x.__dict__)}') ret_value = reformat_return_value(ret_value, operation) if 'ABORT' in ret_value: @@ -648,9 +659,10 @@ def client_handler(conn, SELF_NODE): send_msg_to_socket(ret_value, conn) except Exception as e: - rprint(f"[ERROR] client_handler error:{e.__class__.__name__}.{e}.{traceback.format_exc()}") + pass + # rprint(f"[ERROR] client_handler error:{e.__class__.__name__}.{e}.{traceback.format_exc()}") finally: - pprint("[client_handler] CONNECTION CLOSED...") + # pprint("[client_handler] CONNECTION CLOSED...") conn.close() # client_recv_socket.close() # client_send_socket.close() @@ -676,10 +688,18 @@ def reformat_return_value(ret_value, operation): raise TypeError(f'ret_value should be None or a bool, not {ret_value.__class__.__name__}') +def print_all_available_accounts_balance(): + balances_str = ', '.join('{}:{}'.format(k, v.balance) if v.balance != 0 + else '' for k, v in sorted(accounts_map.items())) + if balances_str: + print('BALANCES', end=' ') + print(balances_str) + + def server_handler(conn, SELF_NODE): try: while True: - pprint('---------------------------------------------------------------') + # pprint('---------------------------------------------------------------') operation_dict = json.loads(conn.recv(MSG_LENGTH).decode(ENCODING_FORMAT).strip()) operation = operation_dict.get('operation', '') timestamp = operation_dict.get('timestamp', '') @@ -692,17 +712,19 @@ def server_handler(conn, SELF_NODE): ret_value = commit_check_transaction(timestamp, SELF_NODE) elif operation.startswith('COMMIT'): ret_value = commit_ok_transaction(timestamp, SELF_NODE) + # print_all_available_accounts_balance() elif operation.startswith('ABORT'): abort(SELF_NODE, timestamp) - gprint(f'[server_handler] accounts_map: \n{json.dumps(accounts_map, default=lambda x: x.__dict__)}') + # gprint(f'[server_handler] accounts_map: \n{json.dumps(accounts_map, default=lambda x: x.__dict__)}') break ret_value = reformat_return_value(ret_value, operation) send_msg_to_socket(ret_value, conn) - gprint(f'[server_handler] accounts_map: \n{json.dumps(accounts_map, default=lambda x: x.__dict__)}') + # gprint(f'[server_handler] accounts_map: \n{json.dumps(accounts_map, default=lambda x: x.__dict__)}') except Exception as e: - rprint(f"[ERROR] server_handler error:{e.__class__.__name__}.{e}.{traceback.format_exc()}") + pass + # rprint(f"[ERROR] server_handler error:{e.__class__.__name__}.{e}.{traceback.format_exc()}") finally: - pprint('[server_handler] - ABORTED - connection with the current client CLOSED!') + # pprint('[server_handler] - ABORTED - connection with the current client CLOSED!') conn.close() # peer_server_send_socket.close() @@ -716,7 +738,7 @@ def accepting_clients(SELF_NODE): # print(f"[LISTENING] Server is listening on {ADDR}") # DEBUG while True: conn, addr = server.accept() - gprint(f'[accepting_clients] - client [{addr}] connect to coordinator [{CURRENT_SERVER_ID}] success!') + # gprint(f'[accepting_clients] - client [{addr}] connect to coordinator [{CURRENT_SERVER_ID}] success!') # client_send_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # client_send_socket.connect((addr[0], 5678)) # gprint(f'[accepting_clients] - coordinator [{CURRENT_SERVER_ID}] connect to client [({addr[0]}, {5678})] success!') @@ -724,7 +746,8 @@ def accepting_clients(SELF_NODE): thread = threading.Thread(target=client_handler, args=[conn, SELF_NODE]) thread.start() except Exception as e: - rprint(f"[ERROR] accepting_clients error:{e.__class__.__name__}.{e}.{traceback.format_exc()}") + pass + # rprint(f"[ERROR] accepting_clients error:{e.__class__.__name__}.{e}.{traceback.format_exc()}") finally: server.close() @@ -733,19 +756,20 @@ def accepting_peer_servers(SELF_NODE): try: server = socket.socket(socket.AF_INET, socket.SOCK_STREAM) server.bind((CURRENT_SERVER_IP, SERVERS[CURRENT_SERVER_ID][ACCEPTING_PEER_PORT])) - pprint(f'[accepting_peer_servers] - {CURRENT_SERVER_ID} current ' - f'bind at {(CURRENT_SERVER_IP, SERVERS[CURRENT_SERVER_ID][ACCEPTING_PEER_PORT])}') + # pprint(f'[accepting_peer_servers] - {CURRENT_SERVER_ID} current ' + # f'bind at {(CURRENT_SERVER_IP, SERVERS[CURRENT_SERVER_ID][ACCEPTING_PEER_PORT])}') server.listen() # print(f"[LISTENING] Server is listening on {ADDR}") # DEBUG while True: conn, addr = server.accept() - gprint(f'[accepting_peer_servers] - peer server [{addr}] connect to server [{CURRENT_SERVER_ID}] success!') + # gprint(f'[accepting_peer_servers] - peer server [{addr}] connect to server [{CURRENT_SERVER_ID}] success!') # client_send_socket = server.connect(addr) # gprint(f'[accepting_peer_servers] - server [{CURRENT_SERVER_ID}] connect to peer server [{addr}] success!') thread = threading.Thread(target=server_handler, args=[conn, SELF_NODE]) thread.start() except Exception as e: - rprint(f"[ERROR] accepting_peer_servers error:{e.__class__.__name__}.{e}.{traceback.format_exc()}") + pass + # rprint(f"[ERROR] accepting_peer_servers error:{e.__class__.__name__}.{e}.{traceback.format_exc()}") finally: server.close()