From 601554387d97938c34d8ed10f9a2d2812e7fa7ac Mon Sep 17 00:00:00 2001 From: owenw2 <owenw2@sp25-cs425-0601.cs.illinois.edu> Date: Sat, 15 Mar 2025 09:12:59 -0500 Subject: [PATCH] transactioning FIZED BABY YURRRRR --- mp1/node.py | 92 +++++++++++++++++++++++------------------------------ 1 file changed, 39 insertions(+), 53 deletions(-) diff --git a/mp1/node.py b/mp1/node.py index 4440daa..ac8e4a6 100644 --- a/mp1/node.py +++ b/mp1/node.py @@ -102,7 +102,7 @@ def await_message(client: socket.socket) -> None: def handle_message(message): - global NUM_RELATIONS # Ensure it is properly referenced + global NUM_RELATIONS print("Received message:", message) with mutex_lock: @@ -128,22 +128,6 @@ def handle_message(message): print(f"Sending return message to {process_name}") connections[process_name].sendall(message_str.encode("utf-8")) - with mutex_lock: - temp_queue = [] - found = False # Track if we updated the message - while not pq.empty(): - item = pq.get() - if item.process == message["process"] and item.id == message["id"]: - if not found: # Ensure update happens only once - item.timestamp = message["timestamp"] - item.type = "final" - item.deliverable = True - found = True - temp_queue.append(item) - for item in temp_queue: - pq.put(item) - - handle_transaction(message["transaction"]) else: print("Error: Connection not found for process", message["process"]) @@ -153,7 +137,7 @@ def handle_message(message): received_count = len(received[key]) if received_count == NUM_RELATIONS: - message["timestamp"] = float(max(float(msg["timestamp"]) for msg in received[key])) + message["timestamp"] = int(max(int(msg["timestamp"]) for msg in received[key])) message["type"] = "final" message_str = json.dumps({"message": message}) @@ -164,75 +148,77 @@ def handle_message(message): with mutex_lock: temp_queue = [] - found = False # Track if we updated the message while not pq.empty(): item = pq.get() if item.process == message["process"] and item.id == message["id"]: - if not found: # Ensure update happens only once - item.timestamp = message["timestamp"] - item.type = "final" - item.deliverable = True # Ensure deliverable is set - found = True + item.timestamp = message["timestamp"] + item.type = "final" + item.deliverable = True temp_queue.append(item) for item in temp_queue: pq.put(item) - - + + handle_transaction() elif message["type"] == "final": with mutex_lock: temp_queue = [] - found = False while not pq.empty(): item = pq.get() if item.process == message["process"] and item.id == message["id"]: - if not found: # Update only the first matching message - item.timestamp = message["timestamp"] - item.type = "final" - item.deliverable = True - found = True + item.timestamp = message["timestamp"] + item.type = "final" + item.deliverable = True temp_queue.append(item) for item in temp_queue: pq.put(item) - handle_transaction(message["transaction"]) + handle_transaction() else: print("Invalid message type", message) -# handles valid transactions in priority queue -def handle_transaction(transaction): +def handle_transaction(): + executed_transactions = set() # Track already executed transactions + while not pq.empty(): - front = pq.queue[0] - if front.to_dict()["deliverable"] == True: - print(transaction) + front = pq.queue[0] # Peek at the first message + + # Check if it's already executed + unique_key = (front.transaction, front.process, front.id) + if unique_key in executed_transactions: + pq.get() # Remove the duplicate transaction + continue # Skip processing + + if front.deliverable: + transaction = front.transaction + print(transaction) # Print transaction execution + + # Process transaction items = transaction.split() if items[0] == "DEPOSIT": - if items[1] in ACCOUNTS.keys(): - ACCOUNTS[items[1]] += int(items[2]) - else: - ACCOUNTS[items[1]] = int(items[2]) + ACCOUNTS[items[1]] = ACCOUNTS.get(items[1], 0) + int(items[2]) + elif items[0] == "TRANSFER": - if ACCOUNTS[items[1]] - int(items[4]) < 0: - pass - else: + if ACCOUNTS.get(items[1], 0) >= int(items[4]): ACCOUNTS[items[1]] -= int(items[4]) - if items[3] in ACCOUNTS.keys(): - ACCOUNTS[items[3]] += int(items[4]) - else: - ACCOUNTS[items[3]] = int(items[4]) - - # print balances + ACCOUNTS[items[3]] = ACCOUNTS.get(items[3], 0) + int(items[4]) + + # Print updated balances non_zero_accounts = {k: v for k, v in ACCOUNTS.items() if v != 0} sorted_accounts = sorted(non_zero_accounts.items()) balances_str = " ".join(f"{key}:{value}" for key, value in sorted_accounts) print(f"BALANCES {balances_str}") - pq.get() + + executed_transactions.add(unique_key) # Mark as executed + pq.get() # Remove from queue after execution + else: - break + break # Stop processing if the front message is not deliverable + if __name__ == "__main__": -- GitLab