Skip to content
Snippets Groups Projects
Commit 0624cae1 authored by owenw2's avatar owenw2
Browse files

added commit/abort cleanup, updated message propagation

parent 7ee4830f
No related branches found
No related tags found
No related merge requests found
......@@ -211,7 +211,7 @@ func main() {
if response.Result {
fmt.Println(response.Message)
} else {
fmt.Println("NOT FOUND, ABORTED")
fmt.Println(response.Message)
return
}
} else if substrings[0] == "WITHDRAW" && client.Connected {
......@@ -221,7 +221,7 @@ func main() {
if response.Result {
fmt.Println("OK")
} else {
fmt.Println("NOT FOUND, ABORTED")
fmt.Println(response.Message)
return
}
} else if substrings[0] == "PRINT" {
......
......@@ -19,7 +19,6 @@ type Server struct {
BranchNameToAddress map[string]string
Accounts map[string]*Account
Clients map[string]net.Conn // connected clients [name, socket]
ClientEncs map[string]*gob.Encoder // connected client encoders
Peers map[string]net.Conn // connected peers [address, socket]
PeerEncs map[string]*gob.Encoder // connected peer encoders [address, encoder]
Lock sync.Mutex // lock for client and peer map
......@@ -28,10 +27,7 @@ type Server struct {
PeerChan chan Request
ClientsAffected map[string][]string // Track multiple affected accounts for each client
VoteClientChans map[string]chan string // Vote channel for each transaction
TransactionStatus map[string]string // Track the phase of each transaction for each client (for 2pc)
VoteResults map[string]bool // Track vote results for each client and server
ClientCount int
}
type WriteEntry struct {
......@@ -43,6 +39,7 @@ type Account struct {
Name string
Balance int
CreatedBy string
CreatedAt time.Time
RTS []time.Time
TW map[string]*WriteEntry // tentative writes
WriteTimestampCommitted []time.Time
......@@ -80,7 +77,7 @@ var NUM_SERVERS int = 2 // number of servers
func (server *Server) run_coordinator() {
for {
request := <-server.ClientChan
fmt.Println("Got client message:", request.Transaction, request.Branch, request.Account, request.Amount)
// fmt.Println("Got client message:", request.Transaction, request.Branch, request.Account, request.Amount)
// Track the affected account for the client
if request.Transaction == "BALANCE" || request.Transaction == "DEPOSIT" || request.Transaction == "WITHDRAW" {
......@@ -98,9 +95,23 @@ func (server *Server) run_coordinator() {
go server.run_commit(request.ClientName) // start commit server
go server.prepare(request) // start prepare phase
} else if request.Transaction == "ABORT" {
var wg sync.WaitGroup
wg.Add(2) // We're launching two goroutines
request.PeerName = server.BranchName
go server.prepare(request) // start prepare phase
go server.send_response_to_client(request.ClientName, false, "ABORT")
go func() {
defer wg.Done()
server.prepare(request)
}()
go func() {
defer wg.Done()
server.send_response_to_client(request.ClientName, false, "ABORT")
}()
wg.Wait()
server.Lock.Lock()
delete(server.ClientsAffected, request.ClientName)
delete(server.VoteClientChans, request.ClientName)
delete(server.Clients, request.ClientName)
server.Lock.Unlock()
} else {
// request another server to handle transaction
request.PeerName = server.BranchName
......@@ -156,7 +167,10 @@ func (server *Server) listen_to_clients() {
continue
}
// confirm connection with client
response := Response{Result: true, Message: "OK"}
server.Lock.Lock()
server.ClientCount += 1
response := Response{Result: true, Message: strconv.Itoa(server.ClientCount)}
server.Lock.Unlock()
enc := gob.NewEncoder(connection)
enc.Encode(&response)
// starts channel fill thread
......@@ -175,7 +189,7 @@ func (server *Server) listen_to_clients() {
func (server *Server) run_server() {
for {
request := <-server.PeerChan
fmt.Println("Got peer message:", request.Transaction, request.Branch, request.Account, request.Amount)
// fmt.Println("Got peer message:", request.Transaction, request.Branch, request.Account, request.Amount)
go server.handle_transaction(request)
}
......@@ -260,10 +274,14 @@ func (server *Server) run_commit(clientName string) {
for {
vote := <-server.VoteClientChans[clientName] // get vote
// send abort
if vote == "ABORT" {
server.send_response_to_client(clientName, false, "ABORT")
server.Lock.Lock()
delete(server.ClientsAffected, clientName)
delete(server.VoteClientChans, clientName)
delete(server.Clients, clientName)
server.Lock.Unlock()
return
} else {
voteCount += 1
......@@ -277,40 +295,46 @@ func (server *Server) run_commit(clientName string) {
server.Lock.Unlock()
// Send PREPARE to peers and wait for responses
var wg sync.WaitGroup
for _, account := range affectedAccounts {
branch := strings.Split(account, ".")[0]
account := strings.Split(account, ".")[1]
accountName := strings.Split(account, ".")[1]
wg.Add(1)
prepareRequest := Request{
ClientName: clientName,
PeerName: server.BranchName,
TimeStamp: time.Time{},
Transaction: "HANDLECOMMIT",
Branch: branch,
Account: accountName,
Amount: 0,
}
if branch != server.BranchName {
peerAddr := server.BranchNameToAddress[branch]
peerEnc := server.PeerEncs[peerAddr]
prepareRequest := Request{
ClientName: clientName,
PeerName: server.BranchName,
TimeStamp: time.Time{},
Transaction: "HANDLECOMMIT",
Branch: branch,
Account: account,
Amount: 0,
}
// Send PREPARE message to peer server
peerEnc.Encode(&prepareRequest)
go func() {
peerEnc.Encode(&prepareRequest)
wg.Done()
}()
} else {
prepareRequest := Request{
ClientName: clientName,
PeerName: server.BranchName,
TimeStamp: time.Time{},
Transaction: "HANDLECOMMIT",
Branch: branch,
Account: account,
Amount: 0,
}
server.handle_transaction(prepareRequest)
go func() {
server.handle_transaction(prepareRequest)
wg.Done()
}()
}
}
wg.Wait() // Wait for all commits to finish
server.send_response_to_client(clientName, true, "COMMIT")
server.print_all_balances()
// server cleanup
server.Lock.Lock()
delete(server.ClientsAffected, clientName)
delete(server.VoteClientChans, clientName)
delete(server.Clients, clientName)
server.Lock.Unlock()
}
}
}
......@@ -320,6 +344,9 @@ func (server *Server) run_commit(clientName string) {
*/
func (server *Server) prepare(request Request) {
clientName := request.ClientName
if request.Transaction == "COMMIT" {
request.Transaction = "PREPARE"
}
// Get all affected accounts for this transaction
server.Lock.Lock()
......@@ -340,7 +367,7 @@ func (server *Server) prepare(request Request) {
ClientName: clientName,
PeerName: server.BranchName,
TimeStamp: request.TimeStamp,
Transaction: "PREPARE",
Transaction: request.Transaction,
Branch: branch,
Account: account,
Amount: request.Amount,
......@@ -353,7 +380,7 @@ func (server *Server) prepare(request Request) {
ClientName: clientName,
PeerName: server.BranchName,
TimeStamp: request.TimeStamp,
Transaction: "PREPARE",
Transaction: request.Transaction,
Branch: branch,
Account: account,
Amount: request.Amount,
......@@ -400,6 +427,7 @@ func (server *Server) read_file(config_name string) {
}
func (server *Server) print_all_balances() {
// fmt.Println("There are", len(server.Accounts), "account")
server.Lock.Lock()
for _, account := range server.Accounts {
account.AccountLock.Lock()
......@@ -417,7 +445,6 @@ func (server *Server) print_all_balances() {
func (server *Server) send_response_to_client(clientName string, result bool, message string) {
// fmt.Println("SENDING RESPONSE TO CLIENT", clientName, result, message, server.Clients[clientName], server.Clients)
response := Response{Result: result, Message: message}
enc := gob.NewEncoder(server.Clients[clientName])
enc.Encode(&response)
......@@ -428,15 +455,17 @@ func (server *Server) handle_read(request Request) {
account, exists := server.Accounts[request.Account]
server.Lock.Unlock()
// fmt.Println("CALLED HANDLE READ", account, exists)
if !exists && request.PeerName != server.BranchName {
request.Transaction = "SEND"
request.Account = "ABORT"
request.Account = "NOT FOUND, ABORTED"
request.Amount = 0
server.PeerEncs[server.BranchNameToAddress[request.PeerName]].Encode(&request)
return
} else if !exists && request.PeerName == server.BranchName {
request.Transaction = "SEND"
request.Account = "ABORT"
request.Account = "NOT FOUND, ABORTED"
request.Amount = 0
server.handle_transaction(request)
return
......@@ -453,7 +482,7 @@ func (server *Server) handle_read(request Request) {
// verify that request is greater than write timestamp
if request.TimeStamp.Before(maxCommitTs) || request.TimeStamp.Equal(maxCommitTs) {
request.Transaction = "SEND"
request.Account = "ABORT"
request.Account = "ABORTED"
request.Amount = 0
if request.PeerName == server.BranchName {
server.handle_transaction(request)
......@@ -529,12 +558,14 @@ func (server *Server) handle_read(request Request) {
// Otherwise, wait and retry (spinlock variant)
// fmt.Println("Waiting for tentative write to commit/abort...")
account.AccountLock.Unlock()
for {
time.Sleep(100 * time.Millisecond) // avoid busy-waiting too aggressively
account.AccountLock.Lock()
// Check if Ds is now committedserver.send_response_
// Check if Ds is now committed
isNowCommitted := false
for _, ts := range account.WriteTimestampCommitted {
if ts.Equal(ds) {
......@@ -561,7 +592,8 @@ func (server *Server) handle_read(request Request) {
// Check if tentative write no longer exists (was aborted)
stillExists := false
for _, entry := range account.TW {
if entry.Timestamp.Equal(ds) {
fmt.Println("COMPARE", entry.Timestamp, ds)
if entry.Timestamp.Equal(ds) || ds.Equal(time.Time{}) {
stillExists = true
break
}
......@@ -569,6 +601,8 @@ func (server *Server) handle_read(request Request) {
account.AccountLock.Unlock()
// fmt.Println(stillExists)
if !stillExists {
// Reapply read rule
// fmt.Println("Tentative write gone (aborted), retrying read")
......@@ -588,6 +622,7 @@ func (server *Server) handle_write(request Request) {
Name: request.Account,
Balance: 0,
CreatedBy: request.ClientName,
CreatedAt: request.TimeStamp,
RTS: []time.Time{},
TW: make(map[string]*WriteEntry),
WriteTimestampCommitted: []time.Time{},
......@@ -597,7 +632,7 @@ func (server *Server) handle_write(request Request) {
} else if request.Transaction == "WITHDRAW" {
server.Lock.Unlock()
request.Transaction = "SEND"
request.Account = "ABORT"
request.Account = "NOT FOUND, ABORTED"
request.Amount = 0
if request.PeerName == server.BranchName {
server.handle_transaction(request)
......@@ -652,7 +687,7 @@ func (server *Server) handle_write(request Request) {
// fmt.Printf("Tentative write recorded for %s: %+v\n", request.Account, account.TW[request.ClientName])
} else {
request.Transaction = "SEND"
request.Account = "ABORT"
request.Account = "ABORTED"
request.Amount = 0
if request.PeerName == server.BranchName {
server.handle_transaction(request)
......@@ -665,7 +700,6 @@ func (server *Server) handle_write(request Request) {
func (server *Server) handle_commit(request Request) {
server.Lock.Lock()
account := server.Accounts[request.Account]
server.Lock.Unlock()
account.AccountLock.Lock()
......@@ -678,13 +712,9 @@ func (server *Server) handle_commit(request Request) {
// Remove tentative write
delete(account.TW, request.ClientName)
// fmt.Printf("COMMIT: Applied write to %s.%s, new balance: %d\n", request.Branch, request.Account, account.Balance)
// fmt.Println("RTS", account.RTS)
// fmt.Println("TW", account.TW)
// fmt.Println("COMMITED", account.WriteTimestampCommitted)
account.AccountLock.Unlock()
server.print_all_balances()
server.Lock.Unlock()
}
func (server *Server) handle_abort(request Request) {
......@@ -697,19 +727,13 @@ func (server *Server) handle_abort(request Request) {
}
account.AccountLock.Lock()
defer account.AccountLock.Unlock()
// remove from TW list
_, ok := account.TW[request.ClientName]
if ok {
// Remove tentative write
if _, ok := account.TW[request.ClientName]; ok {
delete(account.TW, request.ClientName)
// fmt.Printf("ABORT: Removed tentative write for %s.%s at %s\n", request.Branch, request.Account, request.TimeStamp)
}
// else {
// fmt.Println("ABORT: No tentative write found to abort")
// }
// remove from RTS
// Remove aborted timestamp from RTS
newRTS := []time.Time{}
for _, ts := range account.RTS {
if !ts.Equal(request.TimeStamp) {
......@@ -717,6 +741,15 @@ func (server *Server) handle_abort(request Request) {
}
}
account.RTS = newRTS
if account.CreatedBy == request.ClientName && account.CreatedAt.Equal(request.TimeStamp) {
account.AccountLock.Unlock()
server.Lock.Lock()
delete(server.Accounts, request.Account)
server.Lock.Unlock()
} else {
account.AccountLock.Unlock()
}
}
func (server *Server) handle_prepare(request Request) {
......@@ -795,7 +828,7 @@ func (server *Server) handle_prepare(request Request) {
// handles transaction request from coordinator
func (server *Server) handle_transaction(request Request) {
fmt.Println("HANDLING TRANSACTION", request.Transaction, request.Branch+"."+request.Account)
// fmt.Println("HANDLING TRANSACTION", request.Transaction, request.Branch+"."+request.Account)
switch request.Transaction {
case "BALANCE":
......@@ -834,7 +867,6 @@ func main() {
server.BranchNameToAddress = make(map[string]string)
server.Accounts = make(map[string]*Account)
server.Clients = make(map[string]net.Conn)
server.ClientEncs = make(map[string]*gob.Encoder)
server.Peers = make(map[string]net.Conn)
server.PeerEncs = make(map[string]*gob.Encoder)
server.ClientChan = make(chan Request, 1000)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment