diff --git a/mp3/Client/client.go b/mp3/Client/client.go index 564327a578d1bd3954f8704add5ecdb2ef18e57f..3fdeb442a57035c38363a44033d4a5e07886b3f8 100644 --- a/mp3/Client/client.go +++ b/mp3/Client/client.go @@ -211,7 +211,7 @@ func main() { if response.Result { fmt.Println(response.Message) } else { - fmt.Println("NOT FOUND, ABORTED") + fmt.Println(response.Message) return } } else if substrings[0] == "WITHDRAW" && client.Connected { @@ -221,7 +221,7 @@ func main() { if response.Result { fmt.Println("OK") } else { - fmt.Println("NOT FOUND, ABORTED") + fmt.Println(response.Message) return } } else if substrings[0] == "PRINT" { diff --git a/mp3/Server/server.go b/mp3/Server/server.go index ae684bff956c385c67cbb91bf5a649d42fa3b512..7b205724e6b43d370d94153a5ef4a5faf8aaebd8 100644 --- a/mp3/Server/server.go +++ b/mp3/Server/server.go @@ -19,7 +19,6 @@ type Server struct { BranchNameToAddress map[string]string Accounts map[string]*Account Clients map[string]net.Conn // connected clients [name, socket] - ClientEncs map[string]*gob.Encoder // connected client encoders Peers map[string]net.Conn // connected peers [address, socket] PeerEncs map[string]*gob.Encoder // connected peer encoders [address, encoder] Lock sync.Mutex // lock for client and peer map @@ -28,10 +27,7 @@ type Server struct { PeerChan chan Request ClientsAffected map[string][]string // Track multiple affected accounts for each client VoteClientChans map[string]chan string // Vote channel for each transaction - - TransactionStatus map[string]string // Track the phase of each transaction for each client (for 2pc) - VoteResults map[string]bool // Track vote results for each client and server - + ClientCount int } type WriteEntry struct { @@ -43,6 +39,7 @@ type Account struct { Name string Balance int CreatedBy string + CreatedAt time.Time RTS []time.Time TW map[string]*WriteEntry // tentative writes WriteTimestampCommitted []time.Time @@ -80,7 +77,7 @@ var NUM_SERVERS int = 2 // number of servers func (server *Server) run_coordinator() { for { request := <-server.ClientChan - fmt.Println("Got client message:", request.Transaction, request.Branch, request.Account, request.Amount) + // fmt.Println("Got client message:", request.Transaction, request.Branch, request.Account, request.Amount) // Track the affected account for the client if request.Transaction == "BALANCE" || request.Transaction == "DEPOSIT" || request.Transaction == "WITHDRAW" { @@ -98,9 +95,23 @@ func (server *Server) run_coordinator() { go server.run_commit(request.ClientName) // start commit server go server.prepare(request) // start prepare phase } else if request.Transaction == "ABORT" { + var wg sync.WaitGroup + wg.Add(2) // We're launching two goroutines request.PeerName = server.BranchName - go server.prepare(request) // start prepare phase - go server.send_response_to_client(request.ClientName, false, "ABORT") + go func() { + defer wg.Done() + server.prepare(request) + }() + go func() { + defer wg.Done() + server.send_response_to_client(request.ClientName, false, "ABORT") + }() + wg.Wait() + server.Lock.Lock() + delete(server.ClientsAffected, request.ClientName) + delete(server.VoteClientChans, request.ClientName) + delete(server.Clients, request.ClientName) + server.Lock.Unlock() } else { // request another server to handle transaction request.PeerName = server.BranchName @@ -156,7 +167,10 @@ func (server *Server) listen_to_clients() { continue } // confirm connection with client - response := Response{Result: true, Message: "OK"} + server.Lock.Lock() + server.ClientCount += 1 + response := Response{Result: true, Message: strconv.Itoa(server.ClientCount)} + server.Lock.Unlock() enc := gob.NewEncoder(connection) enc.Encode(&response) // starts channel fill thread @@ -175,7 +189,7 @@ func (server *Server) listen_to_clients() { func (server *Server) run_server() { for { request := <-server.PeerChan - fmt.Println("Got peer message:", request.Transaction, request.Branch, request.Account, request.Amount) + // fmt.Println("Got peer message:", request.Transaction, request.Branch, request.Account, request.Amount) go server.handle_transaction(request) } @@ -260,10 +274,14 @@ func (server *Server) run_commit(clientName string) { for { vote := <-server.VoteClientChans[clientName] // get vote - // send abort if vote == "ABORT" { server.send_response_to_client(clientName, false, "ABORT") + server.Lock.Lock() + delete(server.ClientsAffected, clientName) + delete(server.VoteClientChans, clientName) + delete(server.Clients, clientName) + server.Lock.Unlock() return } else { voteCount += 1 @@ -277,40 +295,46 @@ func (server *Server) run_commit(clientName string) { server.Lock.Unlock() // Send PREPARE to peers and wait for responses + var wg sync.WaitGroup for _, account := range affectedAccounts { branch := strings.Split(account, ".")[0] - account := strings.Split(account, ".")[1] + accountName := strings.Split(account, ".")[1] + wg.Add(1) + + prepareRequest := Request{ + ClientName: clientName, + PeerName: server.BranchName, + TimeStamp: time.Time{}, + Transaction: "HANDLECOMMIT", + Branch: branch, + Account: accountName, + Amount: 0, + } + if branch != server.BranchName { peerAddr := server.BranchNameToAddress[branch] peerEnc := server.PeerEncs[peerAddr] - - prepareRequest := Request{ - ClientName: clientName, - PeerName: server.BranchName, - TimeStamp: time.Time{}, - Transaction: "HANDLECOMMIT", - Branch: branch, - Account: account, - Amount: 0, - } - - // Send PREPARE message to peer server - peerEnc.Encode(&prepareRequest) + go func() { + peerEnc.Encode(&prepareRequest) + wg.Done() + }() } else { - prepareRequest := Request{ - ClientName: clientName, - PeerName: server.BranchName, - TimeStamp: time.Time{}, - Transaction: "HANDLECOMMIT", - Branch: branch, - Account: account, - Amount: 0, - } - - server.handle_transaction(prepareRequest) + go func() { + server.handle_transaction(prepareRequest) + wg.Done() + }() } } + wg.Wait() // Wait for all commits to finish server.send_response_to_client(clientName, true, "COMMIT") + server.print_all_balances() + + // server cleanup + server.Lock.Lock() + delete(server.ClientsAffected, clientName) + delete(server.VoteClientChans, clientName) + delete(server.Clients, clientName) + server.Lock.Unlock() } } } @@ -320,6 +344,9 @@ func (server *Server) run_commit(clientName string) { */ func (server *Server) prepare(request Request) { clientName := request.ClientName + if request.Transaction == "COMMIT" { + request.Transaction = "PREPARE" + } // Get all affected accounts for this transaction server.Lock.Lock() @@ -340,7 +367,7 @@ func (server *Server) prepare(request Request) { ClientName: clientName, PeerName: server.BranchName, TimeStamp: request.TimeStamp, - Transaction: "PREPARE", + Transaction: request.Transaction, Branch: branch, Account: account, Amount: request.Amount, @@ -353,7 +380,7 @@ func (server *Server) prepare(request Request) { ClientName: clientName, PeerName: server.BranchName, TimeStamp: request.TimeStamp, - Transaction: "PREPARE", + Transaction: request.Transaction, Branch: branch, Account: account, Amount: request.Amount, @@ -400,6 +427,7 @@ func (server *Server) read_file(config_name string) { } func (server *Server) print_all_balances() { + // fmt.Println("There are", len(server.Accounts), "account") server.Lock.Lock() for _, account := range server.Accounts { account.AccountLock.Lock() @@ -417,7 +445,6 @@ func (server *Server) print_all_balances() { func (server *Server) send_response_to_client(clientName string, result bool, message string) { // fmt.Println("SENDING RESPONSE TO CLIENT", clientName, result, message, server.Clients[clientName], server.Clients) - response := Response{Result: result, Message: message} enc := gob.NewEncoder(server.Clients[clientName]) enc.Encode(&response) @@ -428,15 +455,17 @@ func (server *Server) handle_read(request Request) { account, exists := server.Accounts[request.Account] server.Lock.Unlock() + // fmt.Println("CALLED HANDLE READ", account, exists) + if !exists && request.PeerName != server.BranchName { request.Transaction = "SEND" - request.Account = "ABORT" + request.Account = "NOT FOUND, ABORTED" request.Amount = 0 server.PeerEncs[server.BranchNameToAddress[request.PeerName]].Encode(&request) return } else if !exists && request.PeerName == server.BranchName { request.Transaction = "SEND" - request.Account = "ABORT" + request.Account = "NOT FOUND, ABORTED" request.Amount = 0 server.handle_transaction(request) return @@ -453,7 +482,7 @@ func (server *Server) handle_read(request Request) { // verify that request is greater than write timestamp if request.TimeStamp.Before(maxCommitTs) || request.TimeStamp.Equal(maxCommitTs) { request.Transaction = "SEND" - request.Account = "ABORT" + request.Account = "ABORTED" request.Amount = 0 if request.PeerName == server.BranchName { server.handle_transaction(request) @@ -529,12 +558,14 @@ func (server *Server) handle_read(request Request) { // Otherwise, wait and retry (spinlock variant) // fmt.Println("Waiting for tentative write to commit/abort...") + account.AccountLock.Unlock() + for { time.Sleep(100 * time.Millisecond) // avoid busy-waiting too aggressively account.AccountLock.Lock() - // Check if Ds is now committedserver.send_response_ + // Check if Ds is now committed isNowCommitted := false for _, ts := range account.WriteTimestampCommitted { if ts.Equal(ds) { @@ -561,7 +592,8 @@ func (server *Server) handle_read(request Request) { // Check if tentative write no longer exists (was aborted) stillExists := false for _, entry := range account.TW { - if entry.Timestamp.Equal(ds) { + fmt.Println("COMPARE", entry.Timestamp, ds) + if entry.Timestamp.Equal(ds) || ds.Equal(time.Time{}) { stillExists = true break } @@ -569,6 +601,8 @@ func (server *Server) handle_read(request Request) { account.AccountLock.Unlock() + // fmt.Println(stillExists) + if !stillExists { // Reapply read rule // fmt.Println("Tentative write gone (aborted), retrying read") @@ -588,6 +622,7 @@ func (server *Server) handle_write(request Request) { Name: request.Account, Balance: 0, CreatedBy: request.ClientName, + CreatedAt: request.TimeStamp, RTS: []time.Time{}, TW: make(map[string]*WriteEntry), WriteTimestampCommitted: []time.Time{}, @@ -597,7 +632,7 @@ func (server *Server) handle_write(request Request) { } else if request.Transaction == "WITHDRAW" { server.Lock.Unlock() request.Transaction = "SEND" - request.Account = "ABORT" + request.Account = "NOT FOUND, ABORTED" request.Amount = 0 if request.PeerName == server.BranchName { server.handle_transaction(request) @@ -652,7 +687,7 @@ func (server *Server) handle_write(request Request) { // fmt.Printf("Tentative write recorded for %s: %+v\n", request.Account, account.TW[request.ClientName]) } else { request.Transaction = "SEND" - request.Account = "ABORT" + request.Account = "ABORTED" request.Amount = 0 if request.PeerName == server.BranchName { server.handle_transaction(request) @@ -665,7 +700,6 @@ func (server *Server) handle_write(request Request) { func (server *Server) handle_commit(request Request) { server.Lock.Lock() account := server.Accounts[request.Account] - server.Lock.Unlock() account.AccountLock.Lock() @@ -678,13 +712,9 @@ func (server *Server) handle_commit(request Request) { // Remove tentative write delete(account.TW, request.ClientName) - // fmt.Printf("COMMIT: Applied write to %s.%s, new balance: %d\n", request.Branch, request.Account, account.Balance) - // fmt.Println("RTS", account.RTS) - // fmt.Println("TW", account.TW) - // fmt.Println("COMMITED", account.WriteTimestampCommitted) - account.AccountLock.Unlock() - server.print_all_balances() + + server.Lock.Unlock() } func (server *Server) handle_abort(request Request) { @@ -697,19 +727,13 @@ func (server *Server) handle_abort(request Request) { } account.AccountLock.Lock() - defer account.AccountLock.Unlock() - // remove from TW list - _, ok := account.TW[request.ClientName] - if ok { + // Remove tentative write + if _, ok := account.TW[request.ClientName]; ok { delete(account.TW, request.ClientName) - // fmt.Printf("ABORT: Removed tentative write for %s.%s at %s\n", request.Branch, request.Account, request.TimeStamp) } - // else { - // fmt.Println("ABORT: No tentative write found to abort") - // } - // remove from RTS + // Remove aborted timestamp from RTS newRTS := []time.Time{} for _, ts := range account.RTS { if !ts.Equal(request.TimeStamp) { @@ -717,6 +741,15 @@ func (server *Server) handle_abort(request Request) { } } account.RTS = newRTS + + if account.CreatedBy == request.ClientName && account.CreatedAt.Equal(request.TimeStamp) { + account.AccountLock.Unlock() + server.Lock.Lock() + delete(server.Accounts, request.Account) + server.Lock.Unlock() + } else { + account.AccountLock.Unlock() + } } func (server *Server) handle_prepare(request Request) { @@ -795,7 +828,7 @@ func (server *Server) handle_prepare(request Request) { // handles transaction request from coordinator func (server *Server) handle_transaction(request Request) { - fmt.Println("HANDLING TRANSACTION", request.Transaction, request.Branch+"."+request.Account) + // fmt.Println("HANDLING TRANSACTION", request.Transaction, request.Branch+"."+request.Account) switch request.Transaction { case "BALANCE": @@ -834,7 +867,6 @@ func main() { server.BranchNameToAddress = make(map[string]string) server.Accounts = make(map[string]*Account) server.Clients = make(map[string]net.Conn) - server.ClientEncs = make(map[string]*gob.Encoder) server.Peers = make(map[string]net.Conn) server.PeerEncs = make(map[string]*gob.Encoder) server.ClientChan = make(chan Request, 1000)