Skip to content
Snippets Groups Projects
Commit 7f05e84b authored by owenw2's avatar owenw2
Browse files

read write fix, working on commit abort

parent b834f9e1
No related branches found
No related tags found
No related merge requests found
...@@ -6,6 +6,7 @@ import ( ...@@ -6,6 +6,7 @@ import (
"fmt" "fmt"
"net" "net"
"os" "os"
"slices"
"strconv" "strconv"
"strings" "strings"
"sync" "sync"
...@@ -25,6 +26,12 @@ type Server struct { ...@@ -25,6 +26,12 @@ type Server struct {
PeerConnectedWG sync.WaitGroup // wait group for all peers to connect PeerConnectedWG sync.WaitGroup // wait group for all peers to connect
ClientChan chan Request ClientChan chan Request
PeerChan chan Request PeerChan chan Request
ClientsAffected map[string][]string // Track multiple affected accounts for each client
VoteClientChans map[string]chan string // Vote channel for each transaction
TransactionStatus map[string]string // Track the phase of each transaction for each client (for 2pc)
VoteResults map[string]bool // Track vote results for each client and server
} }
type WriteEntry struct { type WriteEntry struct {
...@@ -57,7 +64,7 @@ type Response struct { ...@@ -57,7 +64,7 @@ type Response struct {
Message string Message string
} }
var NUM_SERVERS int = 1 // number of servers var NUM_SERVERS int = 2 // number of servers
////////////////////////////// //////////////////////////////
// CLIENT FUNCTIONS // CLIENT FUNCTIONS
...@@ -74,38 +81,25 @@ func (server *Server) run_coordinator() { ...@@ -74,38 +81,25 @@ func (server *Server) run_coordinator() {
request := <-server.ClientChan request := <-server.ClientChan
fmt.Println("Got client message:", request.Transaction, request.Branch, request.Account, request.Amount) fmt.Println("Got client message:", request.Transaction, request.Branch, request.Account, request.Amount)
// send transaction to correct server // Track the affected account for the client
if request.Transaction == "BALANCE" || request.Transaction == "DEPOSIT" || request.Transaction == "WITHDRAW" {
if !(slices.Contains(server.ClientsAffected[request.ClientName], request.Branch+"."+request.Account)) {
server.ClientsAffected[request.ClientName] = append(server.ClientsAffected[request.ClientName], request.Branch+"."+request.Account)
}
}
// Send transaction to correct server
if request.Branch == server.BranchName { if request.Branch == server.BranchName {
// handle transaction
request.PeerName = server.BranchName request.PeerName = server.BranchName
go server.handle_transaction(request) go server.handle_transaction(request)
} else if request.Transaction == "COMMIT" { } else if request.Transaction == "COMMIT" {
// loop through all accounts and attempt to commit request.PeerName = server.BranchName
for _, accountPtr := range server.Accounts { go server.run_commit(request.ClientName) // start commit server
commitRequest := Request{ go server.prepare(request) // start prepare phase
ClientName: request.ClientName,
PeerName: server.BranchName,
TimeStamp: request.TimeStamp, // time of client struct
Transaction: "COMMIT", // type of transaction
Branch: server.BranchName,
Account: accountPtr.Name,
Amount: 0,
}
go server.handle_commit(commitRequest)
}
} else if request.Transaction == "ABORT" { } else if request.Transaction == "ABORT" {
for _, accountPtr := range server.Accounts { request.PeerName = server.BranchName
abortRequest := Request{ go server.prepare(request) // start prepare phase
ClientName: request.ClientName, go server.send_response_to_client(request.ClientName, false, "ABORT")
PeerName: server.BranchName,
TimeStamp: request.TimeStamp, // time of client struct
Transaction: "ABORT", // type of transaction
Branch: server.BranchName,
Account: accountPtr.Name,
Amount: 0,
}
go server.handle_abort(abortRequest)
}
} else { } else {
// request another server to handle transaction // request another server to handle transaction
request.PeerName = server.BranchName request.PeerName = server.BranchName
...@@ -114,11 +108,6 @@ func (server *Server) run_coordinator() { ...@@ -114,11 +108,6 @@ func (server *Server) run_coordinator() {
server.PeerEncs[address].Encode(&request) server.PeerEncs[address].Encode(&request)
server.Lock.Unlock() server.Lock.Unlock()
} }
// to send back to client?
enc := gob.NewEncoder(server.Clients[request.ClientName])
response := Response{Result: true, Message: "OK"}
enc.Encode(&response)
} }
} }
...@@ -136,11 +125,14 @@ func (server *Server) handle_client_messages(clientConn net.Conn) { ...@@ -136,11 +125,14 @@ func (server *Server) handle_client_messages(clientConn net.Conn) {
// add to map // add to map
server.Lock.Lock() server.Lock.Lock()
server.Clients[request.ClientName] = clientConn server.Clients[request.ClientName] = clientConn
server.VoteClientChans[request.ClientName] = make(chan string)
fmt.Println("JUST ADDED CLIENT", server.Clients)
server.Lock.Unlock() server.Lock.Unlock()
for { for {
request := Request{} request := Request{}
err := dec.Decode(&request) err := dec.Decode(&request)
request.PeerName = server.BranchName
if err != nil { if err != nil {
fmt.Println("Decode error:", err) fmt.Println("Decode error:", err)
return return
...@@ -262,6 +254,132 @@ func (server *Server) connect_to_peers() { ...@@ -262,6 +254,132 @@ func (server *Server) connect_to_peers() {
fmt.Println("Server Peers:", server.Peers) fmt.Println("Server Peers:", server.Peers)
} }
//////////////////////////////
// COMMIT SERVER FUNCTIONS
//////////////////////////////
/*
* Does: receives and handles commit transactions (one per client)
* Spawns: none
*/
func (server *Server) run_commit(clientName string) {
fmt.Println("Starting commit handling thread")
voteCount := 0
requiredVoteCount := len(server.ClientsAffected[clientName])
fmt.Println("Required vote count is", requiredVoteCount)
fmt.Println("Updated Accounts", server.ClientsAffected[clientName])
for {
fmt.Println("WAITING FOR VOTE ON", clientName)
vote := <-server.VoteClientChans[clientName] // get vote
fmt.Println("Received vote", vote)
// send abort
if vote == "ABORT" {
fmt.Println("OH NO ABORT")
server.send_response_to_client(clientName, false, "ABORT")
return
} else {
voteCount += 1
}
// send commit
if voteCount == requiredVoteCount {
// commit send handle commit to all related account
server.Lock.Lock()
affectedAccounts := server.ClientsAffected[clientName]
server.Lock.Unlock()
// Send PREPARE to peers and wait for responses
for _, account := range affectedAccounts {
branch := strings.Split(account, ".")[0]
account := strings.Split(account, ".")[1]
if branch != server.BranchName {
peerAddr := server.BranchNameToAddress[branch]
peerEnc := server.PeerEncs[peerAddr]
prepareRequest := Request{
ClientName: clientName,
PeerName: server.BranchName,
TimeStamp: time.Time{},
Transaction: "HANDLECOMMIT",
Branch: branch,
Account: account,
Amount: 0,
}
// Send PREPARE message to peer server
peerEnc.Encode(&prepareRequest)
} else {
prepareRequest := Request{
ClientName: clientName,
PeerName: server.BranchName,
TimeStamp: time.Time{},
Transaction: "HANDLECOMMIT",
Branch: branch,
Account: account,
Amount: 0,
}
server.handle_transaction(prepareRequest)
}
}
}
}
}
/*
* One per client
*/
func (server *Server) prepare(request Request) {
clientName := request.ClientName
// Get all affected accounts for this transaction
server.Lock.Lock()
affectedAccounts := server.ClientsAffected[clientName]
server.Lock.Unlock()
// Send PREPARE to peers and wait for responses
for _, account := range affectedAccounts {
fmt.Println("AFFECTED ACCOUNT", account)
branch := strings.Split(account, ".")[0]
account := strings.Split(account, ".")[1]
// sending prepare to different branch
if branch != server.BranchName {
peerAddr := server.BranchNameToAddress[branch]
peerEnc := server.PeerEncs[peerAddr]
prepareRequest := Request{
ClientName: clientName,
PeerName: server.BranchName,
TimeStamp: request.TimeStamp,
Transaction: "PREPARE",
Branch: branch,
Account: account,
Amount: request.Amount,
}
// Send PREPARE message to peer server
peerEnc.Encode(&prepareRequest)
} else {
prepareRequest := Request{
ClientName: clientName,
PeerName: server.BranchName,
TimeStamp: request.TimeStamp,
Transaction: "PREPARE",
Branch: branch,
Account: account,
Amount: request.Amount,
}
server.handle_transaction(prepareRequest)
}
}
}
////////////////////////////// //////////////////////////////
// HELPER FUNCTIONS // HELPER FUNCTIONS
////////////////////////////// //////////////////////////////
...@@ -312,8 +430,13 @@ func (server *Server) print_all_balances() { ...@@ -312,8 +430,13 @@ func (server *Server) print_all_balances() {
// TIMESTAMP ORDERING AND TRANSACTION HANDLING START // TIMESTAMP ORDERING AND TRANSACTION HANDLING START
////////////////////////////// //////////////////////////////
func (server *Server) send_response_to_coordinator() { func (server *Server) send_response_to_client(clientName string, result bool, message string) {
fmt.Println("SENDING RESPONSE TO CLIENT", clientName, result, message, server.Clients[clientName], server.Clients)
response := Response{Result: result, Message: message}
enc := gob.NewEncoder(server.Clients[clientName])
enc.Encode(&response)
fmt.Println("sent")
} }
func (server *Server) handle_read(request Request) { func (server *Server) handle_read(request Request) {
...@@ -321,8 +444,18 @@ func (server *Server) handle_read(request Request) { ...@@ -321,8 +444,18 @@ func (server *Server) handle_read(request Request) {
account, exists := server.Accounts[request.Account] account, exists := server.Accounts[request.Account]
server.Lock.Unlock() server.Lock.Unlock()
if !exists { if !exists && request.PeerName != server.BranchName {
fmt.Println("ABORT! read on non-existent account") fmt.Println("ABORT! read on non-existent account")
request.Transaction = "SEND"
request.Account = "ABORT"
request.Amount = 0
server.PeerEncs[server.BranchNameToAddress[request.PeerName]].Encode(&request)
return
} else if !exists && request.PeerName == server.BranchName {
request.Transaction = "SEND"
request.Account = "ABORT"
request.Amount = 0
server.handle_transaction(request)
return return
} }
...@@ -337,6 +470,14 @@ func (server *Server) handle_read(request Request) { ...@@ -337,6 +470,14 @@ func (server *Server) handle_read(request Request) {
// verify that request is greater than write timestamp // verify that request is greater than write timestamp
if request.TimeStamp.Before(maxCommitTs) || request.TimeStamp.Equal(maxCommitTs) { if request.TimeStamp.Before(maxCommitTs) || request.TimeStamp.Equal(maxCommitTs) {
fmt.Println("ABORT! Read is before a committed write") fmt.Println("ABORT! Read is before a committed write")
request.Transaction = "SEND"
request.Account = "ABORT"
request.Amount = 0
if request.PeerName == server.BranchName {
server.handle_transaction(request)
} else {
server.PeerEncs[server.BranchNameToAddress[request.PeerName]].Encode(&request)
}
account.AccountLock.Unlock() account.AccountLock.Unlock()
return return
} }
...@@ -375,6 +516,14 @@ func (server *Server) handle_read(request Request) { ...@@ -375,6 +516,14 @@ func (server *Server) handle_read(request Request) {
if !alreadyRead { if !alreadyRead {
account.RTS = append(account.RTS, request.TimeStamp) account.RTS = append(account.RTS, request.TimeStamp)
} }
request.Transaction = "SEND"
request.Account = request.Branch + "." + request.Account + " = " + strconv.Itoa(value)
request.Amount = 1
if request.PeerName == server.BranchName {
server.handle_transaction(request)
} else {
server.PeerEncs[server.BranchNameToAddress[request.PeerName]].Encode(&request)
}
fmt.Printf("READ COMMITTED VALUE: %s.%s = %d\n", request.Branch, request.Account, value) fmt.Printf("READ COMMITTED VALUE: %s.%s = %d\n", request.Branch, request.Account, value)
account.AccountLock.Unlock() account.AccountLock.Unlock()
return return
...@@ -383,6 +532,14 @@ func (server *Server) handle_read(request Request) { ...@@ -383,6 +532,14 @@ func (server *Server) handle_read(request Request) {
// if Ds is tentative and written by this client, read it // if Ds is tentative and written by this client, read it
entry, writtenByCurrent := account.TW[request.ClientName] entry, writtenByCurrent := account.TW[request.ClientName]
if writtenByCurrent && entry.Timestamp.Equal(ds) { if writtenByCurrent && entry.Timestamp.Equal(ds) {
request.Transaction = "SEND"
request.Account = request.Branch + "." + request.Account + " = " + strconv.Itoa(value)
request.Amount = 1
if request.PeerName == server.BranchName {
server.handle_transaction(request)
} else {
server.PeerEncs[server.BranchNameToAddress[request.PeerName]].Encode(&request)
}
fmt.Printf("READ OWN TENTATIVE WRITE: %s.%s = %d\n", request.Branch, request.Account, entry.OperationValue) fmt.Printf("READ OWN TENTATIVE WRITE: %s.%s = %d\n", request.Branch, request.Account, entry.OperationValue)
account.AccountLock.Unlock() account.AccountLock.Unlock()
return return
...@@ -395,7 +552,7 @@ func (server *Server) handle_read(request Request) { ...@@ -395,7 +552,7 @@ func (server *Server) handle_read(request Request) {
account.AccountLock.Lock() account.AccountLock.Lock()
// Check if Ds is now committed // Check if Ds is now committedserver.send_response_
isNowCommitted := false isNowCommitted := false
for _, ts := range account.WriteTimestampCommitted { for _, ts := range account.WriteTimestampCommitted {
if ts.Equal(ds) { if ts.Equal(ds) {
...@@ -408,6 +565,14 @@ func (server *Server) handle_read(request Request) { ...@@ -408,6 +565,14 @@ func (server *Server) handle_read(request Request) {
account.RTS = append(account.RTS, request.TimeStamp) account.RTS = append(account.RTS, request.TimeStamp)
account.AccountLock.Unlock() account.AccountLock.Unlock()
fmt.Printf("AFTER WAIT, READ COMMITTED: %s.%s = %d\n", request.Branch, request.Account, value) fmt.Printf("AFTER WAIT, READ COMMITTED: %s.%s = %d\n", request.Branch, request.Account, value)
request.Transaction = "SEND"
request.Account = request.Branch + "." + request.Account + " = " + strconv.Itoa(value)
request.Amount = 1
if request.PeerName == server.BranchName {
server.handle_transaction(request)
} else {
server.PeerEncs[server.BranchNameToAddress[request.PeerName]].Encode(&request)
}
return return
} }
...@@ -449,7 +614,14 @@ func (server *Server) handle_write(request Request) { ...@@ -449,7 +614,14 @@ func (server *Server) handle_write(request Request) {
exists = true exists = true
} else if request.Transaction == "WITHDRAW" { } else if request.Transaction == "WITHDRAW" {
server.Lock.Unlock() server.Lock.Unlock()
fmt.Println("ABORT! Withdrawing from non-existent account") request.Transaction = "SEND"
request.Account = "ABORT"
request.Amount = 0
if request.PeerName == server.BranchName {
server.handle_transaction(request)
} else {
server.PeerEncs[server.BranchNameToAddress[request.PeerName]].Encode(&request)
}
return return
} }
} }
...@@ -486,50 +658,42 @@ func (server *Server) handle_write(request Request) { ...@@ -486,50 +658,42 @@ func (server *Server) handle_write(request Request) {
OperationValue: value, OperationValue: value,
} }
} }
fmt.Printf("Tentative write recorded for %s: %+v\n", request.Account, account.TW[request.ClientName])
request.Transaction = "SEND"
request.Account = ""
request.Amount = 1
if request.PeerName == server.BranchName {
fmt.Println(request.Branch, server.BranchName, "BRANCHES")
server.handle_transaction(request)
} else {
fmt.Println("KABOOM")
server.PeerEncs[server.BranchNameToAddress[request.PeerName]].Encode(&request)
}
fmt.Printf("Tentative write recorded for %s: %+v\n", request.Account, account.TW[request.ClientName])
} else { } else {
fmt.Println("OH NO")
request.Transaction = "SEND"
request.Account = "ABORT"
request.Amount = 0
if request.PeerName == server.BranchName {
server.handle_transaction(request)
} else {
server.PeerEncs[server.BranchNameToAddress[request.PeerName]].Encode(&request)
}
fmt.Println("ABORT! Write condition violated") fmt.Println("ABORT! Write condition violated")
} }
} }
func (server *Server) handle_commit(request Request) { func (server *Server) handle_commit(request Request) {
server.Lock.Lock() server.Lock.Lock()
account, exists := server.Accounts[request.Account] account := server.Accounts[request.Account]
server.Lock.Unlock() server.Lock.Unlock()
if !exists {
fmt.Println("COMMIT: Account does not exist", request.Account)
return
}
account.AccountLock.Lock() account.AccountLock.Lock()
entry, ok := account.TW[request.ClientName] fmt.Println("HERE 2", account.TW[request.ClientName])
if !ok {
fmt.Println("COMMIT: No tentative write found to commit")
account.AccountLock.Unlock()
return
}
// loop until all previous are committed entry := account.TW[request.ClientName]
for {
prevAllCommitted := true
for client, prevEntry := range account.TW {
if client != request.ClientName && prevEntry.Timestamp.Before(entry.Timestamp) {
prevAllCommitted = false
break
}
}
if prevAllCommitted {
break
} else {
account.AccountLock.Unlock()
time.Sleep(100 * time.Millisecond)
account.AccountLock.Lock()
}
}
// Apply write // Apply write
account.Balance += entry.OperationValue account.Balance += entry.OperationValue
...@@ -539,13 +703,17 @@ func (server *Server) handle_commit(request Request) { ...@@ -539,13 +703,17 @@ func (server *Server) handle_commit(request Request) {
delete(account.TW, request.ClientName) delete(account.TW, request.ClientName)
fmt.Printf("COMMIT: Applied write to %s.%s, new balance: %d\n", request.Branch, request.Account, account.Balance) fmt.Printf("COMMIT: Applied write to %s.%s, new balance: %d\n", request.Branch, request.Account, account.Balance)
account.AccountLock.Unlock() fmt.Println("RTS", account.RTS)
fmt.Println("TW", account.TW)
fmt.Println("COMMITED", account.WriteTimestampCommitted)
server.Lock.Lock() server.Lock.Lock()
for accountName, account := range server.Accounts { for accountName, account := range server.Accounts {
fmt.Println(server.BranchName+accountName, "=", account.Balance, account.RTS, account.TW, account.WriteTimestampCommitted) fmt.Println(server.BranchName+accountName, "=", account.Balance, account.RTS, account.TW, account.WriteTimestampCommitted)
} }
server.Lock.Unlock() server.Lock.Unlock()
account.AccountLock.Unlock()
} }
func (server *Server) handle_abort(request Request) { func (server *Server) handle_abort(request Request) {
...@@ -580,8 +748,55 @@ func (server *Server) handle_abort(request Request) { ...@@ -580,8 +748,55 @@ func (server *Server) handle_abort(request Request) {
account.RTS = newRTS account.RTS = newRTS
} }
func (server *Server) handle_prepare(request Request) {
// consistency check
fmt.Println("MADE IT HERE WITH REQUEST", request)
server.Lock.Lock()
account, exists := server.Accounts[request.Account]
server.Lock.Unlock()
if !exists {
server.VoteClientChans[request.ClientName] <- "ABORT"
return
}
account.AccountLock.Lock()
entry, ok := account.TW[request.ClientName]
if !ok {
server.VoteClientChans[request.ClientName] <- "ABORT"
account.AccountLock.Unlock()
return
}
// loop until all previous are committed
for {
prevAllCommitted := true
for client, prevEntry := range account.TW {
if client != request.ClientName && prevEntry.Timestamp.Before(entry.Timestamp) {
prevAllCommitted = false
break
}
}
if prevAllCommitted {
fmt.Println("PLACED COMMIT INTO CHANNEL", request.ClientName)
server.VoteClientChans[request.ClientName] <- "COMMIT"
break
} else {
account.AccountLock.Unlock()
time.Sleep(100 * time.Millisecond)
account.AccountLock.Lock()
}
}
account.AccountLock.Unlock()
}
// handles transaction request from coordinator // handles transaction request from coordinator
func (server *Server) handle_transaction(request Request) { func (server *Server) handle_transaction(request Request) {
fmt.Println("HANDLING TRANSACTION", request.Transaction, request.Branch+"."+request.Account)
switch request.Transaction { switch request.Transaction {
case "BALANCE": case "BALANCE":
server.handle_read(request) server.handle_read(request)
...@@ -591,6 +806,14 @@ func (server *Server) handle_transaction(request Request) { ...@@ -591,6 +806,14 @@ func (server *Server) handle_transaction(request Request) {
server.handle_commit(request) server.handle_commit(request)
case "ABORT": case "ABORT":
server.handle_abort(request) server.handle_abort(request)
case "PREPARE":
server.handle_prepare(request)
case "HANDLECOMMIT":
request.PeerName = server.BranchName
go server.handle_commit(request)
case "SEND":
fmt.Println("PEER", request.PeerName)
go server.send_response_to_client(request.ClientName, request.Amount == 1, request.Account)
} }
} }
...@@ -613,12 +836,16 @@ func main() { ...@@ -613,12 +836,16 @@ func main() {
server.PeerEncs = make(map[string]*gob.Encoder) server.PeerEncs = make(map[string]*gob.Encoder)
server.ClientChan = make(chan Request, 1000) server.ClientChan = make(chan Request, 1000)
server.PeerChan = make(chan Request, 1000) server.PeerChan = make(chan Request, 1000)
server.ClientsAffected = make(map[string][]string)
server.VoteClientChans = make(map[string]chan string)
server.read_file(configPath) server.read_file(configPath)
// start coordinator server
go server.run_coordinator() go server.run_coordinator()
go server.listen_to_clients() go server.listen_to_clients()
// start peer server
go server.run_server() go server.run_server()
go server.listen_to_peers() go server.listen_to_peers()
go server.connect_to_peers() go server.connect_to_peers()
......
A 127.0.0.1 2005 A 127.0.0.1 2005
\ No newline at end of file B 127.0.0.1 2006
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment