utils.py 1.17 KiB
import socket
import json
import threading
import queue
import sys
import time
from typing import Dict
import globals
from message import Message
def b_multicast(nodes: Dict[str, socket.socket], q: queue.Queue, lock: threading.Lock, process: int, pq: queue.PriorityQueue):
while True:
if q.qsize() == 0:
continue
message = q.get()
with lock:
globals.TIMESTAMP += 1
timestamp = globals.TIMESTAMP
cur_time = time.time()
# send message to each socket
for indentifier, node_socket in nodes.items():
try:
data = {"message": Message(message, timestamp, process, "first", False, cur_time).to_dict()}
data_str = json.dumps(data)
node_socket.send(data_str.encode("utf-8"))
# pq.put(data_str) # add to priority queue
pq.put(Message(message, timestamp, process, "first", False, cur_time)) # add to priority queue
# print(f"Sent message to {indentifier} {message}")
except Exception as e:
print(f"Error sending message to {indentifier}: {e}")
break
def read_stdin(stdin_queue: queue.Queue):
for line in sys.stdin:
stdin_queue.put(line.strip())