From 7f05e84bd057abf5ce8830ec9edcb7104264b0b5 Mon Sep 17 00:00:00 2001
From: owenw2 <owenw2@sp25-cs425-0601.cs.illinois.edu>
Date: Fri, 9 May 2025 19:28:04 -0500
Subject: [PATCH] read write fix, working on commit abort

---
 mp3/Server/server.go | 365 +++++++++++++++++++++++++++++++++++--------
 mp3/config_test.txt  |   3 +-
 2 files changed, 298 insertions(+), 70 deletions(-)

diff --git a/mp3/Server/server.go b/mp3/Server/server.go
index bf7affd..5a78a00 100644
--- a/mp3/Server/server.go
+++ b/mp3/Server/server.go
@@ -6,6 +6,7 @@ import (
 	"fmt"
 	"net"
 	"os"
+	"slices"
 	"strconv"
 	"strings"
 	"sync"
@@ -25,6 +26,12 @@ type Server struct {
 	PeerConnectedWG     sync.WaitGroup          // wait group for all peers to connect
 	ClientChan          chan Request
 	PeerChan            chan Request
+	ClientsAffected     map[string][]string    // Track multiple affected accounts for each client
+	VoteClientChans     map[string]chan string // Vote channel for each transaction
+
+	TransactionStatus map[string]string // Track the phase of each transaction for each client (for 2pc)
+	VoteResults       map[string]bool   // Track vote results for each client and server
+
 }
 
 type WriteEntry struct {
@@ -57,7 +64,7 @@ type Response struct {
 	Message string
 }
 
-var NUM_SERVERS int = 1 // number of servers
+var NUM_SERVERS int = 2 // number of servers
 
 //////////////////////////////
 // CLIENT FUNCTIONS
@@ -74,38 +81,25 @@ func (server *Server) run_coordinator() {
 		request := <-server.ClientChan
 		fmt.Println("Got client message:", request.Transaction, request.Branch, request.Account, request.Amount)
 
-		// send transaction to correct server
+		// Track the affected account for the client
+		if request.Transaction == "BALANCE" || request.Transaction == "DEPOSIT" || request.Transaction == "WITHDRAW" {
+			if !(slices.Contains(server.ClientsAffected[request.ClientName], request.Branch+"."+request.Account)) {
+				server.ClientsAffected[request.ClientName] = append(server.ClientsAffected[request.ClientName], request.Branch+"."+request.Account)
+			}
+		}
+
+		// Send transaction to correct server
 		if request.Branch == server.BranchName {
-			// handle transaction
 			request.PeerName = server.BranchName
 			go server.handle_transaction(request)
 		} else if request.Transaction == "COMMIT" {
-			// loop through all accounts and attempt to commit
-			for _, accountPtr := range server.Accounts {
-				commitRequest := Request{
-					ClientName:  request.ClientName,
-					PeerName:    server.BranchName,
-					TimeStamp:   request.TimeStamp, // time of client struct
-					Transaction: "COMMIT",          // type of transaction
-					Branch:      server.BranchName,
-					Account:     accountPtr.Name,
-					Amount:      0,
-				}
-				go server.handle_commit(commitRequest)
-			}
+			request.PeerName = server.BranchName
+			go server.run_commit(request.ClientName) // start commit server
+			go server.prepare(request)               // start prepare phase
 		} else if request.Transaction == "ABORT" {
-			for _, accountPtr := range server.Accounts {
-				abortRequest := Request{
-					ClientName:  request.ClientName,
-					PeerName:    server.BranchName,
-					TimeStamp:   request.TimeStamp, // time of client struct
-					Transaction: "ABORT",           // type of transaction
-					Branch:      server.BranchName,
-					Account:     accountPtr.Name,
-					Amount:      0,
-				}
-				go server.handle_abort(abortRequest)
-			}
+			request.PeerName = server.BranchName
+			go server.prepare(request) // start prepare phase
+			go server.send_response_to_client(request.ClientName, false, "ABORT")
 		} else {
 			// request another server to handle transaction
 			request.PeerName = server.BranchName
@@ -114,11 +108,6 @@ func (server *Server) run_coordinator() {
 			server.PeerEncs[address].Encode(&request)
 			server.Lock.Unlock()
 		}
-
-		// to send back to client?
-		enc := gob.NewEncoder(server.Clients[request.ClientName])
-		response := Response{Result: true, Message: "OK"}
-		enc.Encode(&response)
 	}
 }
 
@@ -136,11 +125,14 @@ func (server *Server) handle_client_messages(clientConn net.Conn) {
 	// add to map
 	server.Lock.Lock()
 	server.Clients[request.ClientName] = clientConn
+	server.VoteClientChans[request.ClientName] = make(chan string)
+	fmt.Println("JUST ADDED CLIENT", server.Clients)
 	server.Lock.Unlock()
 
 	for {
 		request := Request{}
 		err := dec.Decode(&request)
+		request.PeerName = server.BranchName
 		if err != nil {
 			fmt.Println("Decode error:", err)
 			return
@@ -262,6 +254,132 @@ func (server *Server) connect_to_peers() {
 	fmt.Println("Server Peers:", server.Peers)
 }
 
+//////////////////////////////
+// COMMIT SERVER FUNCTIONS
+//////////////////////////////
+
+/*
+ * Does: receives and handles commit transactions (one per client)
+ * Spawns: none
+ */
+func (server *Server) run_commit(clientName string) {
+	fmt.Println("Starting commit handling thread")
+
+	voteCount := 0
+	requiredVoteCount := len(server.ClientsAffected[clientName])
+
+	fmt.Println("Required vote count is", requiredVoteCount)
+	fmt.Println("Updated Accounts", server.ClientsAffected[clientName])
+
+	for {
+		fmt.Println("WAITING FOR VOTE ON", clientName)
+		vote := <-server.VoteClientChans[clientName] // get vote
+
+		fmt.Println("Received vote", vote)
+
+		// send abort
+		if vote == "ABORT" {
+			fmt.Println("OH NO ABORT")
+			server.send_response_to_client(clientName, false, "ABORT")
+			return
+		} else {
+			voteCount += 1
+		}
+
+		// send commit
+		if voteCount == requiredVoteCount {
+			// commit send handle commit to all related account
+			server.Lock.Lock()
+			affectedAccounts := server.ClientsAffected[clientName]
+			server.Lock.Unlock()
+
+			// Send PREPARE to peers and wait for responses
+			for _, account := range affectedAccounts {
+				branch := strings.Split(account, ".")[0]
+				account := strings.Split(account, ".")[1]
+				if branch != server.BranchName {
+					peerAddr := server.BranchNameToAddress[branch]
+					peerEnc := server.PeerEncs[peerAddr]
+
+					prepareRequest := Request{
+						ClientName:  clientName,
+						PeerName:    server.BranchName,
+						TimeStamp:   time.Time{},
+						Transaction: "HANDLECOMMIT",
+						Branch:      branch,
+						Account:     account,
+						Amount:      0,
+					}
+
+					// Send PREPARE message to peer server
+					peerEnc.Encode(&prepareRequest)
+				} else {
+					prepareRequest := Request{
+						ClientName:  clientName,
+						PeerName:    server.BranchName,
+						TimeStamp:   time.Time{},
+						Transaction: "HANDLECOMMIT",
+						Branch:      branch,
+						Account:     account,
+						Amount:      0,
+					}
+
+					server.handle_transaction(prepareRequest)
+				}
+			}
+		}
+	}
+}
+
+/*
+ * One per client
+ */
+func (server *Server) prepare(request Request) {
+	clientName := request.ClientName
+
+	// Get all affected accounts for this transaction
+	server.Lock.Lock()
+	affectedAccounts := server.ClientsAffected[clientName]
+	server.Lock.Unlock()
+
+	// Send PREPARE to peers and wait for responses
+	for _, account := range affectedAccounts {
+		fmt.Println("AFFECTED ACCOUNT", account)
+		branch := strings.Split(account, ".")[0]
+		account := strings.Split(account, ".")[1]
+
+		// sending prepare to different branch
+		if branch != server.BranchName {
+			peerAddr := server.BranchNameToAddress[branch]
+			peerEnc := server.PeerEncs[peerAddr]
+
+			prepareRequest := Request{
+				ClientName:  clientName,
+				PeerName:    server.BranchName,
+				TimeStamp:   request.TimeStamp,
+				Transaction: "PREPARE",
+				Branch:      branch,
+				Account:     account,
+				Amount:      request.Amount,
+			}
+
+			// Send PREPARE message to peer server
+			peerEnc.Encode(&prepareRequest)
+		} else {
+			prepareRequest := Request{
+				ClientName:  clientName,
+				PeerName:    server.BranchName,
+				TimeStamp:   request.TimeStamp,
+				Transaction: "PREPARE",
+				Branch:      branch,
+				Account:     account,
+				Amount:      request.Amount,
+			}
+			server.handle_transaction(prepareRequest)
+		}
+	}
+}
+
 //////////////////////////////
 // HELPER FUNCTIONS
 //////////////////////////////
@@ -312,8 +430,13 @@ func (server *Server) print_all_balances() {
 // TIMESTAMP ORDERING AND TRANSACTION HANDLING START
 //////////////////////////////
 
-func (server *Server) send_response_to_coordinator() {
+func (server *Server) send_response_to_client(clientName string, result bool, message string) {
+	fmt.Println("SENDING RESPONSE TO CLIENT", clientName, result, message, server.Clients[clientName], server.Clients)
 
+	response := Response{Result: result, Message: message}
+	enc := gob.NewEncoder(server.Clients[clientName])
+	enc.Encode(&response)
+	fmt.Println("sent")
 }
 
 func (server *Server) handle_read(request Request) {
@@ -321,8 +444,18 @@ func (server *Server) handle_read(request Request) {
 	account, exists := server.Accounts[request.Account]
 	server.Lock.Unlock()
 
-	if !exists {
+	if !exists && request.PeerName != server.BranchName {
 		fmt.Println("ABORT! read on non-existent account")
+		request.Transaction = "SEND"
+		request.Account = "ABORT"
+		request.Amount = 0
+		server.PeerEncs[server.BranchNameToAddress[request.PeerName]].Encode(&request)
+		return
+	} else if !exists && request.PeerName == server.BranchName {
+		request.Transaction = "SEND"
+		request.Account = "ABORT"
+		request.Amount = 0
+		server.handle_transaction(request)
 		return
 	}
 
@@ -337,6 +470,14 @@ func (server *Server) handle_read(request Request) {
 	// verify that request is greater than write timestamp
 	if request.TimeStamp.Before(maxCommitTs) || request.TimeStamp.Equal(maxCommitTs) {
 		fmt.Println("ABORT! Read is before a committed write")
+		request.Transaction = "SEND"
+		request.Account = "ABORT"
+		request.Amount = 0
+		if request.PeerName == server.BranchName {
+			server.handle_transaction(request)
+		} else {
+			server.PeerEncs[server.BranchNameToAddress[request.PeerName]].Encode(&request)
+		}
 		account.AccountLock.Unlock()
 		return
 	}
@@ -375,6 +516,14 @@ func (server *Server) handle_read(request Request) {
 		if !alreadyRead {
 			account.RTS = append(account.RTS, request.TimeStamp)
 		}
+		request.Transaction = "SEND"
+		request.Account = request.Branch + "." + request.Account + " = " + strconv.Itoa(value)
+		request.Amount = 1
+		if request.PeerName == server.BranchName {
+			server.handle_transaction(request)
+		} else {
+			server.PeerEncs[server.BranchNameToAddress[request.PeerName]].Encode(&request)
+		}
 		fmt.Printf("READ COMMITTED VALUE: %s.%s = %d\n", request.Branch, request.Account, value)
 		account.AccountLock.Unlock()
 		return
@@ -383,6 +532,14 @@ func (server *Server) handle_read(request Request) {
 	// if Ds is tentative and written by this client, read it
 	entry, writtenByCurrent := account.TW[request.ClientName]
 	if writtenByCurrent && entry.Timestamp.Equal(ds) {
+		request.Transaction = "SEND"
+		request.Account = request.Branch + "." + request.Account + " = " + strconv.Itoa(value)
+		request.Amount = 1
+		if request.PeerName == server.BranchName {
+			server.handle_transaction(request)
+		} else {
+			server.PeerEncs[server.BranchNameToAddress[request.PeerName]].Encode(&request)
+		}
 		fmt.Printf("READ OWN TENTATIVE WRITE: %s.%s = %d\n", request.Branch, request.Account, entry.OperationValue)
 		account.AccountLock.Unlock()
 		return
@@ -395,7 +552,7 @@ func (server *Server) handle_read(request Request) {
 
 		account.AccountLock.Lock()
 
-		// Check if Ds is now committed
+		// Check if Ds is now committedserver.send_response_
 		isNowCommitted := false
 		for _, ts := range account.WriteTimestampCommitted {
 			if ts.Equal(ds) {
@@ -408,6 +565,14 @@ func (server *Server) handle_read(request Request) {
 			account.RTS = append(account.RTS, request.TimeStamp)
 			account.AccountLock.Unlock()
 			fmt.Printf("AFTER WAIT, READ COMMITTED: %s.%s = %d\n", request.Branch, request.Account, value)
+			request.Transaction = "SEND"
+			request.Account = request.Branch + "." + request.Account + " = " + strconv.Itoa(value)
+			request.Amount = 1
+			if request.PeerName == server.BranchName {
+				server.handle_transaction(request)
+			} else {
+				server.PeerEncs[server.BranchNameToAddress[request.PeerName]].Encode(&request)
+			}
 			return
 		}
 
@@ -449,7 +614,14 @@ func (server *Server) handle_write(request Request) {
 			exists = true
 		} else if request.Transaction == "WITHDRAW" {
 			server.Lock.Unlock()
-			fmt.Println("ABORT! Withdrawing from non-existent account")
+			request.Transaction = "SEND"
+			request.Account = "ABORT"
+			request.Amount = 0
+			if request.PeerName == server.BranchName {
+				server.handle_transaction(request)
+			} else {
+				server.PeerEncs[server.BranchNameToAddress[request.PeerName]].Encode(&request)
+			}
 			return
 		}
 	}
@@ -486,50 +658,42 @@ func (server *Server) handle_write(request Request) {
 				OperationValue: value,
 			}
 		}
-		fmt.Printf("Tentative write recorded for %s: %+v\n", request.Account, account.TW[request.ClientName])
 
+		request.Transaction = "SEND"
+		request.Account = ""
+		request.Amount = 1
+		if request.PeerName == server.BranchName {
+			fmt.Println(request.Branch, server.BranchName, "BRANCHES")
+			server.handle_transaction(request)
+		} else {
+			fmt.Println("KABOOM")
+			server.PeerEncs[server.BranchNameToAddress[request.PeerName]].Encode(&request)
+		}
+		fmt.Printf("Tentative write recorded for %s: %+v\n", request.Account, account.TW[request.ClientName])
 	} else {
+		fmt.Println("OH NO")
+		request.Transaction = "SEND"
+		request.Account = "ABORT"
+		request.Amount = 0
+		if request.PeerName == server.BranchName {
+			server.handle_transaction(request)
+		} else {
+			server.PeerEncs[server.BranchNameToAddress[request.PeerName]].Encode(&request)
+		}
 		fmt.Println("ABORT! Write condition violated")
 	}
 }
 
 func (server *Server) handle_commit(request Request) {
 	server.Lock.Lock()
-	account, exists := server.Accounts[request.Account]
+	account := server.Accounts[request.Account]
 	server.Lock.Unlock()
 
-	if !exists {
-		fmt.Println("COMMIT: Account does not exist", request.Account)
-		return
-	}
-
 	account.AccountLock.Lock()
 
-	entry, ok := account.TW[request.ClientName]
-	if !ok {
-		fmt.Println("COMMIT: No tentative write found to commit")
-		account.AccountLock.Unlock()
-		return
-	}
+	fmt.Println("HERE 2", account.TW[request.ClientName])
 
-	// loop until all previous are committed
-	for {
-		prevAllCommitted := true
-		for client, prevEntry := range account.TW {
-			if client != request.ClientName && prevEntry.Timestamp.Before(entry.Timestamp) {
-				prevAllCommitted = false
-				break
-			}
-		}
-
-		if prevAllCommitted {
-			break
-		} else {
-			account.AccountLock.Unlock()
-			time.Sleep(100 * time.Millisecond)
-			account.AccountLock.Lock()
-		}
-	}
+	entry := account.TW[request.ClientName]
 
 	// Apply write
 	account.Balance += entry.OperationValue
@@ -539,13 +703,17 @@ func (server *Server) handle_commit(request Request) {
 	delete(account.TW, request.ClientName)
 
 	fmt.Printf("COMMIT: Applied write to %s.%s, new balance: %d\n", request.Branch, request.Account, account.Balance)
-	account.AccountLock.Unlock()
+	fmt.Println("RTS", account.RTS)
+	fmt.Println("TW", account.TW)
+	fmt.Println("COMMITED", account.WriteTimestampCommitted)
 
 	server.Lock.Lock()
 	for accountName, account := range server.Accounts {
 		fmt.Println(server.BranchName+accountName, "=", account.Balance, account.RTS, account.TW, account.WriteTimestampCommitted)
 	}
 	server.Lock.Unlock()
+	account.AccountLock.Unlock()
+
 }
 
 func (server *Server) handle_abort(request Request) {
@@ -580,8 +748,55 @@ func (server *Server) handle_abort(request Request) {
 	account.RTS = newRTS
 }
 
+func (server *Server) handle_prepare(request Request) {
+	// consistency check
+	fmt.Println("MADE IT HERE WITH REQUEST", request)
+
+	server.Lock.Lock()
+	account, exists := server.Accounts[request.Account]
+	server.Lock.Unlock()
+
+	if !exists {
+		server.VoteClientChans[request.ClientName] <- "ABORT"
+		return
+	}
+
+	account.AccountLock.Lock()
+
+	entry, ok := account.TW[request.ClientName]
+	if !ok {
+		server.VoteClientChans[request.ClientName] <- "ABORT"
+		account.AccountLock.Unlock()
+		return
+	}
+
+	// loop until all previous are committed
+	for {
+		prevAllCommitted := true
+		for client, prevEntry := range account.TW {
+			if client != request.ClientName && prevEntry.Timestamp.Before(entry.Timestamp) {
+				prevAllCommitted = false
+				break
+			}
+		}
+
+		if prevAllCommitted {
+			fmt.Println("PLACED COMMIT INTO CHANNEL", request.ClientName)
+			server.VoteClientChans[request.ClientName] <- "COMMIT"
+			break
+		} else {
+			account.AccountLock.Unlock()
+			time.Sleep(100 * time.Millisecond)
+			account.AccountLock.Lock()
+		}
+	}
+	account.AccountLock.Unlock()
+}
+
 // handles transaction request from coordinator
 func (server *Server) handle_transaction(request Request) {
+	fmt.Println("HANDLING TRANSACTION", request.Transaction, request.Branch+"."+request.Account)
+
 	switch request.Transaction {
 	case "BALANCE":
 		server.handle_read(request)
@@ -591,6 +806,14 @@ func (server *Server) handle_transaction(request Request) {
 		server.handle_commit(request)
 	case "ABORT":
 		server.handle_abort(request)
+	case "PREPARE":
+		server.handle_prepare(request)
+	case "HANDLECOMMIT":
+		request.PeerName = server.BranchName
+		go server.handle_commit(request)
+	case "SEND":
+		fmt.Println("PEER", request.PeerName)
+		go server.send_response_to_client(request.ClientName, request.Amount == 1, request.Account)
 	}
 }
 
@@ -613,12 +836,16 @@ func main() {
 	server.PeerEncs = make(map[string]*gob.Encoder)
 	server.ClientChan = make(chan Request, 1000)
 	server.PeerChan = make(chan Request, 1000)
+	server.ClientsAffected = make(map[string][]string)
+	server.VoteClientChans = make(map[string]chan string)
 
 	server.read_file(configPath)
 
+	// start coordinator server
 	go server.run_coordinator()
 	go server.listen_to_clients()
 
+	// start peer server
 	go server.run_server()
 	go server.listen_to_peers()
 	go server.connect_to_peers()
diff --git a/mp3/config_test.txt b/mp3/config_test.txt
index 6f63ebd..b940ff0 100644
--- a/mp3/config_test.txt
+++ b/mp3/config_test.txt
@@ -1 +1,2 @@
-A 127.0.0.1 2005
\ No newline at end of file
+A 127.0.0.1 2005
+B 127.0.0.1 2006
\ No newline at end of file
-- 
GitLab