Skip to content
Snippets Groups Projects
Commit 522da8d3 authored by owenw2's avatar owenw2
Browse files

resolved client-server messaging

parent feaf98f2
No related branches found
No related tags found
No related merge requests found
...@@ -18,9 +18,11 @@ type Client struct { ...@@ -18,9 +18,11 @@ type Client struct {
Connected bool // true if established connection Connected bool // true if established connection
Servers []string // server ip addresses and ports Servers []string // server ip addresses and ports
ResponseChan chan Response // channel for responses from server ResponseChan chan Response // channel for responses from server
Encoder *gob.Encoder
} }
type Request struct { type Request struct {
ClientName string
Transaction string // type of transaction Transaction string // type of transaction
Branch string Branch string
Account string Account string
...@@ -29,7 +31,7 @@ type Request struct { ...@@ -29,7 +31,7 @@ type Request struct {
type Response struct { type Response struct {
Result bool Result bool
Message bool Message string
} }
func (client *Client) run_server() { func (client *Client) run_server() {
...@@ -38,7 +40,6 @@ func (client *Client) run_server() { ...@@ -38,7 +40,6 @@ func (client *Client) run_server() {
response := Response{} response := Response{}
dec := gob.NewDecoder(client.Connection) dec := gob.NewDecoder(client.Connection)
dec.Decode(&response) dec.Decode(&response)
fmt.Println("Received result", response.Result, response.Message)
client.ResponseChan <- response client.ResponseChan <- response
} }
} }
...@@ -47,12 +48,11 @@ func (client *Client) connect() { ...@@ -47,12 +48,11 @@ func (client *Client) connect() {
request := Request{} request := Request{}
response := Response{} response := Response{}
request.ClientName = client.Name
request.Transaction = "BEGIN" request.Transaction = "BEGIN"
request.Branch = client.Name request.Branch = client.Name
request.Account = client.Name request.Account = client.Name
fmt.Println("Attempting connections")
for _, server := range client.Servers { for _, server := range client.Servers {
fmt.Println("Sending connection request to", server) fmt.Println("Sending connection request to", server)
connection, err := net.Dial("tcp", server) connection, err := net.Dial("tcp", server)
...@@ -62,8 +62,8 @@ func (client *Client) connect() { ...@@ -62,8 +62,8 @@ func (client *Client) connect() {
} }
// send begin request // send begin request
enc := gob.NewEncoder(connection) client.Encoder = gob.NewEncoder(connection)
enc.Encode(&request) client.Encoder.Encode(&request)
// receive connection response // receive connection response
dec := gob.NewDecoder(connection) dec := gob.NewDecoder(connection)
dec.Decode(&response) dec.Decode(&response)
...@@ -81,6 +81,7 @@ func (client *Client) connect() { ...@@ -81,6 +81,7 @@ func (client *Client) connect() {
func (client *Client) send_request(transaction string, account string, amount int) { func (client *Client) send_request(transaction string, account string, amount int) {
request := Request{} request := Request{}
request.ClientName = client.Name
request.Transaction = transaction request.Transaction = transaction
request.Amount = amount request.Amount = amount
...@@ -93,8 +94,7 @@ func (client *Client) send_request(transaction string, account string, amount in ...@@ -93,8 +94,7 @@ func (client *Client) send_request(transaction string, account string, amount in
request.Branch = substrings[0] request.Branch = substrings[0]
} }
enc := gob.NewEncoder(client.Connection) client.Encoder.Encode(&request)
enc.Encode(&request)
} }
func (client *Client) read_file(config_name string) { func (client *Client) read_file(config_name string) {
...@@ -110,7 +110,7 @@ func (client *Client) read_file(config_name string) { ...@@ -110,7 +110,7 @@ func (client *Client) read_file(config_name string) {
for i := 0; i < 1; i++ { for i := 0; i < 1; i++ {
text, serr := reader.ReadString('\n') text, serr := reader.ReadString('\n')
substrings := strings.Split(text, " ") substrings := strings.Split(text, " ")
if serr != nil && i < 4 { if serr != nil && i < 0 {
fmt.Println("failed to read config") fmt.Println("failed to read config")
} }
// remove new line char from port // remove new line char from port
...@@ -131,14 +131,14 @@ func main() { ...@@ -131,14 +131,14 @@ func main() {
ResponseChan: make(chan Response), ResponseChan: make(chan Response),
} }
clientName := os.Args[0] clientName := os.Args[1]
configPath := os.Args[2] configPath := os.Args[2]
// read config and attach server:port // read config and attach server:port
client.Name = clientName client.Name = clientName
client.read_file(configPath) client.read_file(configPath)
fmt.Println("Client State:", client.Name, client.Timestamp, client.Connection, client.Connected, client.Servers, len(client.Servers)) fmt.Println("Client State After Config:", client.Name, client.Connection, client.Connected, client.Servers, len(client.Servers))
reader := bufio.NewReader(os.Stdin) reader := bufio.NewReader(os.Stdin)
...@@ -162,13 +162,12 @@ func main() { ...@@ -162,13 +162,12 @@ func main() {
continue continue
} }
// setup server connection // setup server connection
fmt.Println("HERE")
go client.connect() go client.connect()
response := <-client.ResponseChan response := <-client.ResponseChan
if response.Result { if response.Result {
client.Connected = true client.Connected = true
client.Timestamp = time.Now() client.Timestamp = time.Now()
fmt.Println("OK")
} else { } else {
fmt.Println("ABORTED") fmt.Println("ABORTED")
return return
......
...@@ -16,8 +16,9 @@ type Server struct { ...@@ -16,8 +16,9 @@ type Server struct {
Address string Address string
BranchNameToAddress map[string]string BranchNameToAddress map[string]string
Accounts map[string]*Account Accounts map[string]*Account
Peers map[string]net.Conn // Connected peers Clients map[string]net.Conn // connected clients
PeerLock sync.Mutex // Lock for peer map Peers map[string]net.Conn // connected peers
Lock sync.Mutex // lock for peer map
ClientChan chan Request ClientChan chan Request
// PeerChan chan PeerReque // PeerChan chan PeerReque
...@@ -40,6 +41,7 @@ type Account struct { ...@@ -40,6 +41,7 @@ type Account struct {
} }
type Request struct { type Request struct {
ClientName string
Transaction string // type of transaction Transaction string // type of transaction
Branch string Branch string
Account string Account string
...@@ -48,24 +50,65 @@ type Request struct { ...@@ -48,24 +50,65 @@ type Request struct {
type Response struct { type Response struct {
Result bool Result bool
Message bool Message string
} }
func (server *Server) handle_transaction() { // func (server *Server) run_server() {
// // request := <-server.ClientChan
} // // fmt.Println("Got client message:", request.Transaction, request.Branch, request.Account, request.Amount)
// // enc := gob.NewEncoder(clientConn)
// // response := Response{Result: true, Message: "OK"}
// // enc.Encode(&response)
// }
// func (server *Server) handle_peer_messages(peerConn net.Conn) {
// request := Request{}
// dec := gob.NewDecoder(peerConn)
// for {
// dec.Decode(&request)
// server.PeerCa <- request
// }
// }
func (server *Server) handle_client_messages(clientConn net.Conn) {
fmt.Println("Starting client handler thread")
dec := gob.NewDecoder(clientConn)
func (server *Server) handle_client(clientConn net.Conn) {
request := Request{} request := Request{}
dec := gob.NewDecoder(clientConn)
dec.Decode(&request) dec.Decode(&request)
// add to map
server.Lock.Lock()
server.Clients[request.ClientName] = clientConn
server.Lock.Unlock()
for {
request := Request{}
err := dec.Decode(&request)
if err != nil {
fmt.Println("Decode error:", err)
return
}
// fmt.Println("Placing", request.Transaction, "from", request.ClientName)
server.ClientChan <- request
}
}
// function that represents coordinator thread
func (server *Server) run_coordinator() {
fmt.Println("Starting coordinator server")
for {
request := <-server.ClientChan
fmt.Println("Got client message:", request.Transaction, request.Branch, request.Account, request.Amount)
enc := gob.NewEncoder(server.Clients[request.ClientName])
fmt.Println("Got client message") response := Response{Result: true, Message: "OK"}
enc.Encode(&response)
}
} }
// listens to tcp netConn from client and then handles (this is only necessary if it is a coordinator server) // listens for client connection and launches thread to handle client requests
func (server *Server) listen_to_clients() { func (server *Server) connect_to_clients() {
input, err := net.Listen("tcp", server.Address) input, err := net.Listen("tcp", server.Address)
if err != nil { if err != nil {
fmt.Println("Listening on server failed") fmt.Println("Listening on server failed")
...@@ -73,13 +116,18 @@ func (server *Server) listen_to_clients() { ...@@ -73,13 +116,18 @@ func (server *Server) listen_to_clients() {
defer input.Close() defer input.Close()
for { for {
conn, err := input.Accept() connection, err := input.Accept()
if err != nil { if err != nil {
fmt.Println("Error accepting client connection", err) fmt.Println("Error accepting client connection", err)
continue continue
} }
fmt.Println("Received connection", conn) // confirm connection with client
go server.handle_client(conn) response := Response{Result: true, Message: "OK"}
enc := gob.NewEncoder(connection)
enc.Encode(&response)
// starts channel fill thread
go server.handle_client_messages(connection)
} }
} }
...@@ -96,9 +144,9 @@ func (server *Server) connect_to_peers() { ...@@ -96,9 +144,9 @@ func (server *Server) connect_to_peers() {
} }
time.Sleep(2 * time.Second) time.Sleep(2 * time.Second)
} }
server.PeerLock.Lock() server.Lock.Lock()
server.Peers[peerAddr] = conn server.Peers[peerAddr] = conn
server.PeerLock.Unlock() server.Lock.Unlock()
fmt.Println("Connected to peer", peerAddr) fmt.Println("Connected to peer", peerAddr)
// Handle peer communication here if needed // Handle peer communication here if needed
}(address) }(address)
...@@ -117,7 +165,7 @@ func (server *Server) read_file(config_name string) { ...@@ -117,7 +165,7 @@ func (server *Server) read_file(config_name string) {
// assumption that only ever have 5 servers (no more, no less) // assumption that only ever have 5 servers (no more, no less)
for i := 0; i < 1; i++ { for i := 0; i < 1; i++ {
text, serr := reader.ReadString('\n') text, serr := reader.ReadString('\n')
if serr != nil && i < 4 { if serr != nil && i < 0 {
fmt.Println("failed to read config", serr) fmt.Println("failed to read config", serr)
} }
substrings := strings.Split(text, " ") substrings := strings.Split(text, " ")
...@@ -141,26 +189,24 @@ func main() { ...@@ -141,26 +189,24 @@ func main() {
// init server // init server
server := Server{} server := Server{}
branchName := os.Args[0] branchName := os.Args[1]
configPath := os.Args[2] configPath := os.Args[2]
server.BranchName = branchName server.BranchName = branchName
server.BranchNameToAddress = make(map[string]string) server.BranchNameToAddress = make(map[string]string)
server.Accounts = make(map[string]*Account) server.Accounts = make(map[string]*Account)
server.Clients = make(map[string]net.Conn)
server.Peers = make(map[string]net.Conn) server.Peers = make(map[string]net.Conn)
server.ClientChan = make(chan Request, 1000)
server.read_file(configPath) server.read_file(configPath)
fmt.Println("Server State:", server.BranchName, server.Address, server.BranchNameToAddress, server.Accounts, server.Peers) fmt.Println("Server State:", server.BranchName, server.Address, server.BranchNameToAddress, server.Accounts, server.Peers)
go server.listen_to_clients() go server.run_coordinator()
go server.connect_to_clients()
// go server.connect_to_peers() // go server.connect_to_peers()
for { for {
continue continue
} }
// timstamped concurrency
// init RTS and TW lists (ordered by timestamp)
//=\
// Strict Timestamp Ordering
} }
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment