Skip to content
Snippets Groups Projects
Commit 0bde7a93 authored by Yan He's avatar Yan He
Browse files

initial logic without receive and multicast

parent 5393d79d
No related branches found
No related tags found
1 merge request!7Mp1 Done
...@@ -11,3 +11,8 @@ CGREEN = '\033[92m' ...@@ -11,3 +11,8 @@ CGREEN = '\033[92m'
CEND = '\033[0m' CEND = '\033[0m'
RECONNECT_TIME = 1 # 1s RECONNECT_TIME = 1 # 1s
# INITIAL_MSG_LENGTH = 64
# REQUEST_DISCONNECT_MSG = 'REQUEST_DISCONNECT!'
ENCODING_FORMAT = 'utf-8'
from transaction import Transaction from transaction import Transaction
import time
class Message: class Message:
def __init__(self, transaction: Transaction, priority: int, node_num: int, is_final_msg: bool): def __init__(self, transaction: Transaction, priority: int, node_num: int, is_final_msg: bool):
self.transaction = transaction self.transaction = transaction
self.priority = str.format(f"{priority}-{node_num}") # the format changed to priority.node_num
self.priority = str.format(f"{priority}.{node_num}")
self.is_final_msg = is_final_msg self.is_final_msg = is_final_msg
# self.send_time = -1 # will be set when send the message
self.node_num = node_num
# for the order of priority queue
def __lt__(self, other):
temp1 = float(self.priority)
temp2 = float(other.priority)
return temp1 < temp2
# import socket
# import message
#
# class SockError(RuntimeError):
# def __init__(self, arg):
# self.sock = arg
#
# class R_multicast:
# def __init__(self, group, my_id, message, application_deliver):
# self.group = group
# self.id = my_id
# self.deliver = application_deliver
# self.message = message
# def multicast_send(self, message):
# err_conn = []
# for member in self.group:
# try:
# # class unicast:
# # @classmethod
# # def send(cls, msg, socket_touse): # send the message to the specified member (use TCP
# # socket_touse.sendall(msg)
# # @classmethod
# # def register_deliver(cls, b_multicast_deliver):
# # cls.b_multicast_deliver = b_multicast_deliver
# # @classmethod
# # def deliver(cls, msg):
# # cls.b_multicast_deliver(msg)
# #
# #
# # class B_multicast:
# # def __init__(self, group, my_id, r_multicast_deliver):
# # self.group = group
# # self.id = my_id
# # self.deliver = r_multicast_deliver
# # unicast.register_deliver(self.b_multicast_deliver)
# # self.send_amount = 0
# #
# # def send(self, msg):
# # err_conn = []
# # for member in self.group:
# # try:
# # unicast.send(msg, member)
# # self.send_amount += 1
# # except socket.error:
# # err_conn.append(member)
# # return err_conn
# #
# # def b_multicast_deliver(self, msg):
# # self.deliver(msg)
# #
# # class R_multicast:
# # def __init__(self, group, my_id, app_deliver):
# # self.group = group
# # self.id = my_id
# # self.deliver = app_deliver # this is application's deliver
# # self.b_multicast = B_multicast(group, my_id, self.r_multicast_deliver)
# # self.messages = set()
# #
# # def send(self, msg):
# # return self.b_multicast.send(msg)
# #
# # def r_multicast_deliver(self, msg):
# # final_message = Message()
# # final_message = msg
import collections
import heapq
import sys import sys
import time import time
import threading import threading
from constants import NODE_ADDRESS, NODE_IDENTIFIER, NODE_PORT, CRED, CEND, RECONNECT_TIME, CGREEN from constants import NODE_ADDRESS, NODE_IDENTIFIER, NODE_PORT, CRED, CEND, RECONNECT_TIME, CGREEN, ENCODING_FORMAT
from utils import parse_config from utils import parse_config
import socket import socket
import copy import copy
from message import Message
from queue import PriorityQueue
import collections
from transaction import Transaction
# import traceback # import traceback
SERVER = socket.gethostbyname(socket.gethostname()) SERVER = socket.gethostbyname(socket.gethostname())
...@@ -12,6 +19,64 @@ NODE_ID_TO_IP_MAPPING = {} ...@@ -12,6 +19,64 @@ NODE_ID_TO_IP_MAPPING = {}
IP_TO_NODE_ID_MAPPING = {} IP_TO_NODE_ID_MAPPING = {}
class SelfNode:
def __init__(self, my_id):
self.balance = collections.defaultdict(int) # {name, amount} the balances collection in this node
self.proposed_id = 1 # for proposed id from num 1 start
self.priority_queue = [Message] # priority_queue for the message
self.my_id = my_id # my node id
self.group_socket_node = collections.defaultdict(int)
self.err_node = {}
self.start_time = -1
self.lock = threading.Lock()
self.transactions = collections.defaultdict(list)
self.received_messages = set() # 格式 transaction-priority-node_num-is_final_msg-transaction_time 组成的string
def update_priority(self, observed_priority=None):
result = 0
self.lock.acquire()
if not observed_priority:
result = self.proposed_id
proposed_id = self.proposed_id + 1
else:
self.proposed_id = max(self.proposed_id, observed_priority + 1)
self.lock.release()
return result
def deliver(self_node: SelfNode): # 需要一个线程一直用他
while True:
while len(self_node.priority_queue) != 0:
deliverable_msg = None
self_node.lock.acquire()
if self_node.priority_queue[0].is_final_msg: # 有可能会错 isfinalmsg 不知道起不起作用
deliverable_msg = self_node.priority_queue[0]
self_node.priority_queue.pop(0)
# 不用进行再次排序,因为能进入deliver 说明都是已经sort过了,因为没次对于queue的添加和修改priority 都会重新对queue 进行排序
self_node.lock.release()
if not (deliverable_msg is None):
calculate_transaction(deliverable_msg, self_node)
def calculate_transaction(msg: Message, self_node: SelfNode):
present_transaction = msg.transaction
if present_transaction.type == "DEPOSIT":
if present_transaction.account in self_node.balance:
self_node.balance[present_transaction.account] += present_transaction.fund
else:
self_node.balance[present_transaction.account] = present_transaction.fund
else: # transfer
if self_node.balance[present_transaction.from_account] >= present_transaction.fund:
self_node.balance[present_transaction.from_account] -= present_transaction.fund
self_node.balance[present_transaction.to_account] += present_transaction.fund
print_balance(self_node)
def print_balance(self_node: SelfNode):
print('BALANCES', end=' ')
print(' '.join('{}:{}'.format(a, b) for a, b in sorted(self_node.balance.items())))
def connect_with_all_nodes(nodes: dict, current_node_id: str, return_values: dict): def connect_with_all_nodes(nodes: dict, current_node_id: str, return_values: dict):
nodes.pop(current_node_id, None) nodes.pop(current_node_id, None)
connect_info = {} connect_info = {}
...@@ -26,7 +91,7 @@ def connect_with_all_nodes(nodes: dict, current_node_id: str, return_values: dic ...@@ -26,7 +91,7 @@ def connect_with_all_nodes(nodes: dict, current_node_id: str, return_values: dic
s.connect(address) s.connect(address)
connect_info[node_id] = s connect_info[node_id] = s
print(CGREEN + f"[Connect] - " print(CGREEN + f"[Connect] - "
f"Connecting to {node[NODE_IDENTIFIER]}: {host_ip_address}:{node[NODE_PORT]} success!" + CEND) f"Connecting to {node[NODE_IDENTIFIER]}: {host_ip_address}:{node[NODE_PORT]} success!" + CEND)
except Exception: except Exception:
print(CRED + f"[Connect] - Connection to " print(CRED + f"[Connect] - Connection to "
f"{node[NODE_IDENTIFIER]}: {host_ip_address}:{node[NODE_PORT]} failed. " f"{node[NODE_IDENTIFIER]}: {host_ip_address}:{node[NODE_PORT]} failed. "
...@@ -61,10 +126,10 @@ def bind_with_all_nodes(nodes: dict, current_node_id: str, return_values: dict): ...@@ -61,10 +126,10 @@ def bind_with_all_nodes(nodes: dict, current_node_id: str, return_values: dict):
def establish_bidirectional_connection(nodes, current_node_id): def establish_bidirectional_connection(nodes, current_node_id):
return_values = {} return_values = {}
thread1 = threading.Thread(target=connect_with_all_nodes, args=[copy.deepcopy(nodes), current_node_id, return_values]) thread1 = threading.Thread(target=connect_with_all_nodes,
args=[copy.deepcopy(nodes), current_node_id, return_values])
thread2 = threading.Thread(target=bind_with_all_nodes, args=[copy.deepcopy(nodes), current_node_id, return_values]) thread2 = threading.Thread(target=bind_with_all_nodes, args=[copy.deepcopy(nodes), current_node_id, return_values])
thread1.start() thread1.start()
...@@ -81,11 +146,112 @@ def map_hostname_to_ip(nodes): ...@@ -81,11 +146,112 @@ def map_hostname_to_ip(nodes):
IP_TO_NODE_ID_MAPPING[NODE_ID_TO_IP_MAPPING[node_id]] = node_id IP_TO_NODE_ID_MAPPING[NODE_ID_TO_IP_MAPPING[node_id]] = node_id
def msg_to_msg_id(msg: Message):
msg_id = msg.transaction.transaction_id + "-" + msg.priority + "-" + str(msg.is_final_msg)
return msg_id
def receive(self_node: SelfNode, msg: Message):
# 这里逻辑我不知道怎么写,因为觉得需要线程一直监听着
# receive 到msg之后,
msg_id = msg_to_msg_id(msg)
if msg_id in self_node.received_messages:
pass
# 已经收到过这条msg,不用再广播了,并且pass?
else:
# 需要进行广播
# 广播!没写
# 进行处理
self_node.received_messages.put(msg_id) # 放入received_messages
if msg.is_final_msg:
# 带着argeement的,更新在priority queue 中的位置
self_node.lock.acquire()
for temp_msg in self_node.priority_queue:
if temp_msg.transaction.transaction_id == msg.transaction.transaction_id:
temp_msg.is_final_msg = True
temp_msg.node_num = msg.node_num
temp_msg.priority = msg.priority
self_node.priority_queue.sort()
self_node.lock.release()
else:
# msg带的自己的transaction,那就是别的node带着propose priority来了
if msg.transaction.transaction_id in self_node.transactions.keys():
# 更新transaction收到的propose 的 node list的
self_node.transactions[msg.transaction.transaction_id].append(msg.node_num)
# 没写 这里 收到的propose 的 node list 是否和 现有的连接的node一样的
agree = False # 暂且设为没有全
if agree:
self_node.lock.acquire()
for temp_msg in self_node.priority_queue:
if temp_msg.transaction.transaction_id == msg.transaction.transaction_id:
temp_msg.is_final_msg = True
if float(temp_msg.priority) < float(msg.priority):
temp_msg.priority = msg.priority
self_node.priority_queue.sort()
self_node.lock.release()
else:
self_node.lock.acquire()
for temp_msg in self_node.priority_queue:
if temp_msg.transaction.transaction_id == msg.transaction.transaction_id:
temp_msg.is_final_msg = False
if float(temp_msg.priority) < float(msg.priority):
temp_msg.priority = msg.priority
self_node.priority_queue.sort()
self_node.lock.release()
def multicast(self_node: SelfNode, msg: Message):
# first put into the received message set
msg_str = msg_to_msg_id(msg)
# 将要发出的message 放到已接收消息的set 中
self_node.received_messages(msg)
# 对group 中的点进行广播
for each_socket in self_node.group_socket_node.values():
each_socket.send(msg.encode(ENCODING_FORMAT)) # ??这里需要广播我不会了
def read_gentx(self_node: SelfNode):
while True:
data = input()
# print('!!!!'+data)
temp_transaction = Transaction(data, self_node.my_id)
priority = self_node.update_priority() # 给priority id 有问题的,我还要再想想
msg = Message(temp_transaction, priority, self_node.my_id,
False) # generate a object could be put in the priority queue
# 要进行一个广播 但是个还没有写 ??
muticast()
self_node.lock.acquire()
self_node.priority_queue.append(msg) # put the msg into priority_queue
self_node.priority_queue.sort()
self_node.lock.release()
self_node.transactions[temp_transaction.transaction_id].append(self_node.my_id) # 放到transaction 中的列表
# while not self_node.priority_queue.empty(): # when the priority queue not empty
# temp = self_node.priority_queue.get()
# if self_node.priority_queue.get().is_final_msg:
# deliver(self_node.priority_queue.get()) #deliver 还没写呢
# else:
# self_node.priority_queue.put(temp)
# break;
if __name__ == '__main__': if __name__ == '__main__':
current_node_id = sys.argv[1] current_node_id = sys.argv[1]
config_path = sys.argv[2] config_path = sys.argv[2]
nodes = parse_config(config_path) nodes = parse_config(config_path)
map_hostname_to_ip(nodes) map_hostname_to_ip(nodes)
self_node = SelfNode(current_node_id)
return_values = establish_bidirectional_connection(nodes, current_node_id)
# 将group 进行一个初始化
for key, value in return_values.items():
for node, sok in value.items():
self_node.group_socket_node[node] = sok
thread_deliver = threading.Thread(target=deliver(self_node))
''' '''
return_values = { return_values = {
...@@ -105,5 +271,5 @@ if __name__ == '__main__': ...@@ -105,5 +271,5 @@ if __name__ == '__main__':
}, },
} }
''' '''
return_values = establish_bidirectional_connection(nodes, current_node_id)
# print(return_values) # print(return_values)
class Message:
def __init__(self, priority: int, node_num: int):
self.priority = str.format(f"{priority}.{node_num}")
def __lt__(self, other):
temp1 = float(self.priority)
temp2 = float(other.priority)
return temp1<temp2
if __name__ == "__main__":
msg1 = Message(2,1)
msg3 = Message(3,2)
msg2 = Message(2, 3)
temp = [msg1,msg2,msg3]
temp.sort()
for msg in temp:
print(msg.priority)
\ No newline at end of file
import time
from constants import DEPOSIT, TRANSFER from constants import DEPOSIT, TRANSFER
class Transaction: class Transaction:
def __init__(self, transaction_str: str): def __init__(self, transaction_str: str,my_id:int):
self.transaction_origin = my_id
self.transaction_str = transaction_str self.transaction_str = transaction_str
self.transaction_time = time.time()
self.transaction_id = str(my_id)+transaction_str + "_" + self.transaction_time
str_segments = transaction_str.split(" ") str_segments = transaction_str.split(" ")
if transaction_str.startswith(DEPOSIT): if transaction_str.startswith(DEPOSIT):
......
import collections
from constants import NODE_IDENTIFIER, NODE_ADDRESS, NODE_PORT, CEND, CRED from constants import NODE_IDENTIFIER, NODE_ADDRESS, NODE_PORT, CEND, CRED
from multiprocessing.pool import ThreadPool from multiprocessing.pool import ThreadPool
import multiprocessing as mp import multiprocessing as mp
import traceback import traceback
from transaction import Transaction
MAX_THREAD_COUNT = mp.cpu_count() MAX_THREAD_COUNT = mp.cpu_count()
THREAD_POOL = ThreadPool(MAX_THREAD_COUNT) THREAD_POOL = ThreadPool(MAX_THREAD_COUNT)
...@@ -38,3 +41,12 @@ def run_in_threads(task_list, **kwargs): ...@@ -38,3 +41,12 @@ def run_in_threads(task_list, **kwargs):
.format(e.__class__.__name__, e, traceback.format_exc()) + CEND) .format(e.__class__.__name__, e, traceback.format_exc()) + CEND)
results.append(value) results.append(value)
return results return results
def deliverable(transaction: Transaction, transactions_proposednode: collections.defaultdict, group: dict):
transaction_id = transaction.transaction_str + "!" + transaction.transaction_time
nodes_list = transactions_proposednode[transaction_id]
for node in group:
if node not in nodes_list:
return False
return True
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment