diff --git a/mp3/Client/client.go b/mp3/Client/client.go index 310cd8e16dc05142237ef64bd32c92139bd20eaf..ca6fc41f258770be0ef09d26b40454be37f95102 100644 --- a/mp3/Client/client.go +++ b/mp3/Client/client.go @@ -35,7 +35,7 @@ type Response struct { Message string } -var NUM_SERVERS int = 5 // number of servers +var NUM_SERVERS int = 1 // number of servers // fills client response channel func (client *Client) run_server() { diff --git a/mp3/Server/server.go b/mp3/Server/server.go index 55c97ca92d41364eab608b74811ad2afe41b7390..9ee1944b3a4124a0be219bda62d1d472d5dba2e1 100644 --- a/mp3/Server/server.go +++ b/mp3/Server/server.go @@ -6,6 +6,7 @@ import ( "fmt" "net" "os" + "strconv" "strings" "sync" "time" @@ -16,14 +17,16 @@ type Server struct { Address string BranchNameToAddress map[string]string Accounts map[string]*Account - Clients map[string]net.Conn // connected clients - Peers map[string]net.Conn // connected peers - Lock sync.Mutex // lock for peer map + Clients map[string]net.Conn // connected clients [name, socket] + ClientEncs map[string]*gob.Encoder // connected client encoders + Peers map[string]net.Conn // connected peers [address, socket] + PeerEncs map[string]*gob.Encoder // connected peer encoders + Lock sync.Mutex // lock for client and peer map + PeerConnectedWG sync.WaitGroup // wait group for all peers to connect ClientChan chan Request - // PeerChan chan PeerReque + PeerChan chan Request - // ClientListener net.Listener - // PeerListener net.Listener + TransactionTimestamps map[string]time.Time // timestamp for all the transactions } type WriteEntry struct { @@ -32,16 +35,20 @@ type WriteEntry struct { } type Account struct { - Name string - Balance int - CreatedBy string - RTS []string + Name string + Balance int + CreatedBy string + RTS []time.Time + TW map[string]*WriteEntry // tentative writes WriteTimestampCommitted []time.Time + Committed bool + AccountLock sync.Mutex } type Request struct { ClientName string + PeerName string Transaction string // type of transaction Branch string Account string @@ -53,28 +60,52 @@ type Response struct { Message string } -// func (server *Server) run_server() { -// // request := <-server.ClientChan -// // fmt.Println("Got client message:", request.Transaction, request.Branch, request.Account, request.Amount) -// // enc := gob.NewEncoder(clientConn) -// // response := Response{Result: true, Message: "OK"} -// // enc.Encode(&response) -// } +var NUM_SERVERS int = 2 // number of servers + +////////////////////////////// +// CLIENT FUNCTIONS +////////////////////////////// -// func (server *Server) handle_peer_messages(peerConn net.Conn) { -// request := Request{} -// dec := gob.NewDecoder(peerConn) +/* + * Does: receives and handles transaction, sends to other servers if needed + * additional logic for committing and aborting + * retrieves from client channel + * Spawns: none + */ +func (server *Server) run_coordinator() { + fmt.Println("Starting coordinator thread") + for { + request := <-server.ClientChan + fmt.Println("Got client message:", request.Transaction, request.Branch, request.Account, request.Amount) -// for { -// dec.Decode(&request) -// server.PeerCa <- request -// } -// } + // send transaction to correct server + if request.Branch == server.BranchName { + // handle transaction + } else { + // request another server to handle transaction + request.PeerName = server.BranchName + server.Lock.Lock() + address := server.BranchNameToAddress[request.Branch] + server.PeerEncs[address].Encode(&request) + server.Lock.Unlock() + } + // to send back to client? + enc := gob.NewEncoder(server.Clients[request.ClientName]) + response := Response{Result: true, Message: "OK"} + enc.Encode(&response) + } +} + +/* + * Does: fills client channel + * Spawns: none + */ func (server *Server) handle_client_messages(clientConn net.Conn) { fmt.Println("Starting client handler thread") dec := gob.NewDecoder(clientConn) + // initial decode used to remove request := Request{} dec.Decode(&request) // add to map @@ -89,31 +120,82 @@ func (server *Server) handle_client_messages(clientConn net.Conn) { fmt.Println("Decode error:", err) return } - // fmt.Println("Placing", request.Transaction, "from", request.ClientName) server.ClientChan <- request } } -// function that represents coordinator thread -func (server *Server) run_coordinator() { - fmt.Println("Starting coordinator server") - for { - request := <-server.ClientChan - fmt.Println("Got client message:", request.Transaction, request.Branch, request.Account, request.Amount) - enc := gob.NewEncoder(server.Clients[request.ClientName]) +// listens for client connection and launches thread to handle client requests +func (server *Server) listen_to_clients() { + ip := strings.Split(server.Address, ":")[0] + port, _ := strconv.Atoi(strings.Split(server.Address, ":")[1]) + address := ip + ":" + strconv.Itoa(port+10) + input, err := net.Listen("tcp", address) + if err != nil { + fmt.Println("Listening on server failed") + } + defer input.Close() + for { + connection, err := input.Accept() + if err != nil { + fmt.Println("Error accepting client connection", err) + continue + } + // confirm connection with client response := Response{Result: true, Message: "OK"} + enc := gob.NewEncoder(connection) enc.Encode(&response) + // starts channel fill thread + go server.handle_client_messages(connection) } } -// listens for client connection and launches thread to handle client requests -func (server *Server) connect_to_clients() { +////////////////////////////// +// SERVER FUNCTIONS +////////////////////////////// + +/* + * Does: receives and handles sent transaction + * Spawns: none + */ +func (server *Server) run_server() { + fmt.Println("Starting peer thread") + for { + request := <-server.PeerChan + fmt.Println("Got peer message:", request.Transaction, request.Branch, request.Account, request.Amount) + + // go server.handle_transaction(request) + } +} + +/* + * Does: fills peer channel + * Spawns: none + */ +func (server *Server) handle_peer_messages(peerConn net.Conn) { + fmt.Println("Starting peer handler thread") + dec := gob.NewDecoder(peerConn) + + for { + request := Request{} + err := dec.Decode(&request) + if err != nil { + fmt.Println("Decode error:", err) + return + } + server.PeerChan <- request + } +} + +/* + * Does: listens for tcp request from peers and accepts connection + * Spawns: multiple threads for each peer connection that fills peer channel + */ +func (server *Server) listen_to_peers() { input, err := net.Listen("tcp", server.Address) if err != nil { fmt.Println("Listening on server failed") } - defer input.Close() for { connection, err := input.Accept() @@ -121,18 +203,19 @@ func (server *Server) connect_to_clients() { fmt.Println("Error accepting client connection", err) continue } - // confirm connection with client - response := Response{Result: true, Message: "OK"} - enc := gob.NewEncoder(connection) - enc.Encode(&response) // starts channel fill thread - go server.handle_client_messages(connection) + go server.handle_peer_messages(connection) } } -// lsiten to tcp from other servers and handle necessary transactions +/* + * Does: Sends tcp request to all other servers in peer list + * Spawns: Nothing + */ func (server *Server) connect_to_peers() { + server.PeerConnectedWG.Add(NUM_SERVERS - 1) + for address := range server.Peers { go func(peerAddr string) { var conn net.Conn @@ -142,17 +225,23 @@ func (server *Server) connect_to_peers() { if err == nil { break } - time.Sleep(2 * time.Second) + time.Sleep(1 * time.Second) } server.Lock.Lock() server.Peers[peerAddr] = conn + server.PeerEncs[address] = gob.NewEncoder(conn) + server.PeerConnectedWG.Done() server.Lock.Unlock() - fmt.Println("Connected to peer", peerAddr) - // Handle peer communication here if needed }(address) } + server.PeerConnectedWG.Wait() + fmt.Println("Server Peers:", server.Peers) } +////////////////////////////// +// HELPER FUNCTIONS +////////////////////////////// + func (server *Server) read_file(config_name string) { config, error := os.Open(config_name) if error != nil { @@ -163,9 +252,9 @@ func (server *Server) read_file(config_name string) { reader := bufio.NewReader(config) // assumption that only ever have 5 servers (no more, no less) - for i := 0; i < 1; i++ { + for i := 0; i < NUM_SERVERS; i++ { text, serr := reader.ReadString('\n') - if serr != nil && i < 0 { + if serr != nil && i < NUM_SERVERS-1 { fmt.Println("failed to read config", serr) } substrings := strings.Split(text, " ") @@ -185,6 +274,24 @@ func (server *Server) read_file(config_name string) { config.Close() } +func (server *Server) print_all_balances() { + server.Lock.Lock() + for _, account := range server.Accounts { + if account.Committed && account.Balance > 0 { + fmt.Println(server.BranchName+"."+account.Name, "=", account.Balance) + } + } + server.Lock.Unlock() +} + +////////////////////////////// +// TIMESTAMP ORDERING AND TRANSACTION HANDLING START +////////////////////////////// + +////////////////////////////// +// START OF PROGRAM +////////////////////////////// + func main() { // init server server := Server{} @@ -196,17 +303,19 @@ func main() { server.BranchNameToAddress = make(map[string]string) server.Accounts = make(map[string]*Account) server.Clients = make(map[string]net.Conn) + server.ClientEncs = make(map[string]*gob.Encoder) server.Peers = make(map[string]net.Conn) + server.PeerEncs = make(map[string]*gob.Encoder) server.ClientChan = make(chan Request, 1000) + server.PeerChan = make(chan Request, 1000) server.read_file(configPath) - fmt.Println("Server State:", server.BranchName, server.Address, server.BranchNameToAddress, server.Accounts, server.Peers) - go server.run_coordinator() - go server.connect_to_clients() - // go server.connect_to_peers() + go server.listen_to_clients() - for { - continue - } + go server.run_server() + go server.listen_to_peers() // potential race condition with connect + go server.connect_to_peers() + + select {} } diff --git a/mp3/client b/mp3/client deleted file mode 100755 index f781997471f0e2377f37798542f1e3e0c0aa0639..0000000000000000000000000000000000000000 Binary files a/mp3/client and /dev/null differ diff --git a/mp3/config_test.txt b/mp3/config_test.txt index a3b8b36bfc6d5c124ac6fbbaba92d019d90d7fed..b940ff0626cedb6cea86636e484fabe6ea6d85b3 100644 --- a/mp3/config_test.txt +++ b/mp3/config_test.txt @@ -1,5 +1,2 @@ A 127.0.0.1 2005 -B 127.0.0.1 2006 -C 127.0.0.1 2007 -D 127.0.0.1 2008 -E 127.0.0.1 2009 \ No newline at end of file +B 127.0.0.1 2006 \ No newline at end of file