Skip to content
Snippets Groups Projects
Commit 3cc1606d authored by hangxie2's avatar hangxie2
Browse files

MP3 CAMERA READY

parent 354b0bdc
No related branches found
No related tags found
1 merge request!11MP3 CAMERA READY
......@@ -8,4 +8,6 @@ exclude =
./sample_test/src/*.py
per-file-ignores =
./mp1/node.py:W291
./mp3/server.py:F841
import random
from constants import SERVER_IDENTIFIER, SERVER_ADDRESS, SERVER_PORT, ENCODING_FORMAT, MSG_LENGTH
from utils import gprint, pprint
# from utils import gprint, pprint
import os
import sys
import socket
......@@ -37,9 +37,9 @@ def connect(coordinator_info: tuple):
coordinator_port_number = coordinator_info[1][SERVER_PORT]
client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
address = (coordinator_ip_address, coordinator_port_number)
pprint(f'[connect] - client connecting to address {address}')
# pprint(f'[connect] - client connecting to address {address}')
client.connect(address)
pprint(f'[connect] - client connecting to address {address} success!')
# pprint(f'[connect] - client connecting to address {address} success!')
return client
......@@ -48,17 +48,17 @@ def bind():
server.bind(ADDR)
server.listen()
conn, addr = server.accept()
gprint(f"[Bind] - Bind to {addr} success!")
# gprint(f"[Bind] - Bind to {addr} success!")
return conn
def send_operation(operation_str, socket):
msg = operation_str.upper().encode(ENCODING_FORMAT) # todo: upper case
msg = operation_str.encode(ENCODING_FORMAT) # todo: upper case
msg_length = len(msg)
# send_length = str(msg_length).encode(ENCODING_FORMAT)
msg += b' ' * (MSG_LENGTH - msg_length)
# pprint(f'[send_operation] - {locals()}')
pprint(f'[send_operation] - msg.length : {len(msg)}')
# pprint(f'[send_operation] - msg.length : {len(msg)}')
socket.sendall(msg)
......@@ -77,10 +77,10 @@ if __name__ == '__main__':
coordinator_info = pick_coordinator(servers)
client_to_coordinator_socket = connect(coordinator_info)
pprint('[client_to_coordinator_socket] CONNECT SUCCESS!')
# pprint('[client_to_coordinator_socket] CONNECT SUCCESS!')
# coordinator_to_client_socket = bind()
# pprint('[bind] BIND SUCCESS!')
print(coordinator_info)
# print(coordinator_info)
try:
while True:
operation = input()
......@@ -95,9 +95,9 @@ if __name__ == '__main__':
break
# todo: add end condition
except EOFError:
gprint("[Client] - All commands in .txt file executed. Client Exits..")
# gprint("[Client] - All commands in .txt file executed. Client Exits..")
os._exit(0)
finally:
client_to_coordinator_socket.close()
gprint("[Client] - client_to_coordinator_socket CLOSED..")
# gprint("[Client] - client_to_coordinator_socket CLOSED..")
# coordinator_to_client_socket.close()
......@@ -3,7 +3,8 @@ import json
import sys
import time
import traceback
from utils import gprint, rprint, pprint
from utils import rprint
# from utils import gprint, rprint, pprint
from constants import SERVER_IDENTIFIER, SERVER_ADDRESS, SERVER_PORT, RECONNECT_TIME, MSG_LENGTH, ENCODING_FORMAT, \
ACCEPTING_PEER_PORT
import socket
......@@ -43,8 +44,9 @@ def commit_ok_transaction(timestamp: str, self_node: SelfNode):
if not self_node.lock.locked():
self_node.lock.acquire()
try:
pprint(f'[commit_ok_transaction] - {timestamp} - ENTER CHECKPOINT')
# pprint(f'[commit_ok_transaction] - {timestamp} - ENTER CHECKPOINT')
# timestamp 是肯定可以commit
has_account_modified = False
for cur_account in accounts_map.values():
# 删除这个accountRTS 中所有的timestamp
cur_account.RTS = list(filter(lambda x: x != timestamp, cur_account.RTS))
......@@ -58,10 +60,14 @@ def commit_ok_transaction(timestamp: str, self_node: SelfNode):
# __import__('pudb').set_trace()
cur_account.balance += cur_account.TW[timestamp].operation_value
cur_account.TW.pop(timestamp)
break
has_account_modified = True
# break
# self_node.lock.release()
pprint(f'[commit_ok_transaction] - {timestamp} - EXIT CHECKPOINT')
# pprint(f'[commit_ok_transaction] - {timestamp} - EXIT CHECKPOINT')
if has_account_modified:
print_all_available_accounts_balance()
except Exception as e:
# pass
rprint(f"[ERROR] commit_ok_transaction error:{e.__class__.__name__}.{e}.{traceback.format_exc()}")
finally:
if self_node.lock.locked():
......@@ -71,7 +77,7 @@ def commit_ok_transaction(timestamp: str, self_node: SelfNode):
# 收到commit 的消息,要去check
def commit_check_transaction(timestamp: str, self_node: SelfNode) -> bool:
pprint(f'[commit_check_transaction] - {timestamp} - ENTER CHECKPOINT')
# pprint(f'[commit_check_transaction] - {timestamp} - ENTER CHECKPOINT')
if not self_node.lock.locked():
self_node.lock.acquire()
try:
......@@ -91,7 +97,7 @@ def commit_check_transaction(timestamp: str, self_node: SelfNode) -> bool:
if timestamp in TW.keys():
flag = True
while flag:
pprint('[commit_check_transaction] - sleeping check 1...')
# pprint('[commit_check_transaction] - sleeping check 1...')
min_write_timestamp = timestamp
for temp in TW.keys():
if temp < min_write_timestamp:
......@@ -107,12 +113,12 @@ def commit_check_transaction(timestamp: str, self_node: SelfNode) -> bool:
flag = True
while flag:
# __import__('pudb').set_trace()
pprint('[commit_check_transaction] - sleeping check 2...')
# pprint('[commit_check_transaction] - sleeping check 2...')
# 这个时候还是有锁的,因为是从flag == false过来的
# min_read_timestamp = min(RTS)
# min_read_timestamp = min(list(filter(lambda x: x != 0, RTS))) # DELETE 0 IN THIS CASE
# __import__('pudb').set_trace()
pprint(f'[commit_check_transaction] - accounts_map[_key].RTS: {accounts_map[_key].RTS}')
# pprint(f'[commit_check_transaction] - accounts_map[_key].RTS: {accounts_map[_key].RTS}')
min_read_timestamp = min(list(filter(lambda x: x != 0, accounts_map[_key].RTS)))
if min_read_timestamp == timestamp:
flag = False
......@@ -136,13 +142,14 @@ def commit_check_transaction(timestamp: str, self_node: SelfNode) -> bool:
continue
# self_node.lock.release()
pprint(f'[commit_check_transaction] - {timestamp} - EXIT CHECKPOINT')
# pprint(f'[commit_check_transaction] - {timestamp} - EXIT CHECKPOINT')
# 如果transaction和这个server中所有的account都没有关系的话, 那么还是返回true
# 如果说有关系, 那违反consistency, 那就不行
# 如果不违反, 只是前面的transaction没有commit, 那就是等着
return commit_ok
except Exception as e:
rprint(f"[ERROR] commit_check_transaction error:{e.__class__.__name__}.{e}.{traceback.format_exc()}")
pass
# rprint(f"[ERROR] commit_check_transaction error:{e.__class__.__name__}.{e}.{traceback.format_exc()}")
finally:
if self_node.lock.locked():
self_node.lock.release()
......@@ -155,7 +162,7 @@ def tentative_write(self_node: SelfNode, timestamp: str, rawOperation: str):
self_node.lock.acquire()
try:
pprint(f'[tentative_write] - {timestamp} - {rawOperation} - ENTER CHECKPOINT')
# pprint(f'[tentative_write] - {timestamp} - {rawOperation} - ENTER CHECKPOINT')
# __import__('pudb').set_trace()
# raw operiation # deposit 和withdraw还不一样
account_name = rawOperation.split()[1] # DEPOSIT A.foo 50
......@@ -173,25 +180,25 @@ def tentative_write(self_node: SelfNode, timestamp: str, rawOperation: str):
# 更新TW
cur_account.TW[timestamp].operation_value += operation_value
# self_node.lock.release()
pprint(f'[tentative_write] - {timestamp} - {rawOperation} - EXIT CHECKPOINT 1')
# pprint(f'[tentative_write] - {timestamp} - {rawOperation} - EXIT CHECKPOINT 1')
return True
else:
new_tentative_write = WriteEntry(timestamp, operation_value)
cur_account.TW[timestamp] = new_tentative_write
# !!!用TW的时候需要对TW 进行排序
# self_node.lock.release()
pprint(f'[tentative_write] - {timestamp} - {rawOperation} - EXIT CHECKPOINT 2')
# pprint(f'[tentative_write] - {timestamp} - {rawOperation} - EXIT CHECKPOINT 2')
return True
else:
self_node.lock.release()
pprint(f'[tentative_write] - {timestamp} - {rawOperation} - EXIT CHECKPOINT 3')
# pprint(f'[tentative_write] - {timestamp} - {rawOperation} - EXIT CHECKPOINT 3')
return None # 需要abort
else: # 如果deposit 一个没有存在的账户,是可以的
new_account = Account(account_name, timestamp)
new_account.TW[timestamp] = WriteEntry(timestamp, operation_value)
accounts_map[account_name] = new_account # todo: to be checked with Yan
# self_node.lock.release()
pprint(f'[tentative_write] - {timestamp} - {rawOperation} - EXIT CHECKPOINT 4')
# pprint(f'[tentative_write] - {timestamp} - {rawOperation} - EXIT CHECKPOINT 4')
return True #
else: # == "WITHDRAW" 是read 和write的结合
......@@ -200,20 +207,21 @@ def tentative_write(self_node: SelfNode, timestamp: str, rawOperation: str):
read_result = read(self_node, timestamp, readOperation)
if read_result is None:
pprint(f'[tentative_write] - {timestamp} - {rawOperation} - EXIT CHECKPOINT 4.5')
# pprint(f'[tentative_write] - {timestamp} - {rawOperation} - EXIT CHECKPOINT 4.5')
return None
elif read_result == 'NOT FOUND, ABORTED':
# 需要abort了 并且是 NOT FOUND, ABORTED
# self_node.lock.release()
pprint(f'[tentative_write] - {timestamp} - {rawOperation} - EXIT CHECKPOINT 5')
# pprint(f'[tentative_write] - {timestamp} - {rawOperation} - EXIT CHECKPOINT 5')
return 'NOT FOUND, ABORTED'
else:
# 这里默认传入的operation_value 是有正负之分的
readOperation = "DEPOSIT" + " " + account_name + " " + str(operation_value * -1)
pprint(f'[tentative_write] - {timestamp} - {rawOperation} - EXIT CHECKPOINT 6')
# pprint(f'[tentative_write] - {timestamp} - {rawOperation} - EXIT CHECKPOINT 6')
return tentative_write(self_node, timestamp, readOperation) # todo: one more params?
except Exception as e:
rprint(f"[ERROR] tentative_write error:{e.__class__.__name__}.{e}.{traceback.format_exc()}")
pass
# rprint(f"[ERROR] tentative_write error:{e.__class__.__name__}.{e}.{traceback.format_exc()}")
finally:
if self_node.lock.locked():
self_node.lock.release()
......@@ -252,7 +260,7 @@ def tentative_write(self_node: SelfNode, timestamp: str, rawOperation: str):
# BALANCE COME HERE
def read(self_node: SelfNode, timestamp: str, rawOperation: str):
pprint(f'[read] - {timestamp} - {rawOperation} - ENTER CHECKPOINT')
# pprint(f'[read] - {timestamp} - {rawOperation} - ENTER CHECKPOINT')
if not self_node.lock.locked():
self_node.lock.acquire()
......@@ -263,13 +271,13 @@ def read(self_node: SelfNode, timestamp: str, rawOperation: str):
# self_node.lock.acquire()
if account_name not in accounts_map.keys():
# self_node.lock.release()
pprint(f'[read] - {timestamp} - {rawOperation} - EXIT CHECKPOINT 1')
# pprint(f'[read] - {timestamp} - {rawOperation} - EXIT CHECKPOINT 1')
return 'NOT FOUND, ABORTED' # 表示没有读到 需要abort
while flag:
# self_node.lock.acquire()
if account_name not in accounts_map.keys():
# self_node.lock.release()
pprint(f'[read] - {timestamp} - {rawOperation} - EXIT CHECKPOINT 2')
# pprint(f'[read] - {timestamp} - {rawOperation} - EXIT CHECKPOINT 2')
return 'NOT FOUND, ABORTED' # 表示没有读到 需要abort
cur_account: Account = accounts_map[account_name]
......@@ -315,10 +323,10 @@ def read(self_node: SelfNode, timestamp: str, rawOperation: str):
if timestamp in cur_account.TW.keys():
new_temp_balance_this_timestamp = cur_balance + cur_account.TW[timestamp].operation_value
# self_node.lock.release()
pprint(f'[read] - {timestamp} - {rawOperation} - EXIT CHECKPOINT 4.4.1')
# pprint(f'[read] - {timestamp} - {rawOperation} - EXIT CHECKPOINT 4.4.1')
return new_temp_balance_this_timestamp
else:
pprint(f'[read] - {timestamp} - {rawOperation} - EXIT CHECKPOINT 4.4.2')
# pprint(f'[read] - {timestamp} - {rawOperation} - EXIT CHECKPOINT 4.4.2')
return cur_balance
cur_account.RTS.append(timestamp)
......@@ -326,10 +334,10 @@ def read(self_node: SelfNode, timestamp: str, rawOperation: str):
if timestamp in cur_account.TW.keys():
new_temp_balance_this_timestamp = cur_balance + cur_account.TW[timestamp].operation_value
# self_node.lock.release()
pprint(f'[read] - {timestamp} - {rawOperation} - EXIT CHECKPOINT 4.5.1')
# pprint(f'[read] - {timestamp} - {rawOperation} - EXIT CHECKPOINT 4.5.1')
return new_temp_balance_this_timestamp
else:
pprint(f'[read] - {timestamp} - {rawOperation} - EXIT CHECKPOINT 4.5.2')
# pprint(f'[read] - {timestamp} - {rawOperation} - EXIT CHECKPOINT 4.5.2')
return cur_balance
else:
if Ds_written_by == timestamp:
......@@ -338,24 +346,25 @@ def read(self_node: SelfNode, timestamp: str, rawOperation: str):
new_temp_balance_this_timestamp = cur_balance + cur_account.TW[timestamp].operation_value
flag = False
# self_node.lock.release()
pprint(f'[read] - {timestamp} - {rawOperation} - EXIT CHECKPOINT 5.1')
# pprint(f'[read] - {timestamp} - {rawOperation} - EXIT CHECKPOINT 5.1')
return new_temp_balance_this_timestamp
# 传回 应该有的值, 传回coordinator的在其他函数写
else:
pprint(f'[read] - {timestamp} - {rawOperation} - EXIT CHECKPOINT 5.2')
# pprint(f'[read] - {timestamp} - {rawOperation} - EXIT CHECKPOINT 5.2')
return cur_balance
else:
pprint(f'[read] - {timestamp} - {rawOperation} - SLEEPING...')
# pprint(f'[read] - {timestamp} - {rawOperation} - SLEEPING...')
self_node.lock.release()
time.sleep(0.01)
self_node.lock.acquire()
else:
# abort(没写), 传输给coordinator 没写
# self_node.lock.release()
pprint(f'[read] - {timestamp} - {rawOperation} - EXIT CHECKPOINT 6')
# pprint(f'[read] - {timestamp} - {rawOperation} - EXIT CHECKPOINT 6')
return None
except Exception as e:
rprint(f"[ERROR] read error:{e.__class__.__name__}.{e}.{traceback.format_exc()}")
pass
# rprint(f"[ERROR] read error:{e.__class__.__name__}.{e}.{traceback.format_exc()}")
finally:
if self_node.lock.locked():
self_node.lock.release()
......@@ -363,7 +372,7 @@ def read(self_node: SelfNode, timestamp: str, rawOperation: str):
# abort 的本身
def abort(self_node: SelfNode, timestamp: str):
pprint(f'[abort] - {timestamp} - ENTER CHECKPOINT')
# pprint(f'[abort] - {timestamp} - ENTER CHECKPOINT')
# #对当前server 所有的account 的TW 和RTS 中进行查找看看这个timestamp 的东西
# self_node.lock.acquire()
......@@ -402,9 +411,10 @@ def abort(self_node: SelfNode, timestamp: str):
if timestamp in list(cur_account.TW.keys()):
del cur_account.TW[timestamp]
# self_node.lock.release()
pprint(f'[abort] - {timestamp} - EXIT CHECKPOINT')
# pprint(f'[abort] - {timestamp} - EXIT CHECKPOINT')
except Exception as e:
rprint(f"[ERROR] abort error:{e.__class__.__name__}.{e}.{traceback.format_exc()}")
pass
# rprint(f"[ERROR] abort error:{e.__class__.__name__}.{e}.{traceback.format_exc()}")
finally:
self_node.lock.release()
......@@ -462,22 +472,22 @@ def connect_with_all_servers(servers: dict, current_server_id: str, return_value
host_ip_address = SERVER_ID_TO_IP_MAPPING[server[SERVER_IDENTIFIER]]
# server[SERVER_PORT] = SERVERS[CURRENT_SERVER_ID][SERVER_PORT]+1 # todo: to be deleted
address = (host_ip_address, server[ACCEPTING_PEER_PORT])
pprint(f"[connect_with_all_servers] - {CURRENT_SERVER_ID} is trying to "
f"connect {(host_ip_address, server[ACCEPTING_PEER_PORT])}")
# pprint(f"[connect_with_all_servers] - {CURRENT_SERVER_ID} is trying to "
# f"connect {(host_ip_address, server[ACCEPTING_PEER_PORT])}")
s.connect(address)
connect_info[server_id] = s
gprint(f"[Connect] - "
f"Connecting to {server[SERVER_IDENTIFIER]}: {host_ip_address}:"
f"{server[ACCEPTING_PEER_PORT]} success!")
# gprint(f"[Connect] - "
# f"Connecting to {server[SERVER_IDENTIFIER]}: {host_ip_address}:"
# f"{server[ACCEPTING_PEER_PORT]} success!")
except Exception:
rprint(f"[Connect] - Connection to "
f"{server[SERVER_IDENTIFIER]} failed. "
f"Reconnecting in {RECONNECT_TIME}s..")
# rprint(f"[Connect] - Connection to "
# f"{server[SERVER_IDENTIFIER]} failed. "
# f"Reconnecting in {RECONNECT_TIME}s..")
remaining_nodes[server_id] = server
servers = remaining_nodes
time.sleep(RECONNECT_TIME)
return_values['connect_info'] = connect_info
gprint("[Connect] - Connect to all servers success!")
# gprint("[Connect] - Connect to all servers success!")
# def bind_with_all_servers(servers: dict, current_server_id: str, return_values: dict):
......@@ -515,7 +525,8 @@ def extract_target_server(operation):
try:
return operation.split(' ')[1].split('.')[0]
except Exception as e:
rprint(f"[ERROR] extract_target_server error:{e.__class__.__name__}.{e}.{traceback.format_exc()}")
pass
# rprint(f"[ERROR] extract_target_server error:{e.__class__.__name__}.{e}.{traceback.format_exc()}")
def send_msg_to_socket(msg, socket):
......@@ -523,7 +534,7 @@ def send_msg_to_socket(msg, socket):
msg_length = len(msg)
# send_length = str(msg_length).encode(ENCODING_FORMAT)
msg += b' ' * (MSG_LENGTH - msg_length)
pprint(f'[send_msg_to_socket] - msg.length : [{len(msg)}], msg: [{msg.strip()}]')
# pprint(f'[send_msg_to_socket] - msg.length : [{len(msg)}], msg: [{msg.strip()}]')
socket.sendall(msg)
......@@ -540,20 +551,20 @@ def operation_handler(operation, timestamp):
def broadcast_msg(operation, connect_info):
res = []
for node_id, each_socket in connect_info.items():
gprint(f'[{CURRENT_SERVER_ID}] - Multicasting [{operation}] to [{node_id}] NOW!')
# gprint(f'[{CURRENT_SERVER_ID}] - Multicasting [{operation}] to [{node_id}] NOW!')
send_msg_to_socket(operation, each_socket)
gprint(f'[{CURRENT_SERVER_ID}] - Multicasting [{operation}] to [{node_id}] DONE!')
# gprint(f'[{CURRENT_SERVER_ID}] - Multicasting [{operation}] to [{node_id}] DONE!')
if 'PREPARE COMMIT' in operation:
for node_id, each_socket in connect_info.items():
# gprint(f'[{CURRENT_SERVER_ID}] - Multicasting [{operation}] to [{node_id}] NOW!')
# send_msg_to_socket(operation, each_socket)
prepare_res = each_socket.recv(MSG_LENGTH).decode(ENCODING_FORMAT).strip()
gprint(f'[{CURRENT_SERVER_ID} PREPARE COMMIT] - RECEIVING from [{node_id}]: {prepare_res}')
# gprint(f'[{CURRENT_SERVER_ID} PREPARE COMMIT] - RECEIVING from [{node_id}]: {prepare_res}')
res.append(eval(prepare_res))
# gprint(f'[{CURRENT_SERVER_ID}] - Multicasting [{operation}] to [{node_id}] DONE!')
pprint(f'[broadcast_msg] - res: {res}')
# pprint(f'[broadcast_msg] - res: {res}')
return res
......@@ -566,23 +577,23 @@ def format_operation_and_timestamp_json(operation, timestamp):
def client_handler(conn, SELF_NODE):
pprint('[client_handler] CHECKPOINT -2')
# pprint('[client_handler] CHECKPOINT -2')
return_values = establish_bidirectional_connection(SERVERS, CURRENT_SERVER_ID)
pprint('[client_handler] CHECKPOINT -1')
# pprint('[client_handler] CHECKPOINT -1')
connect_info = return_values.get('connect_info', {})
# bind_info = return_values.get('bind_info', {})
timestamp = time.time()
pprint('[client_handler] CHECKPOINT 0')
pprint(f'[client_handler] - current customer timestamp {timestamp}')
# pprint('[client_handler] CHECKPOINT 0')
# pprint(f'[client_handler] - current customer timestamp {timestamp}')
try:
while True:
pprint('---------------------------------------------------------------')
pprint('[client_handler] CHECKPOINT 1')
# pprint('---------------------------------------------------------------')
# pprint('[client_handler] CHECKPOINT 1')
operation = conn.recv(MSG_LENGTH).decode(ENCODING_FORMAT).strip()
pprint('[client_handler] CHECKPOINT 2')
# pprint('[client_handler] CHECKPOINT 2')
operation_json = format_operation_and_timestamp_json(operation, timestamp)
gprint(f'[client_handler] - operation [{operation}] received success!')
# gprint(f'[client_handler] - operation [{operation}] received success!')
if not operation:
break
......@@ -593,14 +604,14 @@ def client_handler(conn, SELF_NODE):
target_server_connect_socket = connect_info.get(target_server_id)
# target_server_bind_socket = bind_info.get(target_server_id)
is_self = target_server_id == CURRENT_SERVER_ID
gprint(f'[client_handler] - operation [{operation}] SENDING TO {target_server_id}')
# gprint(f'[client_handler] - operation [{operation}] SENDING TO {target_server_id}')
ret_value = None
if operation.startswith('DEPOSIT') or operation.startswith('WITHDRAW'):
if is_self:
ret_value = tentative_write(self_node=SELF_NODE, timestamp=timestamp, rawOperation=operation)
gprint(
f'[server_handler] accounts_map: '
f'\n{json.dumps(accounts_map, default=lambda x: x.__dict__)}')
# gprint(
# f'[server_handler] accounts_map: '
# f'\n{json.dumps(accounts_map, default=lambda x: x.__dict__)}')
else:
send_msg_to_socket(operation_json, target_server_connect_socket)
# ret_value = target_server_bind_socket.recv(MSG_LENGTH).decode(ENCODING_FORMAT)
......@@ -608,9 +619,9 @@ def client_handler(conn, SELF_NODE):
elif operation.startswith('BALANCE'):
if is_self:
ret_value = read(self_node=SELF_NODE, timestamp=timestamp, rawOperation=operation)
gprint(
f'[server_handler] accounts_map: '
f'\n{json.dumps(accounts_map, default=lambda x: x.__dict__)}')
# gprint(
# f'[server_handler] accounts_map: '
# f'\n{json.dumps(accounts_map, default=lambda x: x.__dict__)}')
else:
send_msg_to_socket(operation_json, target_server_connect_socket)
# ret_value = target_server_bind_socket.recv(MSG_LENGTH).decode(ENCODING_FORMAT)
......@@ -630,16 +641,16 @@ def client_handler(conn, SELF_NODE):
broadcast_msg(operation_json, connect_info)
commit_ok_transaction(timestamp, SELF_NODE)
ret_value = 'COMMIT OK' # todo
gprint(f'[server_handler] accounts_map: \n'
f'{json.dumps(accounts_map, default=lambda x: x.__dict__)}')
# gprint(f'[server_handler] accounts_map: \n'
# f'{json.dumps(accounts_map, default=lambda x: x.__dict__)}')
elif operation.startswith('ABORT'):
# __import__('pudb').set_trace()
# broadcast_msg(operation_json, connect_info)
# abort(SELF_NODE, timestamp)
ret_value = 'ABORTED' # todo
gprint(f'[server_handler] accounts_map: '
f'\n{json.dumps(accounts_map, default=lambda x: x.__dict__)}')
# gprint(f'[server_handler] accounts_map: '
# f'\n{json.dumps(accounts_map, default=lambda x: x.__dict__)}')
ret_value = reformat_return_value(ret_value, operation)
if 'ABORT' in ret_value:
......@@ -648,9 +659,10 @@ def client_handler(conn, SELF_NODE):
send_msg_to_socket(ret_value, conn)
except Exception as e:
rprint(f"[ERROR] client_handler error:{e.__class__.__name__}.{e}.{traceback.format_exc()}")
pass
# rprint(f"[ERROR] client_handler error:{e.__class__.__name__}.{e}.{traceback.format_exc()}")
finally:
pprint("[client_handler] CONNECTION CLOSED...")
# pprint("[client_handler] CONNECTION CLOSED...")
conn.close()
# client_recv_socket.close()
# client_send_socket.close()
......@@ -676,10 +688,18 @@ def reformat_return_value(ret_value, operation):
raise TypeError(f'ret_value should be None or a bool, not {ret_value.__class__.__name__}')
def print_all_available_accounts_balance():
balances_str = ', '.join('{}:{}'.format(k, v.balance) if v.balance != 0
else '' for k, v in sorted(accounts_map.items()))
if balances_str:
print('BALANCES', end=' ')
print(balances_str)
def server_handler(conn, SELF_NODE):
try:
while True:
pprint('---------------------------------------------------------------')
# pprint('---------------------------------------------------------------')
operation_dict = json.loads(conn.recv(MSG_LENGTH).decode(ENCODING_FORMAT).strip())
operation = operation_dict.get('operation', '')
timestamp = operation_dict.get('timestamp', '')
......@@ -692,17 +712,19 @@ def server_handler(conn, SELF_NODE):
ret_value = commit_check_transaction(timestamp, SELF_NODE)
elif operation.startswith('COMMIT'):
ret_value = commit_ok_transaction(timestamp, SELF_NODE)
# print_all_available_accounts_balance()
elif operation.startswith('ABORT'):
abort(SELF_NODE, timestamp)
gprint(f'[server_handler] accounts_map: \n{json.dumps(accounts_map, default=lambda x: x.__dict__)}')
# gprint(f'[server_handler] accounts_map: \n{json.dumps(accounts_map, default=lambda x: x.__dict__)}')
break
ret_value = reformat_return_value(ret_value, operation)
send_msg_to_socket(ret_value, conn)
gprint(f'[server_handler] accounts_map: \n{json.dumps(accounts_map, default=lambda x: x.__dict__)}')
# gprint(f'[server_handler] accounts_map: \n{json.dumps(accounts_map, default=lambda x: x.__dict__)}')
except Exception as e:
rprint(f"[ERROR] server_handler error:{e.__class__.__name__}.{e}.{traceback.format_exc()}")
pass
# rprint(f"[ERROR] server_handler error:{e.__class__.__name__}.{e}.{traceback.format_exc()}")
finally:
pprint('[server_handler] - ABORTED - connection with the current client CLOSED!')
# pprint('[server_handler] - ABORTED - connection with the current client CLOSED!')
conn.close()
# peer_server_send_socket.close()
......@@ -716,7 +738,7 @@ def accepting_clients(SELF_NODE):
# print(f"[LISTENING] Server is listening on {ADDR}") # DEBUG
while True:
conn, addr = server.accept()
gprint(f'[accepting_clients] - client [{addr}] connect to coordinator [{CURRENT_SERVER_ID}] success!')
# gprint(f'[accepting_clients] - client [{addr}] connect to coordinator [{CURRENT_SERVER_ID}] success!')
# client_send_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# client_send_socket.connect((addr[0], 5678))
# gprint(f'[accepting_clients] - coordinator [{CURRENT_SERVER_ID}] connect to client [({addr[0]}, {5678})] success!')
......@@ -724,7 +746,8 @@ def accepting_clients(SELF_NODE):
thread = threading.Thread(target=client_handler, args=[conn, SELF_NODE])
thread.start()
except Exception as e:
rprint(f"[ERROR] accepting_clients error:{e.__class__.__name__}.{e}.{traceback.format_exc()}")
pass
# rprint(f"[ERROR] accepting_clients error:{e.__class__.__name__}.{e}.{traceback.format_exc()}")
finally:
server.close()
......@@ -733,19 +756,20 @@ def accepting_peer_servers(SELF_NODE):
try:
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.bind((CURRENT_SERVER_IP, SERVERS[CURRENT_SERVER_ID][ACCEPTING_PEER_PORT]))
pprint(f'[accepting_peer_servers] - {CURRENT_SERVER_ID} current '
f'bind at {(CURRENT_SERVER_IP, SERVERS[CURRENT_SERVER_ID][ACCEPTING_PEER_PORT])}')
# pprint(f'[accepting_peer_servers] - {CURRENT_SERVER_ID} current '
# f'bind at {(CURRENT_SERVER_IP, SERVERS[CURRENT_SERVER_ID][ACCEPTING_PEER_PORT])}')
server.listen()
# print(f"[LISTENING] Server is listening on {ADDR}") # DEBUG
while True:
conn, addr = server.accept()
gprint(f'[accepting_peer_servers] - peer server [{addr}] connect to server [{CURRENT_SERVER_ID}] success!')
# gprint(f'[accepting_peer_servers] - peer server [{addr}] connect to server [{CURRENT_SERVER_ID}] success!')
# client_send_socket = server.connect(addr)
# gprint(f'[accepting_peer_servers] - server [{CURRENT_SERVER_ID}] connect to peer server [{addr}] success!')
thread = threading.Thread(target=server_handler, args=[conn, SELF_NODE])
thread.start()
except Exception as e:
rprint(f"[ERROR] accepting_peer_servers error:{e.__class__.__name__}.{e}.{traceback.format_exc()}")
pass
# rprint(f"[ERROR] accepting_peer_servers error:{e.__class__.__name__}.{e}.{traceback.format_exc()}")
finally:
server.close()
......
import random
from constants import SERVER_IDENTIFIER, SERVER_ADDRESS, SERVER_PORT, ENCODING_FORMAT, MSG_LENGTH
from utils import gprint, pprint
# from utils import gprint, pprint
import os
import sys
import socket
......@@ -37,9 +37,9 @@ def connect(coordinator_info: tuple):
coordinator_port_number = coordinator_info[1][SERVER_PORT]
client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
address = (coordinator_ip_address, coordinator_port_number)
pprint(f'[connect] - client connecting to address {address}')
# pprint(f'[connect] - client connecting to address {address}')
client.connect(address)
pprint(f'[connect] - client connecting to address {address} success!')
# pprint(f'[connect] - client connecting to address {address} success!')
return client
......@@ -48,17 +48,17 @@ def bind():
server.bind(ADDR)
server.listen()
conn, addr = server.accept()
gprint(f"[Bind] - Bind to {addr} success!")
# gprint(f"[Bind] - Bind to {addr} success!")
return conn
def send_operation(operation_str, socket):
msg = operation_str.encode(ENCODING_FORMAT)
msg = operation_str.encode(ENCODING_FORMAT) # todo: upper case
msg_length = len(msg)
# send_length = str(msg_length).encode(ENCODING_FORMAT)
msg += b' ' * (MSG_LENGTH - msg_length)
# pprint(f'[send_operation] - {locals()}')
pprint(f'[send_operation] - msg.length : {len(msg)}')
# pprint(f'[send_operation] - msg.length : {len(msg)}')
socket.sendall(msg)
......@@ -77,10 +77,10 @@ if __name__ == '__main__':
coordinator_info = pick_coordinator(servers)
client_to_coordinator_socket = connect(coordinator_info)
pprint('[client_to_coordinator_socket] CONNECT SUCCESS!')
# pprint('[client_to_coordinator_socket] CONNECT SUCCESS!')
# coordinator_to_client_socket = bind()
# pprint('[bind] BIND SUCCESS!')
print(coordinator_info)
# print(coordinator_info)
try:
while True:
operation = input()
......@@ -95,9 +95,9 @@ if __name__ == '__main__':
break
# todo: add end condition
except EOFError:
gprint("[Client] - All commands in .txt file executed. Client Exits..")
# gprint("[Client] - All commands in .txt file executed. Client Exits..")
os._exit(0)
finally:
client_to_coordinator_socket.close()
gprint("[Client] - client_to_coordinator_socket CLOSED..")
# gprint("[Client] - client_to_coordinator_socket CLOSED..")
# coordinator_to_client_socket.close()
A 127.0.0.1 10001
B 127.0.0.1 10002
C 127.0.0.1 10003
D 127.0.0.1 10004
E 127.0.0.1 10005
\ No newline at end of file
A sp22-cs425-g68-01.cs.illinois.edu 1234
B sp22-cs425-g68-02.cs.illinois.edu 1234
C sp22-cs425-g68-03.cs.illinois.edu 1234
D sp22-cs425-g68-04.cs.illinois.edu 1234
E sp22-cs425-g68-05.cs.illinois.edu 1234
......@@ -3,7 +3,8 @@ import json
import sys
import time
import traceback
from utils import gprint, rprint, pprint
from utils import rprint
# from utils import gprint, rprint, pprint
from constants import SERVER_IDENTIFIER, SERVER_ADDRESS, SERVER_PORT, RECONNECT_TIME, MSG_LENGTH, ENCODING_FORMAT, \
ACCEPTING_PEER_PORT
import socket
......@@ -43,8 +44,9 @@ def commit_ok_transaction(timestamp: str, self_node: SelfNode):
if not self_node.lock.locked():
self_node.lock.acquire()
try:
pprint(f'[commit_ok_transaction] - {timestamp} - ENTER CHECKPOINT')
# pprint(f'[commit_ok_transaction] - {timestamp} - ENTER CHECKPOINT')
# timestamp 是肯定可以commit
has_account_modified = False
for cur_account in accounts_map.values():
# 删除这个accountRTS 中所有的timestamp
cur_account.RTS = list(filter(lambda x: x != timestamp, cur_account.RTS))
......@@ -58,10 +60,14 @@ def commit_ok_transaction(timestamp: str, self_node: SelfNode):
# __import__('pudb').set_trace()
cur_account.balance += cur_account.TW[timestamp].operation_value
cur_account.TW.pop(timestamp)
break
has_account_modified = True
# break
# self_node.lock.release()
pprint(f'[commit_ok_transaction] - {timestamp} - EXIT CHECKPOINT')
# pprint(f'[commit_ok_transaction] - {timestamp} - EXIT CHECKPOINT')
if has_account_modified:
print_all_available_accounts_balance()
except Exception as e:
# pass
rprint(f"[ERROR] commit_ok_transaction error:{e.__class__.__name__}.{e}.{traceback.format_exc()}")
finally:
if self_node.lock.locked():
......@@ -71,7 +77,7 @@ def commit_ok_transaction(timestamp: str, self_node: SelfNode):
# 收到commit 的消息,要去check
def commit_check_transaction(timestamp: str, self_node: SelfNode) -> bool:
pprint(f'[commit_check_transaction] - {timestamp} - ENTER CHECKPOINT')
# pprint(f'[commit_check_transaction] - {timestamp} - ENTER CHECKPOINT')
if not self_node.lock.locked():
self_node.lock.acquire()
try:
......@@ -91,7 +97,7 @@ def commit_check_transaction(timestamp: str, self_node: SelfNode) -> bool:
if timestamp in TW.keys():
flag = True
while flag:
pprint('[commit_check_transaction] - sleeping check 1...')
# pprint('[commit_check_transaction] - sleeping check 1...')
min_write_timestamp = timestamp
for temp in TW.keys():
if temp < min_write_timestamp:
......@@ -107,12 +113,12 @@ def commit_check_transaction(timestamp: str, self_node: SelfNode) -> bool:
flag = True
while flag:
# __import__('pudb').set_trace()
pprint('[commit_check_transaction] - sleeping check 2...')
# pprint('[commit_check_transaction] - sleeping check 2...')
# 这个时候还是有锁的,因为是从flag == false过来的
# min_read_timestamp = min(RTS)
# min_read_timestamp = min(list(filter(lambda x: x != 0, RTS))) # DELETE 0 IN THIS CASE
# __import__('pudb').set_trace()
pprint(f'[commit_check_transaction] - accounts_map[_key].RTS: {accounts_map[_key].RTS}')
# pprint(f'[commit_check_transaction] - accounts_map[_key].RTS: {accounts_map[_key].RTS}')
min_read_timestamp = min(list(filter(lambda x: x != 0, accounts_map[_key].RTS)))
if min_read_timestamp == timestamp:
flag = False
......@@ -136,13 +142,14 @@ def commit_check_transaction(timestamp: str, self_node: SelfNode) -> bool:
continue
# self_node.lock.release()
pprint(f'[commit_check_transaction] - {timestamp} - EXIT CHECKPOINT')
# pprint(f'[commit_check_transaction] - {timestamp} - EXIT CHECKPOINT')
# 如果transaction和这个server中所有的account都没有关系的话, 那么还是返回true
# 如果说有关系, 那违反consistency, 那就不行
# 如果不违反, 只是前面的transaction没有commit, 那就是等着
return commit_ok
except Exception as e:
rprint(f"[ERROR] commit_check_transaction error:{e.__class__.__name__}.{e}.{traceback.format_exc()}")
pass
# rprint(f"[ERROR] commit_check_transaction error:{e.__class__.__name__}.{e}.{traceback.format_exc()}")
finally:
if self_node.lock.locked():
self_node.lock.release()
......@@ -155,7 +162,7 @@ def tentative_write(self_node: SelfNode, timestamp: str, rawOperation: str):
self_node.lock.acquire()
try:
pprint(f'[tentative_write] - {timestamp} - {rawOperation} - ENTER CHECKPOINT')
# pprint(f'[tentative_write] - {timestamp} - {rawOperation} - ENTER CHECKPOINT')
# __import__('pudb').set_trace()
# raw operiation # deposit 和withdraw还不一样
account_name = rawOperation.split()[1] # DEPOSIT A.foo 50
......@@ -173,25 +180,25 @@ def tentative_write(self_node: SelfNode, timestamp: str, rawOperation: str):
# 更新TW
cur_account.TW[timestamp].operation_value += operation_value
# self_node.lock.release()
pprint(f'[tentative_write] - {timestamp} - {rawOperation} - EXIT CHECKPOINT 1')
# pprint(f'[tentative_write] - {timestamp} - {rawOperation} - EXIT CHECKPOINT 1')
return True
else:
new_tentative_write = WriteEntry(timestamp, operation_value)
cur_account.TW[timestamp] = new_tentative_write
# !!!用TW的时候需要对TW 进行排序
# self_node.lock.release()
pprint(f'[tentative_write] - {timestamp} - {rawOperation} - EXIT CHECKPOINT 2')
# pprint(f'[tentative_write] - {timestamp} - {rawOperation} - EXIT CHECKPOINT 2')
return True
else:
self_node.lock.release()
pprint(f'[tentative_write] - {timestamp} - {rawOperation} - EXIT CHECKPOINT 3')
# pprint(f'[tentative_write] - {timestamp} - {rawOperation} - EXIT CHECKPOINT 3')
return None # 需要abort
else: # 如果deposit 一个没有存在的账户,是可以的
new_account = Account(account_name, timestamp)
new_account.TW[timestamp] = WriteEntry(timestamp, operation_value)
accounts_map[account_name] = new_account # todo: to be checked with Yan
# self_node.lock.release()
pprint(f'[tentative_write] - {timestamp} - {rawOperation} - EXIT CHECKPOINT 4')
# pprint(f'[tentative_write] - {timestamp} - {rawOperation} - EXIT CHECKPOINT 4')
return True #
else: # == "WITHDRAW" 是read 和write的结合
......@@ -200,20 +207,21 @@ def tentative_write(self_node: SelfNode, timestamp: str, rawOperation: str):
read_result = read(self_node, timestamp, readOperation)
if read_result is None:
pprint(f'[tentative_write] - {timestamp} - {rawOperation} - EXIT CHECKPOINT 4.5')
# pprint(f'[tentative_write] - {timestamp} - {rawOperation} - EXIT CHECKPOINT 4.5')
return None
elif read_result == 'NOT FOUND, ABORTED':
# 需要abort了 并且是 NOT FOUND, ABORTED
# self_node.lock.release()
pprint(f'[tentative_write] - {timestamp} - {rawOperation} - EXIT CHECKPOINT 5')
# pprint(f'[tentative_write] - {timestamp} - {rawOperation} - EXIT CHECKPOINT 5')
return 'NOT FOUND, ABORTED'
else:
# 这里默认传入的operation_value 是有正负之分的
readOperation = "DEPOSIT" + " " + account_name + " " + str(operation_value * -1)
pprint(f'[tentative_write] - {timestamp} - {rawOperation} - EXIT CHECKPOINT 6')
# pprint(f'[tentative_write] - {timestamp} - {rawOperation} - EXIT CHECKPOINT 6')
return tentative_write(self_node, timestamp, readOperation) # todo: one more params?
except Exception as e:
rprint(f"[ERROR] tentative_write error:{e.__class__.__name__}.{e}.{traceback.format_exc()}")
pass
# rprint(f"[ERROR] tentative_write error:{e.__class__.__name__}.{e}.{traceback.format_exc()}")
finally:
if self_node.lock.locked():
self_node.lock.release()
......@@ -252,7 +260,7 @@ def tentative_write(self_node: SelfNode, timestamp: str, rawOperation: str):
# BALANCE COME HERE
def read(self_node: SelfNode, timestamp: str, rawOperation: str):
pprint(f'[read] - {timestamp} - {rawOperation} - ENTER CHECKPOINT')
# pprint(f'[read] - {timestamp} - {rawOperation} - ENTER CHECKPOINT')
if not self_node.lock.locked():
self_node.lock.acquire()
......@@ -263,13 +271,13 @@ def read(self_node: SelfNode, timestamp: str, rawOperation: str):
# self_node.lock.acquire()
if account_name not in accounts_map.keys():
# self_node.lock.release()
pprint(f'[read] - {timestamp} - {rawOperation} - EXIT CHECKPOINT 1')
# pprint(f'[read] - {timestamp} - {rawOperation} - EXIT CHECKPOINT 1')
return 'NOT FOUND, ABORTED' # 表示没有读到 需要abort
while flag:
# self_node.lock.acquire()
if account_name not in accounts_map.keys():
# self_node.lock.release()
pprint(f'[read] - {timestamp} - {rawOperation} - EXIT CHECKPOINT 2')
# pprint(f'[read] - {timestamp} - {rawOperation} - EXIT CHECKPOINT 2')
return 'NOT FOUND, ABORTED' # 表示没有读到 需要abort
cur_account: Account = accounts_map[account_name]
......@@ -315,10 +323,10 @@ def read(self_node: SelfNode, timestamp: str, rawOperation: str):
if timestamp in cur_account.TW.keys():
new_temp_balance_this_timestamp = cur_balance + cur_account.TW[timestamp].operation_value
# self_node.lock.release()
pprint(f'[read] - {timestamp} - {rawOperation} - EXIT CHECKPOINT 4.4.1')
# pprint(f'[read] - {timestamp} - {rawOperation} - EXIT CHECKPOINT 4.4.1')
return new_temp_balance_this_timestamp
else:
pprint(f'[read] - {timestamp} - {rawOperation} - EXIT CHECKPOINT 4.4.2')
# pprint(f'[read] - {timestamp} - {rawOperation} - EXIT CHECKPOINT 4.4.2')
return cur_balance
cur_account.RTS.append(timestamp)
......@@ -326,10 +334,10 @@ def read(self_node: SelfNode, timestamp: str, rawOperation: str):
if timestamp in cur_account.TW.keys():
new_temp_balance_this_timestamp = cur_balance + cur_account.TW[timestamp].operation_value
# self_node.lock.release()
pprint(f'[read] - {timestamp} - {rawOperation} - EXIT CHECKPOINT 4.5.1')
# pprint(f'[read] - {timestamp} - {rawOperation} - EXIT CHECKPOINT 4.5.1')
return new_temp_balance_this_timestamp
else:
pprint(f'[read] - {timestamp} - {rawOperation} - EXIT CHECKPOINT 4.5.2')
# pprint(f'[read] - {timestamp} - {rawOperation} - EXIT CHECKPOINT 4.5.2')
return cur_balance
else:
if Ds_written_by == timestamp:
......@@ -338,24 +346,25 @@ def read(self_node: SelfNode, timestamp: str, rawOperation: str):
new_temp_balance_this_timestamp = cur_balance + cur_account.TW[timestamp].operation_value
flag = False
# self_node.lock.release()
pprint(f'[read] - {timestamp} - {rawOperation} - EXIT CHECKPOINT 5.1')
# pprint(f'[read] - {timestamp} - {rawOperation} - EXIT CHECKPOINT 5.1')
return new_temp_balance_this_timestamp
# 传回 应该有的值, 传回coordinator的在其他函数写
else:
pprint(f'[read] - {timestamp} - {rawOperation} - EXIT CHECKPOINT 5.2')
# pprint(f'[read] - {timestamp} - {rawOperation} - EXIT CHECKPOINT 5.2')
return cur_balance
else:
pprint(f'[read] - {timestamp} - {rawOperation} - SLEEPING...')
# pprint(f'[read] - {timestamp} - {rawOperation} - SLEEPING...')
self_node.lock.release()
time.sleep(0.01)
self_node.lock.acquire()
else:
# abort(没写), 传输给coordinator 没写
# self_node.lock.release()
pprint(f'[read] - {timestamp} - {rawOperation} - EXIT CHECKPOINT 6')
# pprint(f'[read] - {timestamp} - {rawOperation} - EXIT CHECKPOINT 6')
return None
except Exception as e:
rprint(f"[ERROR] read error:{e.__class__.__name__}.{e}.{traceback.format_exc()}")
pass
# rprint(f"[ERROR] read error:{e.__class__.__name__}.{e}.{traceback.format_exc()}")
finally:
if self_node.lock.locked():
self_node.lock.release()
......@@ -363,7 +372,7 @@ def read(self_node: SelfNode, timestamp: str, rawOperation: str):
# abort 的本身
def abort(self_node: SelfNode, timestamp: str):
pprint(f'[abort] - {timestamp} - ENTER CHECKPOINT')
# pprint(f'[abort] - {timestamp} - ENTER CHECKPOINT')
# #对当前server 所有的account 的TW 和RTS 中进行查找看看这个timestamp 的东西
# self_node.lock.acquire()
......@@ -402,9 +411,10 @@ def abort(self_node: SelfNode, timestamp: str):
if timestamp in list(cur_account.TW.keys()):
del cur_account.TW[timestamp]
# self_node.lock.release()
pprint(f'[abort] - {timestamp} - EXIT CHECKPOINT')
# pprint(f'[abort] - {timestamp} - EXIT CHECKPOINT')
except Exception as e:
rprint(f"[ERROR] abort error:{e.__class__.__name__}.{e}.{traceback.format_exc()}")
pass
# rprint(f"[ERROR] abort error:{e.__class__.__name__}.{e}.{traceback.format_exc()}")
finally:
self_node.lock.release()
......@@ -462,22 +472,22 @@ def connect_with_all_servers(servers: dict, current_server_id: str, return_value
host_ip_address = SERVER_ID_TO_IP_MAPPING[server[SERVER_IDENTIFIER]]
# server[SERVER_PORT] = SERVERS[CURRENT_SERVER_ID][SERVER_PORT]+1 # todo: to be deleted
address = (host_ip_address, server[ACCEPTING_PEER_PORT])
pprint(f"[connect_with_all_servers] - {CURRENT_SERVER_ID} is trying to "
f"connect {(host_ip_address, server[ACCEPTING_PEER_PORT])}")
# pprint(f"[connect_with_all_servers] - {CURRENT_SERVER_ID} is trying to "
# f"connect {(host_ip_address, server[ACCEPTING_PEER_PORT])}")
s.connect(address)
connect_info[server_id] = s
gprint(f"[Connect] - "
f"Connecting to {server[SERVER_IDENTIFIER]}: {host_ip_address}:"
f"{server[ACCEPTING_PEER_PORT]} success!")
# gprint(f"[Connect] - "
# f"Connecting to {server[SERVER_IDENTIFIER]}: {host_ip_address}:"
# f"{server[ACCEPTING_PEER_PORT]} success!")
except Exception:
rprint(f"[Connect] - Connection to "
f"{server[SERVER_IDENTIFIER]} failed. "
f"Reconnecting in {RECONNECT_TIME}s..")
# rprint(f"[Connect] - Connection to "
# f"{server[SERVER_IDENTIFIER]} failed. "
# f"Reconnecting in {RECONNECT_TIME}s..")
remaining_nodes[server_id] = server
servers = remaining_nodes
time.sleep(RECONNECT_TIME)
return_values['connect_info'] = connect_info
gprint("[Connect] - Connect to all servers success!")
# gprint("[Connect] - Connect to all servers success!")
# def bind_with_all_servers(servers: dict, current_server_id: str, return_values: dict):
......@@ -515,7 +525,8 @@ def extract_target_server(operation):
try:
return operation.split(' ')[1].split('.')[0]
except Exception as e:
rprint(f"[ERROR] extract_target_server error:{e.__class__.__name__}.{e}.{traceback.format_exc()}")
pass
# rprint(f"[ERROR] extract_target_server error:{e.__class__.__name__}.{e}.{traceback.format_exc()}")
def send_msg_to_socket(msg, socket):
......@@ -523,7 +534,7 @@ def send_msg_to_socket(msg, socket):
msg_length = len(msg)
# send_length = str(msg_length).encode(ENCODING_FORMAT)
msg += b' ' * (MSG_LENGTH - msg_length)
pprint(f'[send_msg_to_socket] - msg.length : [{len(msg)}], msg: [{msg.strip()}]')
# pprint(f'[send_msg_to_socket] - msg.length : [{len(msg)}], msg: [{msg.strip()}]')
socket.sendall(msg)
......@@ -540,20 +551,20 @@ def operation_handler(operation, timestamp):
def broadcast_msg(operation, connect_info):
res = []
for node_id, each_socket in connect_info.items():
gprint(f'[{CURRENT_SERVER_ID}] - Multicasting [{operation}] to [{node_id}] NOW!')
# gprint(f'[{CURRENT_SERVER_ID}] - Multicasting [{operation}] to [{node_id}] NOW!')
send_msg_to_socket(operation, each_socket)
gprint(f'[{CURRENT_SERVER_ID}] - Multicasting [{operation}] to [{node_id}] DONE!')
# gprint(f'[{CURRENT_SERVER_ID}] - Multicasting [{operation}] to [{node_id}] DONE!')
if 'PREPARE COMMIT' in operation:
for node_id, each_socket in connect_info.items():
# gprint(f'[{CURRENT_SERVER_ID}] - Multicasting [{operation}] to [{node_id}] NOW!')
# send_msg_to_socket(operation, each_socket)
prepare_res = each_socket.recv(MSG_LENGTH).decode(ENCODING_FORMAT).strip()
gprint(f'[{CURRENT_SERVER_ID} PREPARE COMMIT] - RECEIVING from [{node_id}]: {prepare_res}')
# gprint(f'[{CURRENT_SERVER_ID} PREPARE COMMIT] - RECEIVING from [{node_id}]: {prepare_res}')
res.append(eval(prepare_res))
# gprint(f'[{CURRENT_SERVER_ID}] - Multicasting [{operation}] to [{node_id}] DONE!')
pprint(f'[broadcast_msg] - res: {res}')
# pprint(f'[broadcast_msg] - res: {res}')
return res
......@@ -566,23 +577,23 @@ def format_operation_and_timestamp_json(operation, timestamp):
def client_handler(conn, SELF_NODE):
pprint('[client_handler] CHECKPOINT -2')
# pprint('[client_handler] CHECKPOINT -2')
return_values = establish_bidirectional_connection(SERVERS, CURRENT_SERVER_ID)
pprint('[client_handler] CHECKPOINT -1')
# pprint('[client_handler] CHECKPOINT -1')
connect_info = return_values.get('connect_info', {})
# bind_info = return_values.get('bind_info', {})
timestamp = time.time()
pprint('[client_handler] CHECKPOINT 0')
pprint(f'[client_handler] - current customer timestamp {timestamp}')
# pprint('[client_handler] CHECKPOINT 0')
# pprint(f'[client_handler] - current customer timestamp {timestamp}')
try:
while True:
pprint('---------------------------------------------------------------')
pprint('[client_handler] CHECKPOINT 1')
# pprint('---------------------------------------------------------------')
# pprint('[client_handler] CHECKPOINT 1')
operation = conn.recv(MSG_LENGTH).decode(ENCODING_FORMAT).strip()
pprint('[client_handler] CHECKPOINT 2')
# pprint('[client_handler] CHECKPOINT 2')
operation_json = format_operation_and_timestamp_json(operation, timestamp)
gprint(f'[client_handler] - operation [{operation}] received success!')
# gprint(f'[client_handler] - operation [{operation}] received success!')
if not operation:
break
......@@ -593,14 +604,14 @@ def client_handler(conn, SELF_NODE):
target_server_connect_socket = connect_info.get(target_server_id)
# target_server_bind_socket = bind_info.get(target_server_id)
is_self = target_server_id == CURRENT_SERVER_ID
gprint(f'[client_handler] - operation [{operation}] SENDING TO {target_server_id}')
# gprint(f'[client_handler] - operation [{operation}] SENDING TO {target_server_id}')
ret_value = None
if operation.startswith('DEPOSIT') or operation.startswith('WITHDRAW'):
if is_self:
ret_value = tentative_write(self_node=SELF_NODE, timestamp=timestamp, rawOperation=operation)
gprint(
f'[server_handler] accounts_map: '
f'\n{json.dumps(accounts_map, default=lambda x: x.__dict__)}')
# gprint(
# f'[server_handler] accounts_map: '
# f'\n{json.dumps(accounts_map, default=lambda x: x.__dict__)}')
else:
send_msg_to_socket(operation_json, target_server_connect_socket)
# ret_value = target_server_bind_socket.recv(MSG_LENGTH).decode(ENCODING_FORMAT)
......@@ -608,9 +619,9 @@ def client_handler(conn, SELF_NODE):
elif operation.startswith('BALANCE'):
if is_self:
ret_value = read(self_node=SELF_NODE, timestamp=timestamp, rawOperation=operation)
gprint(
f'[server_handler] accounts_map: '
f'\n{json.dumps(accounts_map, default=lambda x: x.__dict__)}')
# gprint(
# f'[server_handler] accounts_map: '
# f'\n{json.dumps(accounts_map, default=lambda x: x.__dict__)}')
else:
send_msg_to_socket(operation_json, target_server_connect_socket)
# ret_value = target_server_bind_socket.recv(MSG_LENGTH).decode(ENCODING_FORMAT)
......@@ -630,16 +641,16 @@ def client_handler(conn, SELF_NODE):
broadcast_msg(operation_json, connect_info)
commit_ok_transaction(timestamp, SELF_NODE)
ret_value = 'COMMIT OK' # todo
gprint(f'[server_handler] accounts_map: \n'
f'{json.dumps(accounts_map, default=lambda x: x.__dict__)}')
# gprint(f'[server_handler] accounts_map: \n'
# f'{json.dumps(accounts_map, default=lambda x: x.__dict__)}')
elif operation.startswith('ABORT'):
# __import__('pudb').set_trace()
# broadcast_msg(operation_json, connect_info)
# abort(SELF_NODE, timestamp)
ret_value = 'ABORTED' # todo
gprint(f'[server_handler] accounts_map: '
f'\n{json.dumps(accounts_map, default=lambda x: x.__dict__)}')
# gprint(f'[server_handler] accounts_map: '
# f'\n{json.dumps(accounts_map, default=lambda x: x.__dict__)}')
ret_value = reformat_return_value(ret_value, operation)
if 'ABORT' in ret_value:
......@@ -648,9 +659,10 @@ def client_handler(conn, SELF_NODE):
send_msg_to_socket(ret_value, conn)
except Exception as e:
rprint(f"[ERROR] client_handler error:{e.__class__.__name__}.{e}.{traceback.format_exc()}")
pass
# rprint(f"[ERROR] client_handler error:{e.__class__.__name__}.{e}.{traceback.format_exc()}")
finally:
pprint("[client_handler] CONNECTION CLOSED...")
# pprint("[client_handler] CONNECTION CLOSED...")
conn.close()
# client_recv_socket.close()
# client_send_socket.close()
......@@ -676,10 +688,18 @@ def reformat_return_value(ret_value, operation):
raise TypeError(f'ret_value should be None or a bool, not {ret_value.__class__.__name__}')
def print_all_available_accounts_balance():
balances_str = ', '.join('{}:{}'.format(k, v.balance) if v.balance != 0
else '' for k, v in sorted(accounts_map.items()))
if balances_str:
print('BALANCES', end=' ')
print(balances_str)
def server_handler(conn, SELF_NODE):
try:
while True:
pprint('---------------------------------------------------------------')
# pprint('---------------------------------------------------------------')
operation_dict = json.loads(conn.recv(MSG_LENGTH).decode(ENCODING_FORMAT).strip())
operation = operation_dict.get('operation', '')
timestamp = operation_dict.get('timestamp', '')
......@@ -692,17 +712,19 @@ def server_handler(conn, SELF_NODE):
ret_value = commit_check_transaction(timestamp, SELF_NODE)
elif operation.startswith('COMMIT'):
ret_value = commit_ok_transaction(timestamp, SELF_NODE)
# print_all_available_accounts_balance()
elif operation.startswith('ABORT'):
abort(SELF_NODE, timestamp)
gprint(f'[server_handler] accounts_map: \n{json.dumps(accounts_map, default=lambda x: x.__dict__)}')
# gprint(f'[server_handler] accounts_map: \n{json.dumps(accounts_map, default=lambda x: x.__dict__)}')
break
ret_value = reformat_return_value(ret_value, operation)
send_msg_to_socket(ret_value, conn)
gprint(f'[server_handler] accounts_map: \n{json.dumps(accounts_map, default=lambda x: x.__dict__)}')
# gprint(f'[server_handler] accounts_map: \n{json.dumps(accounts_map, default=lambda x: x.__dict__)}')
except Exception as e:
rprint(f"[ERROR] server_handler error:{e.__class__.__name__}.{e}.{traceback.format_exc()}")
pass
# rprint(f"[ERROR] server_handler error:{e.__class__.__name__}.{e}.{traceback.format_exc()}")
finally:
pprint('[server_handler] - ABORTED - connection with the current client CLOSED!')
# pprint('[server_handler] - ABORTED - connection with the current client CLOSED!')
conn.close()
# peer_server_send_socket.close()
......@@ -716,7 +738,7 @@ def accepting_clients(SELF_NODE):
# print(f"[LISTENING] Server is listening on {ADDR}") # DEBUG
while True:
conn, addr = server.accept()
gprint(f'[accepting_clients] - client [{addr}] connect to coordinator [{CURRENT_SERVER_ID}] success!')
# gprint(f'[accepting_clients] - client [{addr}] connect to coordinator [{CURRENT_SERVER_ID}] success!')
# client_send_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# client_send_socket.connect((addr[0], 5678))
# gprint(f'[accepting_clients] - coordinator [{CURRENT_SERVER_ID}] connect to client [({addr[0]}, {5678})] success!')
......@@ -724,7 +746,8 @@ def accepting_clients(SELF_NODE):
thread = threading.Thread(target=client_handler, args=[conn, SELF_NODE])
thread.start()
except Exception as e:
rprint(f"[ERROR] accepting_clients error:{e.__class__.__name__}.{e}.{traceback.format_exc()}")
pass
# rprint(f"[ERROR] accepting_clients error:{e.__class__.__name__}.{e}.{traceback.format_exc()}")
finally:
server.close()
......@@ -733,19 +756,20 @@ def accepting_peer_servers(SELF_NODE):
try:
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.bind((CURRENT_SERVER_IP, SERVERS[CURRENT_SERVER_ID][ACCEPTING_PEER_PORT]))
pprint(f'[accepting_peer_servers] - {CURRENT_SERVER_ID} current '
f'bind at {(CURRENT_SERVER_IP, SERVERS[CURRENT_SERVER_ID][ACCEPTING_PEER_PORT])}')
# pprint(f'[accepting_peer_servers] - {CURRENT_SERVER_ID} current '
# f'bind at {(CURRENT_SERVER_IP, SERVERS[CURRENT_SERVER_ID][ACCEPTING_PEER_PORT])}')
server.listen()
# print(f"[LISTENING] Server is listening on {ADDR}") # DEBUG
while True:
conn, addr = server.accept()
gprint(f'[accepting_peer_servers] - peer server [{addr}] connect to server [{CURRENT_SERVER_ID}] success!')
# gprint(f'[accepting_peer_servers] - peer server [{addr}] connect to server [{CURRENT_SERVER_ID}] success!')
# client_send_socket = server.connect(addr)
# gprint(f'[accepting_peer_servers] - server [{CURRENT_SERVER_ID}] connect to peer server [{addr}] success!')
thread = threading.Thread(target=server_handler, args=[conn, SELF_NODE])
thread.start()
except Exception as e:
rprint(f"[ERROR] accepting_peer_servers error:{e.__class__.__name__}.{e}.{traceback.format_exc()}")
pass
# rprint(f"[ERROR] accepting_peer_servers error:{e.__class__.__name__}.{e}.{traceback.format_exc()}")
finally:
server.close()
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment