diff --git a/mp3/Client/client.go b/mp3/Client/client.go index 2a892ce8abf58e7ea2536b427c0c3a8a538828c3..51c0863f7da204e0c70bc1d80d4dfb9c456a10ad 100644 --- a/mp3/Client/client.go +++ b/mp3/Client/client.go @@ -18,9 +18,11 @@ type Client struct { Connected bool // true if established connection Servers []string // server ip addresses and ports ResponseChan chan Response // channel for responses from server + Encoder *gob.Encoder } type Request struct { + ClientName string Transaction string // type of transaction Branch string Account string @@ -29,7 +31,7 @@ type Request struct { type Response struct { Result bool - Message bool + Message string } func (client *Client) run_server() { @@ -38,7 +40,6 @@ func (client *Client) run_server() { response := Response{} dec := gob.NewDecoder(client.Connection) dec.Decode(&response) - fmt.Println("Received result", response.Result, response.Message) client.ResponseChan <- response } } @@ -47,12 +48,11 @@ func (client *Client) connect() { request := Request{} response := Response{} + request.ClientName = client.Name request.Transaction = "BEGIN" request.Branch = client.Name request.Account = client.Name - fmt.Println("Attempting connections") - for _, server := range client.Servers { fmt.Println("Sending connection request to", server) connection, err := net.Dial("tcp", server) @@ -62,8 +62,8 @@ func (client *Client) connect() { } // send begin request - enc := gob.NewEncoder(connection) - enc.Encode(&request) + client.Encoder = gob.NewEncoder(connection) + client.Encoder.Encode(&request) // receive connection response dec := gob.NewDecoder(connection) dec.Decode(&response) @@ -81,6 +81,7 @@ func (client *Client) connect() { func (client *Client) send_request(transaction string, account string, amount int) { request := Request{} + request.ClientName = client.Name request.Transaction = transaction request.Amount = amount @@ -93,8 +94,7 @@ func (client *Client) send_request(transaction string, account string, amount in request.Branch = substrings[0] } - enc := gob.NewEncoder(client.Connection) - enc.Encode(&request) + client.Encoder.Encode(&request) } func (client *Client) read_file(config_name string) { @@ -110,7 +110,7 @@ func (client *Client) read_file(config_name string) { for i := 0; i < 1; i++ { text, serr := reader.ReadString('\n') substrings := strings.Split(text, " ") - if serr != nil && i < 4 { + if serr != nil && i < 0 { fmt.Println("failed to read config") } // remove new line char from port @@ -131,14 +131,14 @@ func main() { ResponseChan: make(chan Response), } - clientName := os.Args[0] + clientName := os.Args[1] configPath := os.Args[2] // read config and attach server:port client.Name = clientName client.read_file(configPath) - fmt.Println("Client State:", client.Name, client.Timestamp, client.Connection, client.Connected, client.Servers, len(client.Servers)) + fmt.Println("Client State After Config:", client.Name, client.Connection, client.Connected, client.Servers, len(client.Servers)) reader := bufio.NewReader(os.Stdin) @@ -162,13 +162,12 @@ func main() { continue } // setup server connection - fmt.Println("HERE") go client.connect() response := <-client.ResponseChan if response.Result { client.Connected = true client.Timestamp = time.Now() - + fmt.Println("OK") } else { fmt.Println("ABORTED") return diff --git a/mp3/Server/server.go b/mp3/Server/server.go index 7b354e3f6f9d861a8b28609970dd0ec368e1ed09..55c97ca92d41364eab608b74811ad2afe41b7390 100644 --- a/mp3/Server/server.go +++ b/mp3/Server/server.go @@ -16,8 +16,9 @@ type Server struct { Address string BranchNameToAddress map[string]string Accounts map[string]*Account - Peers map[string]net.Conn // Connected peers - PeerLock sync.Mutex // Lock for peer map + Clients map[string]net.Conn // connected clients + Peers map[string]net.Conn // connected peers + Lock sync.Mutex // lock for peer map ClientChan chan Request // PeerChan chan PeerReque @@ -40,6 +41,7 @@ type Account struct { } type Request struct { + ClientName string Transaction string // type of transaction Branch string Account string @@ -48,24 +50,65 @@ type Request struct { type Response struct { Result bool - Message bool + Message string } -func (server *Server) handle_transaction() { - -} +// func (server *Server) run_server() { +// // request := <-server.ClientChan +// // fmt.Println("Got client message:", request.Transaction, request.Branch, request.Account, request.Amount) +// // enc := gob.NewEncoder(clientConn) +// // response := Response{Result: true, Message: "OK"} +// // enc.Encode(&response) +// } + +// func (server *Server) handle_peer_messages(peerConn net.Conn) { +// request := Request{} +// dec := gob.NewDecoder(peerConn) + +// for { +// dec.Decode(&request) +// server.PeerCa <- request +// } +// } + +func (server *Server) handle_client_messages(clientConn net.Conn) { + fmt.Println("Starting client handler thread") + dec := gob.NewDecoder(clientConn) -func (server *Server) handle_client(clientConn net.Conn) { request := Request{} - - dec := gob.NewDecoder(clientConn) dec.Decode(&request) + // add to map + server.Lock.Lock() + server.Clients[request.ClientName] = clientConn + server.Lock.Unlock() + + for { + request := Request{} + err := dec.Decode(&request) + if err != nil { + fmt.Println("Decode error:", err) + return + } + // fmt.Println("Placing", request.Transaction, "from", request.ClientName) + server.ClientChan <- request + } +} + +// function that represents coordinator thread +func (server *Server) run_coordinator() { + fmt.Println("Starting coordinator server") + for { + request := <-server.ClientChan + fmt.Println("Got client message:", request.Transaction, request.Branch, request.Account, request.Amount) + enc := gob.NewEncoder(server.Clients[request.ClientName]) - fmt.Println("Got client message") + response := Response{Result: true, Message: "OK"} + enc.Encode(&response) + } } -// listens to tcp netConn from client and then handles (this is only necessary if it is a coordinator server) -func (server *Server) listen_to_clients() { +// listens for client connection and launches thread to handle client requests +func (server *Server) connect_to_clients() { input, err := net.Listen("tcp", server.Address) if err != nil { fmt.Println("Listening on server failed") @@ -73,13 +116,18 @@ func (server *Server) listen_to_clients() { defer input.Close() for { - conn, err := input.Accept() + connection, err := input.Accept() if err != nil { fmt.Println("Error accepting client connection", err) continue } - fmt.Println("Received connection", conn) - go server.handle_client(conn) + // confirm connection with client + response := Response{Result: true, Message: "OK"} + enc := gob.NewEncoder(connection) + enc.Encode(&response) + + // starts channel fill thread + go server.handle_client_messages(connection) } } @@ -96,9 +144,9 @@ func (server *Server) connect_to_peers() { } time.Sleep(2 * time.Second) } - server.PeerLock.Lock() + server.Lock.Lock() server.Peers[peerAddr] = conn - server.PeerLock.Unlock() + server.Lock.Unlock() fmt.Println("Connected to peer", peerAddr) // Handle peer communication here if needed }(address) @@ -117,7 +165,7 @@ func (server *Server) read_file(config_name string) { // assumption that only ever have 5 servers (no more, no less) for i := 0; i < 1; i++ { text, serr := reader.ReadString('\n') - if serr != nil && i < 4 { + if serr != nil && i < 0 { fmt.Println("failed to read config", serr) } substrings := strings.Split(text, " ") @@ -141,26 +189,24 @@ func main() { // init server server := Server{} - branchName := os.Args[0] + branchName := os.Args[1] configPath := os.Args[2] server.BranchName = branchName server.BranchNameToAddress = make(map[string]string) server.Accounts = make(map[string]*Account) + server.Clients = make(map[string]net.Conn) server.Peers = make(map[string]net.Conn) + server.ClientChan = make(chan Request, 1000) server.read_file(configPath) fmt.Println("Server State:", server.BranchName, server.Address, server.BranchNameToAddress, server.Accounts, server.Peers) - go server.listen_to_clients() + go server.run_coordinator() + go server.connect_to_clients() // go server.connect_to_peers() for { continue } - - // timstamped concurrency - // init RTS and TW lists (ordered by timestamp) - //=\ - // Strict Timestamp Ordering }