Skip to content
Snippets Groups Projects
Commit e3b81e98 authored by owenw2's avatar owenw2
Browse files

completed peer-to-peer communication

parent 82a0a3e1
No related branches found
No related tags found
No related merge requests found
......@@ -35,7 +35,7 @@ type Response struct {
Message string
}
var NUM_SERVERS int = 5 // number of servers
var NUM_SERVERS int = 1 // number of servers
// fills client response channel
func (client *Client) run_server() {
......
......@@ -6,6 +6,7 @@ import (
"fmt"
"net"
"os"
"strconv"
"strings"
"sync"
"time"
......@@ -16,14 +17,16 @@ type Server struct {
Address string
BranchNameToAddress map[string]string
Accounts map[string]*Account
Clients map[string]net.Conn // connected clients
Peers map[string]net.Conn // connected peers
Lock sync.Mutex // lock for peer map
Clients map[string]net.Conn // connected clients [name, socket]
ClientEncs map[string]*gob.Encoder // connected client encoders
Peers map[string]net.Conn // connected peers [address, socket]
PeerEncs map[string]*gob.Encoder // connected peer encoders
Lock sync.Mutex // lock for client and peer map
PeerConnectedWG sync.WaitGroup // wait group for all peers to connect
ClientChan chan Request
// PeerChan chan PeerReque
PeerChan chan Request
// ClientListener net.Listener
// PeerListener net.Listener
TransactionTimestamps map[string]time.Time // timestamp for all the transactions
}
type WriteEntry struct {
......@@ -32,16 +35,20 @@ type WriteEntry struct {
}
type Account struct {
Name string
Balance int
CreatedBy string
RTS []string
Name string
Balance int
CreatedBy string
RTS []time.Time
TW map[string]*WriteEntry // tentative writes
WriteTimestampCommitted []time.Time
Committed bool
AccountLock sync.Mutex
}
type Request struct {
ClientName string
PeerName string
Transaction string // type of transaction
Branch string
Account string
......@@ -53,28 +60,52 @@ type Response struct {
Message string
}
// func (server *Server) run_server() {
// // request := <-server.ClientChan
// // fmt.Println("Got client message:", request.Transaction, request.Branch, request.Account, request.Amount)
// // enc := gob.NewEncoder(clientConn)
// // response := Response{Result: true, Message: "OK"}
// // enc.Encode(&response)
// }
var NUM_SERVERS int = 2 // number of servers
//////////////////////////////
// CLIENT FUNCTIONS
//////////////////////////////
// func (server *Server) handle_peer_messages(peerConn net.Conn) {
// request := Request{}
// dec := gob.NewDecoder(peerConn)
/*
* Does: receives and handles transaction, sends to other servers if needed
* additional logic for committing and aborting
* retrieves from client channel
* Spawns: none
*/
func (server *Server) run_coordinator() {
fmt.Println("Starting coordinator thread")
for {
request := <-server.ClientChan
fmt.Println("Got client message:", request.Transaction, request.Branch, request.Account, request.Amount)
// for {
// dec.Decode(&request)
// server.PeerCa <- request
// }
// }
// send transaction to correct server
if request.Branch == server.BranchName {
// handle transaction
} else {
// request another server to handle transaction
request.PeerName = server.BranchName
server.Lock.Lock()
address := server.BranchNameToAddress[request.Branch]
server.PeerEncs[address].Encode(&request)
server.Lock.Unlock()
}
// to send back to client?
enc := gob.NewEncoder(server.Clients[request.ClientName])
response := Response{Result: true, Message: "OK"}
enc.Encode(&response)
}
}
/*
* Does: fills client channel
* Spawns: none
*/
func (server *Server) handle_client_messages(clientConn net.Conn) {
fmt.Println("Starting client handler thread")
dec := gob.NewDecoder(clientConn)
// initial decode used to remove
request := Request{}
dec.Decode(&request)
// add to map
......@@ -89,31 +120,82 @@ func (server *Server) handle_client_messages(clientConn net.Conn) {
fmt.Println("Decode error:", err)
return
}
// fmt.Println("Placing", request.Transaction, "from", request.ClientName)
server.ClientChan <- request
}
}
// function that represents coordinator thread
func (server *Server) run_coordinator() {
fmt.Println("Starting coordinator server")
for {
request := <-server.ClientChan
fmt.Println("Got client message:", request.Transaction, request.Branch, request.Account, request.Amount)
enc := gob.NewEncoder(server.Clients[request.ClientName])
// listens for client connection and launches thread to handle client requests
func (server *Server) listen_to_clients() {
ip := strings.Split(server.Address, ":")[0]
port, _ := strconv.Atoi(strings.Split(server.Address, ":")[1])
address := ip + ":" + strconv.Itoa(port+10)
input, err := net.Listen("tcp", address)
if err != nil {
fmt.Println("Listening on server failed")
}
defer input.Close()
for {
connection, err := input.Accept()
if err != nil {
fmt.Println("Error accepting client connection", err)
continue
}
// confirm connection with client
response := Response{Result: true, Message: "OK"}
enc := gob.NewEncoder(connection)
enc.Encode(&response)
// starts channel fill thread
go server.handle_client_messages(connection)
}
}
// listens for client connection and launches thread to handle client requests
func (server *Server) connect_to_clients() {
//////////////////////////////
// SERVER FUNCTIONS
//////////////////////////////
/*
* Does: receives and handles sent transaction
* Spawns: none
*/
func (server *Server) run_server() {
fmt.Println("Starting peer thread")
for {
request := <-server.PeerChan
fmt.Println("Got peer message:", request.Transaction, request.Branch, request.Account, request.Amount)
// go server.handle_transaction(request)
}
}
/*
* Does: fills peer channel
* Spawns: none
*/
func (server *Server) handle_peer_messages(peerConn net.Conn) {
fmt.Println("Starting peer handler thread")
dec := gob.NewDecoder(peerConn)
for {
request := Request{}
err := dec.Decode(&request)
if err != nil {
fmt.Println("Decode error:", err)
return
}
server.PeerChan <- request
}
}
/*
* Does: listens for tcp request from peers and accepts connection
* Spawns: multiple threads for each peer connection that fills peer channel
*/
func (server *Server) listen_to_peers() {
input, err := net.Listen("tcp", server.Address)
if err != nil {
fmt.Println("Listening on server failed")
}
defer input.Close()
for {
connection, err := input.Accept()
......@@ -121,18 +203,19 @@ func (server *Server) connect_to_clients() {
fmt.Println("Error accepting client connection", err)
continue
}
// confirm connection with client
response := Response{Result: true, Message: "OK"}
enc := gob.NewEncoder(connection)
enc.Encode(&response)
// starts channel fill thread
go server.handle_client_messages(connection)
go server.handle_peer_messages(connection)
}
}
// lsiten to tcp from other servers and handle necessary transactions
/*
* Does: Sends tcp request to all other servers in peer list
* Spawns: Nothing
*/
func (server *Server) connect_to_peers() {
server.PeerConnectedWG.Add(NUM_SERVERS - 1)
for address := range server.Peers {
go func(peerAddr string) {
var conn net.Conn
......@@ -142,17 +225,23 @@ func (server *Server) connect_to_peers() {
if err == nil {
break
}
time.Sleep(2 * time.Second)
time.Sleep(1 * time.Second)
}
server.Lock.Lock()
server.Peers[peerAddr] = conn
server.PeerEncs[address] = gob.NewEncoder(conn)
server.PeerConnectedWG.Done()
server.Lock.Unlock()
fmt.Println("Connected to peer", peerAddr)
// Handle peer communication here if needed
}(address)
}
server.PeerConnectedWG.Wait()
fmt.Println("Server Peers:", server.Peers)
}
//////////////////////////////
// HELPER FUNCTIONS
//////////////////////////////
func (server *Server) read_file(config_name string) {
config, error := os.Open(config_name)
if error != nil {
......@@ -163,9 +252,9 @@ func (server *Server) read_file(config_name string) {
reader := bufio.NewReader(config)
// assumption that only ever have 5 servers (no more, no less)
for i := 0; i < 1; i++ {
for i := 0; i < NUM_SERVERS; i++ {
text, serr := reader.ReadString('\n')
if serr != nil && i < 0 {
if serr != nil && i < NUM_SERVERS-1 {
fmt.Println("failed to read config", serr)
}
substrings := strings.Split(text, " ")
......@@ -185,6 +274,24 @@ func (server *Server) read_file(config_name string) {
config.Close()
}
func (server *Server) print_all_balances() {
server.Lock.Lock()
for _, account := range server.Accounts {
if account.Committed && account.Balance > 0 {
fmt.Println(server.BranchName+"."+account.Name, "=", account.Balance)
}
}
server.Lock.Unlock()
}
//////////////////////////////
// TIMESTAMP ORDERING AND TRANSACTION HANDLING START
//////////////////////////////
//////////////////////////////
// START OF PROGRAM
//////////////////////////////
func main() {
// init server
server := Server{}
......@@ -196,17 +303,19 @@ func main() {
server.BranchNameToAddress = make(map[string]string)
server.Accounts = make(map[string]*Account)
server.Clients = make(map[string]net.Conn)
server.ClientEncs = make(map[string]*gob.Encoder)
server.Peers = make(map[string]net.Conn)
server.PeerEncs = make(map[string]*gob.Encoder)
server.ClientChan = make(chan Request, 1000)
server.PeerChan = make(chan Request, 1000)
server.read_file(configPath)
fmt.Println("Server State:", server.BranchName, server.Address, server.BranchNameToAddress, server.Accounts, server.Peers)
go server.run_coordinator()
go server.connect_to_clients()
// go server.connect_to_peers()
go server.listen_to_clients()
for {
continue
}
go server.run_server()
go server.listen_to_peers() // potential race condition with connect
go server.connect_to_peers()
select {}
}
File deleted
A 127.0.0.1 2005
B 127.0.0.1 2006
C 127.0.0.1 2007
D 127.0.0.1 2008
E 127.0.0.1 2009
\ No newline at end of file
B 127.0.0.1 2006
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment