Skip to content
Snippets Groups Projects
Commit 60155438 authored by owenw2's avatar owenw2
Browse files

transactioning FIZED BABY YURRRRR

parent 5334d105
No related branches found
No related tags found
No related merge requests found
...@@ -102,7 +102,7 @@ def await_message(client: socket.socket) -> None: ...@@ -102,7 +102,7 @@ def await_message(client: socket.socket) -> None:
def handle_message(message): def handle_message(message):
global NUM_RELATIONS # Ensure it is properly referenced global NUM_RELATIONS
print("Received message:", message) print("Received message:", message)
with mutex_lock: with mutex_lock:
...@@ -128,22 +128,6 @@ def handle_message(message): ...@@ -128,22 +128,6 @@ def handle_message(message):
print(f"Sending return message to {process_name}") print(f"Sending return message to {process_name}")
connections[process_name].sendall(message_str.encode("utf-8")) connections[process_name].sendall(message_str.encode("utf-8"))
with mutex_lock:
temp_queue = []
found = False # Track if we updated the message
while not pq.empty():
item = pq.get()
if item.process == message["process"] and item.id == message["id"]:
if not found: # Ensure update happens only once
item.timestamp = message["timestamp"]
item.type = "final"
item.deliverable = True
found = True
temp_queue.append(item)
for item in temp_queue:
pq.put(item)
handle_transaction(message["transaction"])
else: else:
print("Error: Connection not found for process", message["process"]) print("Error: Connection not found for process", message["process"])
...@@ -153,7 +137,7 @@ def handle_message(message): ...@@ -153,7 +137,7 @@ def handle_message(message):
received_count = len(received[key]) received_count = len(received[key])
if received_count == NUM_RELATIONS: if received_count == NUM_RELATIONS:
message["timestamp"] = float(max(float(msg["timestamp"]) for msg in received[key])) message["timestamp"] = int(max(int(msg["timestamp"]) for msg in received[key]))
message["type"] = "final" message["type"] = "final"
message_str = json.dumps({"message": message}) message_str = json.dumps({"message": message})
...@@ -164,75 +148,77 @@ def handle_message(message): ...@@ -164,75 +148,77 @@ def handle_message(message):
with mutex_lock: with mutex_lock:
temp_queue = [] temp_queue = []
found = False # Track if we updated the message
while not pq.empty(): while not pq.empty():
item = pq.get() item = pq.get()
if item.process == message["process"] and item.id == message["id"]: if item.process == message["process"] and item.id == message["id"]:
if not found: # Ensure update happens only once item.timestamp = message["timestamp"]
item.timestamp = message["timestamp"] item.type = "final"
item.type = "final" item.deliverable = True
item.deliverable = True # Ensure deliverable is set
found = True
temp_queue.append(item) temp_queue.append(item)
for item in temp_queue: for item in temp_queue:
pq.put(item) pq.put(item)
handle_transaction()
elif message["type"] == "final": elif message["type"] == "final":
with mutex_lock: with mutex_lock:
temp_queue = [] temp_queue = []
found = False
while not pq.empty(): while not pq.empty():
item = pq.get() item = pq.get()
if item.process == message["process"] and item.id == message["id"]: if item.process == message["process"] and item.id == message["id"]:
if not found: # Update only the first matching message item.timestamp = message["timestamp"]
item.timestamp = message["timestamp"] item.type = "final"
item.type = "final" item.deliverable = True
item.deliverable = True
found = True
temp_queue.append(item) temp_queue.append(item)
for item in temp_queue: for item in temp_queue:
pq.put(item) pq.put(item)
handle_transaction(message["transaction"]) handle_transaction()
else: else:
print("Invalid message type", message) print("Invalid message type", message)
# handles valid transactions in priority queue def handle_transaction():
def handle_transaction(transaction): executed_transactions = set() # Track already executed transactions
while not pq.empty(): while not pq.empty():
front = pq.queue[0] front = pq.queue[0] # Peek at the first message
if front.to_dict()["deliverable"] == True:
print(transaction) # Check if it's already executed
unique_key = (front.transaction, front.process, front.id)
if unique_key in executed_transactions:
pq.get() # Remove the duplicate transaction
continue # Skip processing
if front.deliverable:
transaction = front.transaction
print(transaction) # Print transaction execution
# Process transaction
items = transaction.split() items = transaction.split()
if items[0] == "DEPOSIT": if items[0] == "DEPOSIT":
if items[1] in ACCOUNTS.keys(): ACCOUNTS[items[1]] = ACCOUNTS.get(items[1], 0) + int(items[2])
ACCOUNTS[items[1]] += int(items[2])
else:
ACCOUNTS[items[1]] = int(items[2])
elif items[0] == "TRANSFER": elif items[0] == "TRANSFER":
if ACCOUNTS[items[1]] - int(items[4]) < 0: if ACCOUNTS.get(items[1], 0) >= int(items[4]):
pass
else:
ACCOUNTS[items[1]] -= int(items[4]) ACCOUNTS[items[1]] -= int(items[4])
if items[3] in ACCOUNTS.keys(): ACCOUNTS[items[3]] = ACCOUNTS.get(items[3], 0) + int(items[4])
ACCOUNTS[items[3]] += int(items[4])
else: # Print updated balances
ACCOUNTS[items[3]] = int(items[4])
# print balances
non_zero_accounts = {k: v for k, v in ACCOUNTS.items() if v != 0} non_zero_accounts = {k: v for k, v in ACCOUNTS.items() if v != 0}
sorted_accounts = sorted(non_zero_accounts.items()) sorted_accounts = sorted(non_zero_accounts.items())
balances_str = " ".join(f"{key}:{value}" for key, value in sorted_accounts) balances_str = " ".join(f"{key}:{value}" for key, value in sorted_accounts)
print(f"BALANCES {balances_str}") print(f"BALANCES {balances_str}")
pq.get()
executed_transactions.add(unique_key) # Mark as executed
pq.get() # Remove from queue after execution
else: else:
break break # Stop processing if the front message is not deliverable
if __name__ == "__main__": if __name__ == "__main__":
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment