From 1f6f9acf6823b1d826f2e2ea9012514c46f40710 Mon Sep 17 00:00:00 2001 From: Tingyin Ding <32661494+Tinameow@users.noreply.github.com> Date: Tue, 21 Nov 2023 22:28:01 -0600 Subject: [PATCH] Init repo with basic code structure. --- config/config.json | 5 + config/gossipConfig.json | 11 + election/election.go | 404 +++++++++++++ election/go.mod | 3 + go.mod | 13 + leader.txt | 1 + main.go | 5 + maplejuice/application_manager.go | 24 + maplejuice/go.mod | 3 + maplejuice/node_manager.go | 23 + maplejuice/resource_manager.go | 22 + maplejuice/types.go | 18 + membership.json | 1 + membership/FailedList.go | 51 ++ membership/MembershipList.go | 79 +++ membership/go.mod | 3 + membership/messageHandler.go | 389 ++++++++++++ membership/node.go | 410 +++++++++++++ membership/utils.go | 47 ++ sdfs/client.go | 486 +++++++++++++++ sdfs/datanode.go | 168 ++++++ sdfs/go.mod | 3 + sdfs/namenode.go | 966 ++++++++++++++++++++++++++++++ sdfs/server.go | 53 ++ sdfs/token.go | 80 +++ sdfs/types.go | 77 +++ utils/gen_scp_keys.sh | 27 + utils/go.mod | 3 + utils/update_go_mod_version.sh | 18 + utils/utils.go | 47 ++ 30 files changed, 3440 insertions(+) create mode 100644 config/config.json create mode 100644 config/gossipConfig.json create mode 100644 election/election.go create mode 100644 election/go.mod create mode 100644 go.mod create mode 100644 leader.txt create mode 100644 main.go create mode 100644 maplejuice/application_manager.go create mode 100644 maplejuice/go.mod create mode 100644 maplejuice/node_manager.go create mode 100644 maplejuice/resource_manager.go create mode 100644 maplejuice/types.go create mode 100644 membership.json create mode 100644 membership/FailedList.go create mode 100644 membership/MembershipList.go create mode 100644 membership/go.mod create mode 100644 membership/messageHandler.go create mode 100644 membership/node.go create mode 100644 membership/utils.go create mode 100644 sdfs/client.go create mode 100644 sdfs/datanode.go create mode 100644 sdfs/go.mod create mode 100644 sdfs/namenode.go create mode 100644 sdfs/server.go create mode 100644 sdfs/token.go create mode 100644 sdfs/types.go create mode 100755 utils/gen_scp_keys.sh create mode 100644 utils/go.mod create mode 100755 utils/update_go_mod_version.sh create mode 100644 utils/utils.go diff --git a/config/config.json b/config/config.json new file mode 100644 index 0000000..1d9bade --- /dev/null +++ b/config/config.json @@ -0,0 +1,5 @@ +{ + "datanode":5001, + "namenode": 5002, + "client":5003 +} \ No newline at end of file diff --git a/config/gossipConfig.json b/config/gossipConfig.json new file mode 100644 index 0000000..aae3189 --- /dev/null +++ b/config/gossipConfig.json @@ -0,0 +1,11 @@ +{ + "Introducor": "fa23-cs425-0201.cs.illinois.edu", + "Port": "8000", + "T_gossip": 0.8, + "Fanout": 3, + "T_fail": 3, + "T_cleanup": 3, + "T_suspicion": 2, + "P_ml": 0, + "Mode": "Gossip" +} diff --git a/election/election.go b/election/election.go new file mode 100644 index 0000000..14a336b --- /dev/null +++ b/election/election.go @@ -0,0 +1,404 @@ +/* election.go +Given a potential list of candidates, return the ID of the leader. +Implement the bully algorithm. +*/ + +package election + +import ( + "encoding/json" + "fmt" + "log" + "net" + "os" + "strings" + "sync" + "time" +) + +var ALLHOST = []string{"fa23-cs425-0201.cs.illinois.edu", "fa23-cs425-0202.cs.illinois.edu", "fa23-cs425-0203.cs.illinois.edu", "fa23-cs425-0204.cs.illinois.edu", "fa23-cs425-0205.cs.illinois.edu", "fa23-cs425-0206.cs.illinois.edu", "fa23-cs425-0207.cs.illinois.edu", "fa23-cs425-0208.cs.illinois.edu", "fa23-cs425-0209.cs.illinois.edu", "fa23-cs425-0210.cs.illinois.edu"} + +const PORT = "8080" + +const leaderfile = "leader.txt" + +type MsgType int + +const ( + COORDINATOR MsgType = iota + ELECTION + ANSWER +) + +type Message struct { + Sender string + Type MsgType +} + +type Election struct { + leader string // hostname + startNode string // hostname + candidates []string // [hostname] + timeout time.Duration // timeout in seconds + state bool // true if electing + answerReceived chan Message + ElectionReceived chan Message + onLeaderChange chan struct{} + mu sync.RWMutex + electionMu sync.Mutex +} + +func InitializeElection(startNode string, timeout int) *Election { + election := new(Election) + election.leader = "fa23-cs425-0201.cs.illinois.edu" + election.startNode = startNode + memberList, err := getMeberList("membership.json") + if err != nil { + election.candidates = ALLHOST + } else { + election.candidates = memberList + } + election.timeout = time.Duration(timeout) * time.Second + election.state = false + election.answerReceived = make(chan Message) + election.ElectionReceived = make(chan Message) + election.onLeaderChange = make(chan struct{}) + election.mu = sync.RWMutex{} + election.electionMu = sync.Mutex{} + return election +} + +func (e *Election) GetLeader() string { + e.mu.RLock() + defer e.mu.RUnlock() + return e.leader +} + +func (e *Election) SetLeader(leader string) { + e.mu.Lock() + defer e.mu.Unlock() + if e.leader != leader { + log.Println("ELECTION: Set leader", leader) + + e.leader = leader + // Convert the leader to bytes + leaderBytes := []byte(leader) + + // Write the leader to the file + err := os.WriteFile(leaderfile, leaderBytes, 0644) + if err != nil { + log.Println("ELECTION: Error writing to the file:", err) + return + } + } + log.Println("ELECTION: Leader doesn't change.") +} + +func (e *Election) Run(workfunc func()) { + go e.ElectionListener() + go func() { + for { + if (!e.state) && (e.GetLeader() == "") { + log.Println("ELECTION: No leader, re elect") + e.StartElection() + } + time.Sleep(1 * time.Second) + } + }() + + for { + select { + case <-e.onLeaderChange: + log.Println("ELECTION: Leader changed, work") + workfunc() + + case msg := <-e.ElectionReceived: + if (!e.state) && msg.Type == ELECTION { + log.Println("ELECTION: Received election msg, elect") + + e.StartElection() + } + + default: + // do nothing + } + } +} + +// func (e *Election) Run(workfunc func()) { +// go e.ElectionListener() +// // e.StartElection() +// // for msg := range e.ElectionReceived { +// // if msg.Type == ELECTION { +// // e.StartElection() +// // } +// // } +// for { +// select { +// case <-e.onLeaderChange: +// log.Println("ELECTION: leader changes...run workfunc()") +// workfunc() +// case <-e.ElectionReceived: +// log.Println("ELECTION: Process ELECTION") +// e.StartElection() +// } +// } +// } + +func (e *Election) StartElection() { + e.electionMu.Lock() + defer e.electionMu.Unlock() + + e.state = true + e.answerReceived = make(chan Message) + + memberList, err := getMeberList("membership.json") + if err != nil { + log.Println("ELECTION: Faill to get distinct datanode") + } + e.candidates = memberList + defer func() { e.state = false }() + + log.Printf("ELECTION: Node %s initiates an election.\n", e.startNode) + lowestHost := e.startNode + for _, candidate := range e.candidates { + if candidate < lowestHost { + lowestHost = candidate + } + } + + log.Printf("ELECTION: lowest host is %s, node is %s", lowestHost, e.startNode) + if lowestHost == e.startNode { + e.leader = e.startNode + log.Printf("ELECTION: Node %s is elected as the coordinator.\n", e.startNode) + + // Send COORDINATOR message to all processes with lower ID + for _, candidate := range e.candidates { + if candidate > e.startNode { + // Send COORDINATOR message to candidate + e.sendMessage(candidate, COORDINATOR) + } + } + } else { + // Initiate an election by sending an Election message + // Send it to only processes that have a higher ID than itself + log.Printf("ELECTION: Node %s sends ELECTION to smaller IDs \n", e.startNode) + for _, candidate := range e.candidates { + if candidate < e.startNode { + // Send ELECTION message to candidate + e.sendMessage(candidate, ELECTION) + } + } + + // Wait for an ANSWER message + select { + case answerMessage := <-e.answerReceived: + // Handle the ANSWER message received from another process + log.Printf("ELECTION: Node %s received an ANSWER from %s.\n", e.startNode, answerMessage.Sender) + //TODO: maybe bug here, how to wait timeout seconds? + return + + case <-time.After(e.timeout): + // Received no answer within the timeout, declare itself leader + log.Println("ELECTION: Wait timeout. No answer received.") + + e.SetLeader(e.startNode) + e.onLeaderChange <- struct{}{} + log.Printf("ELECTION: Node %s is elected as the coordinator (no answer received).\n", e.startNode) + + // Send COORDINATOR message to all lower ID processes + for _, candidate := range e.candidates { + if candidate > e.startNode { + // Send COORDINATOR message to candidate + log.Println("ELECTION: Send COORDINATOR to ", candidate) + e.sendMessage(candidate, COORDINATOR) + } + } + + return + } + } +} + +// MessageHandler function to process incoming messages +func (election *Election) ElectionListener() { + // listen on the port + // address := "0.0.0.0:" + PORT + address := ":" + PORT + listener, err := net.Listen("tcp", address) + if err != nil { + log.Fatalf("ELECTION: Error listening on port: %v\n", err) + } + defer listener.Close() + log.Println("ELECTION: Server is listening...") + + // Buffer to store incoming data + buffer := make([]byte, 1024) + + // Listen for incoming messages via network communication + for { + // Accept incoming connections + conn, err := listener.Accept() + if err != nil { + // fmt.Println("Error accepting connection:", err) + continue + } + numBytes, err := conn.Read(buffer) + if err != nil { + log.Printf("ELECTION: Error reading from connection: %v\n", err) + continue + } + + data := buffer[:numBytes] + + // Decode the incoming data into a Message struct + var receivedMessage Message + err = json.Unmarshal(data, &receivedMessage) + if err != nil { + log.Printf("ELECTION: Error decoding incoming data: %v\n", err) + continue + } + + // log.Printf("ELECTION: Received message %+v\n", receivedMessage) + + go election.handleMessage(&receivedMessage) + } +} + +func (election *Election) handleMessage(receivedMessage *Message) { + switch receivedMessage.Type { + case COORDINATOR: + // Handle coordinator message + log.Printf("ELECTION: Received message from %s: COORDINATOR\n", receivedMessage.Sender) + election.SetLeader(receivedMessage.Sender) + election.onLeaderChange <- struct{}{} + + case ELECTION: + // Handle election request + log.Printf("ELECTION: Received message from %s: ELECTION\n", receivedMessage.Sender) + + // Send ANSWER message to sender + election.sendMessage(receivedMessage.Sender, ANSWER) + + // Start a new election + if !election.state { + election.ElectionReceived <- *receivedMessage + // election.StartElection() + } + + case ANSWER: + log.Printf("ELECTION: Received message from %s: ANSWER\n", receivedMessage.Sender) + election.answerReceived <- *receivedMessage + + default: + log.Printf("ELECTION: Node %s received an unknown message type: %d\n", election.startNode, receivedMessage.Type) + } +} + +func (election *Election) sendMessage(recipient string, messageType MsgType) { + message := Message{ + Sender: election.startNode, + Type: MsgType(messageType), + } + messageJSON, err := json.Marshal(message) + if err != nil { + log.Println("ELECTION: Failed to serialize:", err) + return + } + + recipientIP, err := getIP(recipient) + if err != nil { + log.Printf("ELECTION: Fail to send Message to %s because can't resolve IP\n", recipient) + return + } + recipientAddr := recipientIP + ":" + PORT + + // set up tcp connection + conn, err := net.Dial("tcp", recipientAddr) + if err != nil { + // log.Println("ELECTION: Failed to connect to recipient:", err) + return + } + defer conn.Close() + + _, err = conn.Write([]byte(messageJSON)) + if err != nil { + log.Println("ELECTION: Error sending tcp data:", err) + return + } + + // log.Printf("ELECTION: Send message type %d to %s", messageType, recipient) +} + +// resolve hosts' name to IP address +func getIP(hostname string) (string, error) { + ips, err := net.LookupIP(hostname) + if err != nil { + log.Println("ELECTION: Fail to resolve hostname:", err) + return "", err + } + return ips[0].String(), nil +} + +// test +// func main() { +// hostname, _ := os.Hostname() +// selfID := hostname +// election := InitializeElection(selfID, 5) +// go func() { +// for i := 0; i < 30; i++ { +// time.Sleep(1 * time.Second) +// fmt.Println("Current leader: ", election.GetLeader(), election.state) +// if i == 15 { +// // election.SetLeader("") +// election.StartElection() +// } +// } + +// }() +// election.Run() +// select {} +// } + +func getMeberList(filePath string) ([]string, error) { + jsonData, err := readJSONFile(filePath) + if err != nil { + return nil, err + } + // Extract the membershiplist as an array of interface{} + membershiplistInterface, ok := jsonData["membershiplist"].([]interface{}) + if !ok { + return nil, fmt.Errorf("Failed to extract membershiplist") + } + + // Convert the interface{} elements to strings + var membershiplist []string + for _, member := range membershiplistInterface { + if s, ok := member.(string); ok { + // Split the input string by colons + parts := strings.Split(s, ":") + + // Extract the part we need and trim any leading/trailing spaces + address := strings.TrimSpace(parts[0]) + membershiplist = append(membershiplist, address) + } + } + + return membershiplist, nil + +} + +func readJSONFile(filePath string) (map[string]interface{}, error) { + // Read the JSON file + data, err := os.ReadFile(filePath) + if err != nil { + return nil, err + } + + // Parse the JSON data into a generic map + var jsonData map[string]interface{} + if err := json.Unmarshal(data, &jsonData); err != nil { + return nil, err + } + return jsonData, nil +} diff --git a/election/go.mod b/election/go.mod new file mode 100644 index 0000000..ae249fa --- /dev/null +++ b/election/go.mod @@ -0,0 +1,3 @@ +module cs425mp4/election + +go 1.21.0 diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..9e9cf11 --- /dev/null +++ b/go.mod @@ -0,0 +1,13 @@ +module cs425mp4 + +go 1.21.0 + +replace cs425mp4/election => ./election + +replace cs425mp4/membership => ./membership + +replace cs425mp4/sdfs => ./sdfs + +replace cs425mp4/maplejuice => ./maplejuice + +replace cs425mp4/utils => ./utils diff --git a/leader.txt b/leader.txt new file mode 100644 index 0000000..efeb015 --- /dev/null +++ b/leader.txt @@ -0,0 +1 @@ +fa23-cs425-0203.cs.illinois.edu \ No newline at end of file diff --git a/main.go b/main.go new file mode 100644 index 0000000..7905807 --- /dev/null +++ b/main.go @@ -0,0 +1,5 @@ +package main + +func main() { + +} diff --git a/maplejuice/application_manager.go b/maplejuice/application_manager.go new file mode 100644 index 0000000..71d52cf --- /dev/null +++ b/maplejuice/application_manager.go @@ -0,0 +1,24 @@ +package maplejuice + +type ApplicationManager struct { + NodeManagers []*NodeManager + // Other application-level fields... +} + +func NewApplicationManager(nodeManagers []*NodeManager) *ApplicationManager { + return &ApplicationManager{ + NodeManagers: nodeManagers, + // Initialize other fields... + } +} + +func (am *ApplicationManager) StartMapleJuiceProcess(tasks []Task) { + // Logic to start and manage the MapleJuice process... + +} + +func (am *ApplicationManager) GatherResults(taskIDs []string) { + // Logic to gather results of tasks... +} + +// Other application management methods... diff --git a/maplejuice/go.mod b/maplejuice/go.mod new file mode 100644 index 0000000..552d62c --- /dev/null +++ b/maplejuice/go.mod @@ -0,0 +1,3 @@ +module cs425mp4/maplejuice + +go 1.21.0 diff --git a/maplejuice/node_manager.go b/maplejuice/node_manager.go new file mode 100644 index 0000000..3c79eba --- /dev/null +++ b/maplejuice/node_manager.go @@ -0,0 +1,23 @@ +package maplejuice + +type NodeManager struct { + ResourceManager *ResourceManager + // Other node-specific fields... +} + +func NewNodeManager(resourceManager *ResourceManager) *NodeManager { + return &NodeManager{ + ResourceManager: resourceManager, + // Initialize other fields... + } +} + +func (nm *NodeManager) ExecuteTask(task Task) { + // Logic to execute a given task... +} + +func (nm *NodeManager) HandleFailure(task Task) { + // Logic to handle task failure... +} + +// Other node management methods... diff --git a/maplejuice/resource_manager.go b/maplejuice/resource_manager.go new file mode 100644 index 0000000..9e3cdde --- /dev/null +++ b/maplejuice/resource_manager.go @@ -0,0 +1,22 @@ +package maplejuice + +type ResourceManager struct { + // Resource tracking and management fields... +} + +func NewResourceManager() *ResourceManager { + return &ResourceManager{ + // Initialize resource manager... + } +} + +func (rm *ResourceManager) AllocateResources(task Task) bool { + // Logic to allocate resources for a task... + return true +} + +func (rm *ResourceManager) ReleaseResources(task Task) { + // Logic to release resources after task completion... +} + +// Other resource management methods... diff --git a/maplejuice/types.go b/maplejuice/types.go new file mode 100644 index 0000000..01cdab8 --- /dev/null +++ b/maplejuice/types.go @@ -0,0 +1,18 @@ +package maplejuice + +// Common types and utility functions... + +type Task struct { + ID string + Type string // "maple" or "juice" + Data []string + Executable string +} + +type TaskResult struct { + TaskID string + Output []string + Error error +} + +// Other necessary types and utilities... diff --git a/membership.json b/membership.json new file mode 100644 index 0000000..a270839 --- /dev/null +++ b/membership.json @@ -0,0 +1 @@ +{"id":"vpnpool-10-250-47-123.near.illinois.edu:76.10.46.206:8000:1698897035","membershiplist":["vpnpool-10-250-47-123.near.illinois.edu:76.10.46.206:8000:1698897035"]} \ No newline at end of file diff --git a/membership/FailedList.go b/membership/FailedList.go new file mode 100644 index 0000000..37b8e04 --- /dev/null +++ b/membership/FailedList.go @@ -0,0 +1,51 @@ +package membership + +import "sync" + +type FailedList struct { + mu sync.Mutex + list []string +} + +func (f *FailedList) Add(id string) { + f.mu.Lock() + defer f.mu.Unlock() + found := false + for _, item := range f.list { + if item == id { + found = true + break + } + } + if !found { + f.list = append(f.list, id) + } +} + +func (f *FailedList) Exists(id string) bool { + f.mu.Lock() + defer f.mu.Unlock() + for _, item := range f.list { + if item == id { + return true + } + } + return false +} + +func (f *FailedList) Delete(id string) { + f.mu.Lock() + defer f.mu.Unlock() + found := false + var idx int + for i, item := range f.list { + if item == id { + found = true + idx = i + break + } + } + if found { + f.list = append(f.list[:idx], f.list[idx+1:]...) + } +} diff --git a/membership/MembershipList.go b/membership/MembershipList.go new file mode 100644 index 0000000..252654a --- /dev/null +++ b/membership/MembershipList.go @@ -0,0 +1,79 @@ +package membership + +import ( + "fmt" + "log" + "sync" +) + +type Status int + +const ( + ALIVE Status = iota + SUSPECTED + FAILED + LEFT +) + +type MembershipInfo struct { + Heartbeat int + Timestamp int64 + Status +} + +func (info MembershipInfo) String() string { + return fmt.Sprintf("Heartbeat: %d, Timestamp: %d, Status: %d", info.Heartbeat, info.Timestamp, info.Status) +} + +type MembershipList struct { + mu sync.Mutex + ml map[string]MembershipInfo +} + +func (ml *MembershipList) String() string { + ml.mu.Lock() + defer ml.mu.Unlock() + + var result string + + for id, info := range ml.ml { + result += fmt.Sprintf("ID: %s ", id) + result += fmt.Sprintf("MembershipInfo: %s\n", info) + } + + return result +} + +func (ml *MembershipList) Get(id string) (*MembershipInfo, bool) { + ml.mu.Lock() + defer ml.mu.Unlock() + mem, exists := ml.ml[id] + if exists { + return &mem, true + } + return nil, false +} + +func (ml *MembershipList) ExistHost(hostname string) bool { + ml.mu.Lock() + defer ml.mu.Unlock() + + // Check if the hostname (id) exists in the membership list + for id := range ml.ml { + if (ml.ml[id].Status == FAILED) || (ml.ml[id].Status == LEFT) { + continue + } + log.Println("Membership: parse id", id, parseHostname(id)) + if parsedHostname := parseHostname(id); parsedHostname == hostname { + log.Println("Membership: leader exists: ", parseHostname(id)) + return true + } + } + return false +} + +func (ml *MembershipList) GetList() map[string]MembershipInfo { + ml.mu.Lock() + defer ml.mu.Unlock() + return ml.ml +} diff --git a/membership/go.mod b/membership/go.mod new file mode 100644 index 0000000..7d5c1a4 --- /dev/null +++ b/membership/go.mod @@ -0,0 +1,3 @@ +module cs425mp4/membership + +go 1.21.0 diff --git a/membership/messageHandler.go b/membership/messageHandler.go new file mode 100644 index 0000000..3a11c0c --- /dev/null +++ b/membership/messageHandler.go @@ -0,0 +1,389 @@ +package membership + +import ( + "context" + "encoding/json" + "fmt" + "log" + "math/rand" + "net" + "os" + "time" +) + +// define message +type MsgType int + +const ( + JOIN MsgType = iota + LEAVE + REJOIN + GOSSIP + GOSSIPS + CLIENT +) + +type Message struct { + Sender string + Type MsgType + Payload interface{} +} + +func (n *Node) sendMessage(address string, msgType MsgType, Payload interface{}) { + message := Message{ + Sender: n.ID.String(), + Type: MsgType(msgType), + Payload: Payload, + } + messageJSON, err := json.Marshal(message) + if err != nil { + log.Println("GOSSIP messageHandler.go: Failed to serialize MembershipList to JSON:", err) + return + } + + // set up udp connection + conn, err := net.Dial("udp", address) + if err != nil { + log.Println("GOSSIP messageHandler.go: Failed to connect to introducer:", err) + return + } + defer conn.Close() + + _, err = conn.Write([]byte(messageJSON)) + if err != nil { + log.Println("GOSSIP messageHandler.go: Error sending UDP data:", err) + return + } + + // log.Printf("GOSSIP messageHandler.go: Send message type %d to %s: %s", msgType, address, messageJSON) +} + +func (n *Node) StartMessageHandler(ctx context.Context) { + // listen on the port + address := "0.0.0.0:" + n.ID.Port + listener, err := net.ListenPacket("udp", address) + if err != nil { + log.Fatalf("Error listening on port: %v\n", err) + } + defer listener.Close() + log.Println("GOSSIP messageHandler.go: Server is listening on port", n.ID.Port) + + // Buffer to store incoming data + buffer := make([]byte, 1024) + // handling connections + for { + select { + case <-ctx.Done(): + log.Println("GOSSIP messageHandler.go: startMessageHandler received stop signal") + return + default: + // Read data from UDP connection + numBytes, _, err := listener.ReadFrom(buffer) + if err != nil { + log.Printf("GOSSIP messageHandler.go: Error reading from connection: %v\n", err) + continue + } + + data := buffer[:numBytes] + + // Decode the incoming data into a Message struct + var receivedMessage Message + err = json.Unmarshal(data, &receivedMessage) + if err != nil { + log.Printf("GOSSIP messageHandler.go: Error decoding incoming data: %v\n", err) + continue + } + + // log.Printf("GOSSIP messageHandler.go: Received message from %s: %+v\n", addr.String(), receivedMessage) + + // Handle the received Message struct as needed + go handleMessage(n, receivedMessage) + } + } + +} + +func handleMessage(n *Node, receivedMessage Message) { + // Extract the message elements from the received message + sender := receivedMessage.Sender + messageType := receivedMessage.Type + msg := receivedMessage.Payload + + if n.FailedList.Exists(sender) { + // if detect false positives, send rejoin message and ask it rejoin + n.sendRejoin(sender) + // log.Printf("GOSSIP messageHandler.go: False Positive: ID: %s \n", sender) + // fmt.Printf("False Positive: ID: %s \n", sender) + return + } + // Handle different message types + switch messageType { + case CLIENT: + n.handleClient(msg) + case JOIN: + n.handleJoin(msg) + + case LEAVE: + n.handleLeave(msg) + case GOSSIP: + // Introduce a message drop rate for gossip + r := rand.Float64() + if r < n.Configuration.P_ml { + log.Println("GOSSIP messageHandler.go: Drop Message", messageType, msg) + return + } + n.handleGossip(msg) + + case GOSSIPS: + // Introduce a message drop rate for gossip + r := rand.Float64() + if r < n.Configuration.P_ml { + log.Println("GOSSIP messageHandler.go: Drop Message", messageType, msg) + return + } + n.handleGossipS(msg) + + case REJOIN: + log.Printf("GOSSIP messageHandler.go: False Positive: sender ID: %s \n", sender) + fmt.Printf("False Positive: sender ID: %s \n", sender) + n.handleRejoin(msg) + + default: + log.Printf("GOSSIP messageHandler.go: Unknown message type: %d", messageType) + } +} + +func (node *Node) handleRejoin(payload interface{}) { + id := payload.(string) + t := parseTimestamp(id) + // if the rejoin version is lower than current, just ignore + if t < node.ID.Timestamp { + return + } + // update current timestamp + node.ID.Timestamp = time.Now().Unix() + node.Join() + log.Println("GOSSIP messageHandler.go: Handle Rejoin: Update ID:", node.ID.String()) +} + +func (n *Node) handleJoin(payload interface{}) { + // As an introducer, handle JOIN messages, add the info of new node into list and sent it to new node + log.Printf("GOSSIP messageHandler.go: Handle Join: %v", payload) + membershipList, ok := payload.(map[string]interface{}) + if !ok { + log.Println("GOSSIP messageHandler.go: Fail to parse payload...") + } + // update self's membershiplist + n.Combine(&membershipList) + n.onChange <- struct{}{} + log.Println("GOSSIP messageHandler.go: Successfully update self's membershiplist") + if n.Configuration.Mode == "Gossip" { + msg := n.GetGossipList() + // send the current membership list to the new node + for id := range membershipList { + address := parseAddress(id) + n.sendMessage(address, MsgType(GOSSIP), msg) + } + } else { + msg := n.GetGossipSList() + // send the current membership list to the new node + for id := range membershipList { + address := parseAddress(id) + n.sendMessage(address, MsgType(GOSSIPS), msg) + } + } + log.Printf("GOSSIP messageHandler.go: Successfully handle JOIN, current MembershipList:\n%s", &n.MembershipList) +} + +func (n *Node) handleLeave(payload interface{}) { + //log.Printf("GOSSIP messageHandler.go: Handle Leave: %T", payload) //string + log.Printf("GOSSIP messageHandler.go: handling leave message...") + // get the id of LEAVE node + leaveNodeID := payload.(string) + log.Printf("GOSSIP messageHandler.go: leaveNodeID: %s", leaveNodeID) + // if node has already been removed, return + if n.FailedList.Exists(leaveNodeID) { + log.Println("GOSSIP messageHandler.go: Already removed leave node") + return + } + // mark the sender as LEAVE message + n.MembershipList.mu.Lock() + membershipInfo := n.MembershipList.ml + for id, memInfo := range membershipInfo { + if id == leaveNodeID { + // if already marked as left, do nothing + if memInfo.Status == Status(LEFT) { + n.MembershipList.mu.Unlock() + log.Println("GOSSIP messageHandler.go: Already set leave node LEFT") + return + } else { + // Mark as left + memInfo.Status = Status(LEFT) + membershipInfo[id] = memInfo + log.Println("GOSSIP messageHandler.go: Mark leave node LEFT") + } + } + } + n.MembershipList.mu.Unlock() + // send LEAVE msg to neighbors. + candidates := n.GetAliveCandidates() + for _, candidate := range candidates { + candAddr := parseAddress(candidate) + n.sendMessage(candAddr, MsgType(LEAVE), leaveNodeID) + log.Println("GOSSIP messageHandler.go: Send candidate LeaveNode") + } + log.Println("GOSSIP messageHandler.go: Successfully handle Leave") +} + +func (n *Node) handleGossip(payload interface{}) { + // handle GOSSIP messages + // log.Printf("GOSSIP messageHandler.go: Handle Gossip: %T", payload) + + // As an introducer, handle JOIN messages, add the info of new node into list and sent it to new node + membershipList, ok := payload.(map[string]interface{}) + if !ok { + log.Println("GOSSIP messageHandler.go: Fail to parse payload...") + } + // update self's membershiplist + modified := n.combineGossip(&membershipList) + if modified { + n.onChange <- struct{}{} + } + // log.Printf("GOSSIP messageHandler.go: Successfully handle Gossip, current MembershipList:\n %s", &n.MembershipList) +} + +func (n *Node) handleGossipS(payload interface{}) { + // handle GOSSIP messages + // log.Printf("GOSSIP messageHandler.go: Handle Gossip: %T", payload) + + // As an introducer, handle JOIN messages, add the info of new node into list and sent it to new node + membershipList, ok := payload.(map[string]interface{}) + if !ok { + log.Println("GOSSIP messageHandler.go: Fail to parse payload...") + } + // update self's membershiplist + modified := n.combineGossipS(&membershipList) + if modified { + n.onChange <- struct{}{} + } + // log.Printf("GOSSIP messageHandler.go: Successfully handle GossipS, current MembershipList:\n %s", &n.MembershipList) +} + +func (n *Node) handleClient(config interface{}) { + if cfg, ok := config.(map[string]interface{}); ok { + if tGossip, ok := cfg["T_gossip"].(float64); ok { + n.T_gossip = tGossip + } + if fanout, ok := cfg["Fanout"].(float64); ok { + n.Fanout = int(fanout) + } + if mode, ok := cfg["Mode"].(string); ok { + n.Mode = mode + } + if pMl, ok := cfg["P_ml"].(float64); ok { + n.P_ml = pMl + } + if tCleanup, ok := cfg["T_cleanup"].(float64); ok { + n.T_cleanup = tCleanup + } + if tFail, ok := cfg["T_fail"].(float64); ok { + n.T_fail = tFail + } + if n.Mode == "Gossip+S" { + if tSuspicion, ok := cfg["T_suspicion"].(float64); ok { + n.T_suspicion = tSuspicion + } + } + } + log.Printf("GOSSIP messageHandler.go: Successfully handle CLIENT, current configuration: Fanout: %d, T_gossip: %.2f, T_fail: %.2f, T_cleanup:%.2f, T_suspicion: %.2f, P_ml: %.2f, Mode: %s", n.Fanout, n.T_gossip, n.T_fail, n.T_cleanup, n.T_suspicion, n.P_ml, n.Mode) +} + +func (n *Node) Gossip(ctx context.Context) { + // for T_gossip microseconds, update current membership and gossip + for { + select { + case <-ctx.Done(): + log.Println("GOSSIP messageHandler.go: Gossip received stop signal") + return + default: + if n.Configuration.Mode == "Gossip" { + // update self membership list + modified := n.updateMembershipGossip() + if modified { + n.onChange <- struct{}{} + } + gossipList := n.GetGossipList() + // get candidates + candidates := n.GetAliveCandidates() + + // send the gossip membership list + // log.Println("GOSSIP messageHandler.go: Gossiping to candidates: ", candidates) + + for _, cand := range candidates { + address := parseAddress(cand) + n.sendMessage(address, MsgType(GOSSIP), gossipList) + } + } else { + // update self membership list + modified := n.updateMembershipGossipS() + if modified { + n.onChange <- struct{}{} + } + gossipList := n.GetGossipSList() + // get candidates + candidates := n.GetAliveCandidates() + + // send the gossip membership list + // log.Println("GOSSIP messageHandler.go: Gossiping to candidates: ", candidates) + + for _, cand := range candidates { + address := parseAddress(cand) + n.sendMessage(address, MsgType(GOSSIPS), gossipList) + } + } + + time.Sleep(time.Duration(n.Configuration.T_gossip*1000) * time.Millisecond) + } + } +} + +func (node *Node) Join() { + // solve introducer DNS address to ip address + ips, err := net.LookupIP(node.Configuration.Introducor) + if err != nil { + log.Println("GOSSIP messageHandler.go: Fail to resolve hostname:", err) + os.Exit(1) + } + for _, ip := range ips { + log.Println("GOSSIP messageHandler.go: Introducer IP Address:", ip.String()) + } + introIP := ips[0].String() + + // If the node is introducer, just wait for connection + // if node.ID.IP == introIP { + // return + // } + // If the node is not introducer, send JOIN message to introducer + // Establish a UDP connection to the introducer + introducerAddress := introIP + ":" + node.ID.Port + msg := make(map[string][]int) + msg[node.ID.String()] = []int{0, int(Status(ALIVE))} + node.SaveMembershipList() + node.sendMessage(introducerAddress, MsgType(JOIN), msg) +} + +func (node *Node) Leave() { + // get alive candidates + candidates := node.GetAliveCandidates() + + for _, candidate := range candidates { + candAddr := parseAddress(candidate) + node.sendMessage(candAddr, MsgType(LEAVE), node.ID.String()) + } + log.Println("GOSSIP messageHandler.go: Sent Leave message") +} + +func (node *Node) sendRejoin(id string) { + candAddr := parseAddress(id) + node.sendMessage(candAddr, MsgType(REJOIN), id) +} diff --git a/membership/node.go b/membership/node.go new file mode 100644 index 0000000..c1e7285 --- /dev/null +++ b/membership/node.go @@ -0,0 +1,410 @@ +package membership + +import ( + "context" + "encoding/json" + "fmt" + "log" + "math/rand" + "net" + "os" + "sync" + "time" +) + +// define node +type ID struct { + Hostname string + IP string + Port string + Timestamp int64 +} + +func (id ID) String() string { + return fmt.Sprintf("%s:%s:%s:%d", id.Hostname, id.IP, id.Port, id.Timestamp) +} + +type Configuration struct { + Introducor string + Port string + Fanout int + T_gossip float64 + T_fail float64 + T_cleanup float64 + T_suspicion float64 + P_ml float64 + Mode string +} + +type Node struct { + ID + Configuration + MembershipList + FailedList + onChange chan struct{} +} + +func InitializeNode(config Configuration) *Node { + // get local ip address + addrs, err := net.InterfaceAddrs() + if err != nil { + fmt.Println(err) + os.Exit(1) + } + var localIP string + for _, addr := range addrs { + if ipnet, ok := addr.(*net.IPNet); ok && !ipnet.IP.IsLoopback() { + if ipnet.IP.To4() != nil { + localIP = ipnet.IP.String() + } + } + } + + hostname, _ := os.Hostname() + id := ID{Hostname: hostname, IP: localIP, Port: config.Port, Timestamp: time.Now().Unix()} + membershipInfo := MembershipInfo{ + Heartbeat: 0, + Timestamp: time.Now().Unix(), + Status: Status(ALIVE), + } + membershipList := make(map[string]MembershipInfo) + membershipList[id.String()] = membershipInfo + + myNode := Node{ + ID: id, + Configuration: config, + MembershipList: MembershipList{ + mu: sync.Mutex{}, + ml: membershipList, + }, + FailedList: FailedList{ + mu: sync.Mutex{}, + list: []string{}, + }, + onChange: make(chan struct{}), + } + + log.Printf("GOSSIP node.go: Successfully initialize node: %s", myNode.ID) + + return &myNode +} + +func (node *Node) Run(ctx context.Context, workfunc func()) { + node.Join() + go node.StartMessageHandler(ctx) + go node.Gossip(ctx) + for range node.onChange { + log.Println("GOSSIP node.go: Handle membership change") + workfunc() + } +} + +func (node *Node) GetMembershipList() string { + // only return non-failed members + node.MembershipList.mu.Lock() + defer node.MembershipList.mu.Unlock() + var membershipList string + for id, memInfo := range node.MembershipList.ml { + if memInfo.Status == Status(ALIVE) || memInfo.Status == Status(SUSPECTED) { + membershipList += fmt.Sprintf("ID: %s ", id) + membershipList += fmt.Sprintf("MembershipInfo: %s\n", memInfo) + } + } + return membershipList +} + +func (node *Node) SaveMembershipList() { + // only return non-failed members + node.MembershipList.mu.Lock() + defer node.MembershipList.mu.Unlock() + var membershipList []string + for id, memInfo := range node.MembershipList.ml { + if memInfo.Status == Status(ALIVE) || memInfo.Status == Status(SUSPECTED) { + membershipList = append(membershipList, id) + } + } + + data := struct { + ID string `json:"id"` + MembershipIDs []string `json:"membershiplist"` + }{ + ID: node.ID.String(), + MembershipIDs: membershipList, + } + + // Marshal the data to JSON + jsonData, err := json.Marshal(data) + if err != nil { + log.Println("GOSSIP node.go: Error marshaling data:", err) + return + } + + // Write the JSON data to a file + filePath := "membership.json" + err = os.WriteFile(filePath, jsonData, 0644) + if err != nil { + log.Println("GOSSIP node.go: Error writing JSON data to the file:", err) + return + } + + log.Printf("GOSSIP node.go: Membership data saved to '%s'.\n", filePath) + +} + +func (node *Node) Print() { + // only return non-failed members + node.MembershipList.mu.Lock() + defer node.MembershipList.mu.Unlock() + var membershipList string + for id, memInfo := range node.MembershipList.ml { + if memInfo.Status == Status(ALIVE) || memInfo.Status == Status(SUSPECTED) { + membershipList += fmt.Sprintf("ID: %s ", id) + membershipList += fmt.Sprintf("MembershipInfo: %s\n", memInfo) + } + } + fmt.Println(membershipList) +} + +func (node *Node) GetAliveCandidates() []string { + node.MembershipList.mu.Lock() + defer node.MembershipList.mu.Unlock() + fanout := node.Configuration.Fanout + cand := []string{} + for id, memInfo := range node.MembershipList.ml { + if id == node.ID.String() { + // don't send to self + continue + } + if memInfo.Status == Status(ALIVE) || memInfo.Status == Status(SUSPECTED) { + cand = append(cand, id) + } + } + // random select #fanout nodes and connect to the selected nodes + rand.Shuffle(len(cand), func(i, j int) { + cand[i], cand[j] = cand[j], cand[i] + }) + if fanout > len(cand) { + fanout = len(cand) + } + cand = cand[:fanout] + return cand +} + +func (node *Node) Combine(other_ml *map[string]interface{}) { + if node.Configuration.Mode == "Gossip" { + node.combineGossip(other_ml) + } else if node.Configuration.Mode == "Gossip+S" { + node.combineGossipS(other_ml) + } +} + +// func (n *Node) UpdateMembershipList() { +// if n.Configuration.Mode == "Gossip" { +// n.updateMembershipGossip() +// } else { +// n.updateMembershipGossipS() +// } +// } + +func (node *Node) combineGossip(other_ml *map[string]interface{}) bool { + node.MembershipList.mu.Lock() + defer node.MembershipList.mu.Unlock() + modified := false + ml := node.MembershipList.ml + current_time := time.Now().Unix() + for id, info := range *other_ml { + info, _ := info.([]interface{}) + arr := make([]int, 2) + for i, v := range info { + arr[i] = int(v.(float64)) + } + heartbeat := arr[0] + currentMem, exists := ml[id] + if exists { + if (heartbeat > currentMem.Heartbeat) && (currentMem.Status != Status(FAILED)) && (currentMem.Status != Status(LEFT)) { + // Update heartbeat if the current member is not Failed or Left + currentMem.Heartbeat = heartbeat + currentMem.Timestamp = current_time + } + ml[id] = currentMem + } else { + if !node.FailedList.Exists(id) { + log.Printf("GOSSIP node.go: Add new member: ID: %s timestamp: %v\n", id, current_time) + // fmt.Printf("Add new member: ID: %s timestamp: %v\n", id, current_time) + ml[id] = MembershipInfo{Heartbeat: heartbeat, Timestamp: current_time, Status: Status(ALIVE)} + modified = true + } + } + } + return modified +} + +func (node *Node) combineGossipS(other_ml *map[string]interface{}) bool { + node.MembershipList.mu.Lock() + defer node.MembershipList.mu.Unlock() + + modified := false + ml := node.MembershipList.ml + current_time := time.Now().Unix() + for id, info := range *other_ml { + info, _ := info.([]interface{}) + arr := make([]int, 2) + for i, v := range info { + arr[i] = int(v.(float64)) + } + heartbeat := int(arr[0]) + status := Status(int(arr[1])) + currentMem, exists := ml[id] + if !exists { + if !node.FailedList.Exists(id) { + ml[id] = MembershipInfo{Heartbeat: heartbeat, Timestamp: current_time, Status: status} + log.Printf("GOSSIP node.go: Add new member: ID: %s timestamp: %v\n", id, current_time) + modified = true + // fmt.Printf("Add new member: ID: %s timestamp: %v\n", id, current_time) + } + } else if (currentMem.Status != Status(FAILED)) && (currentMem.Status != Status(LEFT)) { + if (status == Status(FAILED)) || (status == Status(LEFT)) { + currentMem.Status = status + currentMem.Timestamp = current_time + log.Printf("GOSSIP node.go: COMBINE FAILED: ID: %s timestamp: %v\n", id, current_time) + fmt.Printf("COMBINE FAILED: ID: %s timestamp: %v\n", id, current_time) + node.FailedList.Add(id) + modified = true + } else if heartbeat > currentMem.Heartbeat { + currentMem.Status = status + currentMem.Heartbeat = heartbeat + currentMem.Timestamp = current_time + log.Printf("GOSSIP node.go: COMBINE: higher heartbeat. ID: %s status: %d, timestamp: %v\n", id, status, current_time) + } else if (heartbeat == currentMem.Heartbeat) && (status == Status(SUSPECTED)) && (currentMem.Status == Status(ALIVE)) { + currentMem.Status = status + currentMem.Timestamp = current_time + log.Printf("GOSSIP node.go: COMBINE SUSPECTED:ID: %s status: %d, timestamp: %v\n", id, status, current_time) + fmt.Printf("COMBINE SUSPECTED:ID: %s status: %d, timestamp: %v\n", id, status, current_time) + } + ml[id] = currentMem + } + } + return modified +} + +func (n *Node) updateMembershipGossip() bool { + // update the membershiplist with gossip mode + n.MembershipList.mu.Lock() + defer n.MembershipList.mu.Unlock() + modified := false + // store the candidate nodes to gossip + ml := n.MembershipList.ml + current_time := time.Now().Unix() + cleanupList := make([]string, 0, len(ml)) + for id, info := range ml { + if id == n.ID.String() { + // update self info + info.Heartbeat++ + info.Timestamp = current_time + info.Status = Status(ALIVE) + } else { + time_diff := float64(current_time - info.Timestamp) + if (info.Status != Status(FAILED)) && (info.Status != Status(LEFT)) && (time_diff > float64(n.Configuration.T_fail)) { + // mark the member as failed + info.Status = Status(FAILED) + info.Timestamp = current_time + log.Printf("GOSSIP node.go: UPDATE FAILED: ID: %s timestamp: %v\n", id, current_time) + fmt.Printf("UPDATE FAILED: ID: %s timestamp: %v\n", id, current_time) + n.FailedList.Add(id) + modified = true + } else if (info.Status == Status(FAILED) || info.Status == Status(LEFT)) && (time_diff > float64(n.Configuration.T_cleanup)) { + // clean up the member from membershiplist + cleanupList = append(cleanupList, id) + } + } + ml[id] = info + } + + // clean up members in the cleanupList + for _, id := range cleanupList { + // n.FailedList.Delete(id) // comment out to avoid ghost member + delete(ml, id) + log.Printf("GOSSIP node.go: CLEANUP: ID: %s timestamp: %v\n", id, current_time) + // fmt.Printf("CLEANUP: ID: %s timestamp: %v\n", id, current_time) + } + return modified +} + +func (n *Node) updateMembershipGossipS() bool { + // update the membershiplist with gossip mode, return current non-failed members + n.MembershipList.mu.Lock() + defer n.MembershipList.mu.Unlock() + modified := false + + // store the candidate nodes to gossip + ml := n.MembershipList.ml + current_time := time.Now().Unix() + cleanupList := make([]string, 0, len(ml)) + for id, info := range ml { + if id == n.ID.String() { + // update self info + info.Heartbeat++ + info.Timestamp = current_time + info.Status = Status(ALIVE) + } else { + time_diff := float64(current_time - info.Timestamp) + if (info.Status == Status(FAILED) || info.Status == Status(LEFT)) && (time_diff > float64(n.Configuration.T_cleanup)) { + // clean up the member from membershiplist + cleanupList = append(cleanupList, id) + } else if (info.Status == Status(ALIVE)) && (time_diff > float64(n.Configuration.T_suspicion)) { + // mark the member as suspected + info.Status = Status(SUSPECTED) + info.Timestamp = current_time + log.Printf("GOSSIP node.go: UPDATE SUSPECTED: ID: %s timestamp: %v\n", id, current_time) + fmt.Printf("UPDATE SUSPECTED: ID: %s timestamp: %v\n", id, current_time) + } else if (info.Status == Status(SUSPECTED)) && (time_diff > float64(n.Configuration.T_fail)) { + // mark the member as failed + info.Status = Status(FAILED) + info.Timestamp = current_time + log.Printf("GOSSIP node.go: UPDATE FAILED: ID: %s timestamp: %v\n", id, current_time) + fmt.Printf("UPDATE FAILED: ID: %s timestamp: %v\n", id, current_time) + n.FailedList.Add(id) + modified = true + } + } + ml[id] = info + } + + // clean up members in the cleanupList + for _, id := range cleanupList { + // n.FailedList.Delete(id) // comment out to avoid ghost member + delete(ml, id) + log.Printf("GOSSIP node.go: CLEANUP: ID: %s timestamp: %v\n", id, current_time) + // fmt.Printf("CLEANUP: ID: %s timestamp: %v\n", id, current_time) + } + return modified +} + +func (n *Node) GetGossipList() map[string][]int { + // return current non-failed members + n.MembershipList.mu.Lock() + defer n.MembershipList.mu.Unlock() + + ml := n.MembershipList.ml + gossipList := make(map[string][]int) + for id, info := range ml { + if info.Status == Status(ALIVE) { + gossipList[id] = []int{info.Heartbeat, int(info.Status)} + } + } + return gossipList +} + +func (n *Node) GetGossipSList() map[string][]int { + // return every member + n.MembershipList.mu.Lock() + defer n.MembershipList.mu.Unlock() + + ml := n.MembershipList.ml + gossipList := make(map[string][]int) + for id, info := range ml { + gossipList[id] = []int{info.Heartbeat, int(info.Status)} + + } + return gossipList +} diff --git a/membership/utils.go b/membership/utils.go new file mode 100644 index 0000000..6aec051 --- /dev/null +++ b/membership/utils.go @@ -0,0 +1,47 @@ +package membership + +import ( + "log" + "net" + "os" + "strconv" + "strings" +) + +func parseHostname(id string) string { + components := strings.Split(id, ":") + return components[0] +} + +func parseAddress(id string) string { + components := strings.Split(id, ":") + return components[1] + ":" + components[2] +} + +func parseTimestamp(id string) int64 { + components := strings.Split(id, ":") + t, _ := strconv.ParseInt(components[3], 10, 64) + return t +} + +// resolve hosts' name to IP address +func getIP(hostname string) net.IP { + ips, err := net.LookupIP(hostname) + if err != nil { + log.Println("Fail to resolve hostname:", err) + os.Exit(1) + } + for _, ip := range ips { + log.Println("IP Address:", ip.String()) + } + return ips[0] +} + +func getLocalIP() net.IP { + hostname, err := os.Hostname() + if err != nil { + log.Println("Fail to get hostname:", err) + os.Exit(1) + } + return getIP(hostname) +} diff --git a/sdfs/client.go b/sdfs/client.go new file mode 100644 index 0000000..6ff13b0 --- /dev/null +++ b/sdfs/client.go @@ -0,0 +1,486 @@ +// Client.go +// Handle get/put/delete request functions + +package sdfs + +import ( + "encoding/json" + "fmt" + "log" + "net" + "os" + "os/exec" + "strconv" + "strings" + "time" +) + +type Client struct { + ID string + Port string + NamenodeAddr string +} + +func InitializeClient(id, port, namenodeAddr string) *Client { + client := new(Client) + client.ID = parseIP(id) + ":" + port + client.Port = port + client.NamenodeAddr = namenodeAddr + return client +} + +func (c *Client) Run() { + listener, err := net.Listen("tcp", ":"+c.Port) + if err != nil { + log.Fatalf("CLIENT: Error listening on port: %v\n", err) + } + defer listener.Close() + log.Println("CLIENT: Server is listening...") + + // Listen for incoming messages via network communication + for { + // Accept incoming connections + conn, err := listener.Accept() + if err != nil { + log.Println("CLIENT: Error accepting connection:", err) + continue + } + message, err := c.readString(conn) + if err != nil { + log.Printf("CLIENT: Error decoding incoming data: %v\n", err) + continue + } + + log.Printf("CLIENT: Received message %+v\n", message) + + go c.handleMessage(conn, message) + } +} + +func (c *Client) handleMessage(conn net.Conn, receivedMessage string) { + components := strings.Split(receivedMessage, " ") + switch components[0] { + case "GET": + log.Println("CLIENT: Received GET request") + if len(components) == 3 { + sdfsFilename := components[1] + localFilename := components[2] + c.handleGet(conn, sdfsFilename, localFilename) + } else { + log.Printf("CLIENT: Received invalid GET request %s\n", receivedMessage) + } + case "PUT": + log.Println("CLIENT: Received PUT request") + if len(components) == 3 { + localFilename := components[1] + sdfsFilename := components[2] + c.handlePUT(conn, localFilename, sdfsFilename) + } else { + log.Printf("CLIENT: Received invalid PUT request %s\n", receivedMessage) + } + default: + log.Printf("CLIENT: received an unknown message: %s\n", receivedMessage) + } +} + +func (c *Client) Get(sdfsFileName, localFileName string) error { + log.Println("CLIENT: GET : Get", sdfsFileName, "from SDFS") + // Step 1: Connect to the NameNode using TCP + nameNodeConn, err := net.Dial("tcp", c.NamenodeAddr) + if err != nil { + log.Println("CLIENT: GET :Fail to connect to namenode", err) + return err + } + defer nameNodeConn.Close() + + log.Println("CLIENT: GET :Connect to NameNode ", c.NamenodeAddr) + // Step 2: Send a PUT request to the NameNode + messageID := time.Now().Unix() + err = c.writeMessage(nameNodeConn, GET, sdfsFileName, messageID) + if err != nil { + log.Println("CLIENT: GET :Fail to send GET request to namenode", err) + return err + } + + log.Println("CLIENT: GET :Send GET request to namenode") + // step 3: get a list of datanode from the server + message, err := c.readMessage(nameNodeConn) + if err != nil { + log.Println("CLIENT: GET : fail to read message from server", err) + return err + } + assignment, err := payloadToAssignment(message.Payload) + if err != nil { + log.Println("CLIENT: GET : Fail to parse GET payload to assignment", err) + return err + } + + log.Println("CLIENT: GET :Get assignment from namenode") + // Step 4: For each datanode, fetch file from the datanode + suc := false + for _, datanode := range assignment.Datanodes { + remote := datanode + ":cs425mp3/sdfs/" + assignment.Filename + "-" + strconv.Itoa(assignment.Version) + cmd := exec.Command("scp", "-l", strconv.Itoa(Limit), remote, localFileName) + //cmd := exec.Command("scp", remote, localFileName) + cmd.Stdout = os.Stderr + cmd.Stderr = os.Stderr + + if err := cmd.Run(); err != nil { + log.Printf("CLIENT: GET : Fail to get data from datanode %s: %v\n", datanode, err) + continue + } + suc = true + break + } + log.Printf("CLIENT: GET : Successfully get data from datanode\n") + // Step 5: acknowledge the server + err = c.writeMessage(nameNodeConn, GETCONFIRM, assignment, messageID) + if err != nil { + log.Println("CLIENT: GET : Fail to send GETCONFIRM request to namenode", err) + return err + } + + if suc { + return nil + } else { + return fmt.Errorf("failed to GET from datanodes") + } +} + +func (c *Client) Multiread(sdfsFileName, localFileName string, VMs []string) { + // VMs are a list of VM numbers such as 01 ... 10 + resultChannel := make(chan string, len(VMs)) // Create a channel to collect results + + for _, vm := range VMs { + go func(vm string) { + log.Println("multiread: send GET request to", vm) + // Step 1: get the VM hostname fa23-cs425-02xx.cs.illinois.edu and IP address + hostname := fmt.Sprintf("fa23-cs425-02%s.cs.illinois.edu", vm) + ip := getIP(hostname) + + // Step 2: connect to the VM using UDP and send GET request to the clients on those VMs + requestMessage := fmt.Sprintf("GET %s %s", sdfsFileName, localFileName) + result := c.sendRequest(requestMessage, ip) + + // Send the result back to the main goroutine via the channel + resultChannel <- result + }(vm) + } + + // Collect results from the goroutines + for range VMs { + result := <-resultChannel + // You can process or log the result here + log.Printf("CLIENT: MULTIREAD Result: %s\n", result) + } +} + +func (c *Client) Multiwrite(localFileName, sdfsFileName string, VMs []string) { + // VMs are a list of VM numbers such as 01 ... 10 + resultChannel := make(chan string, len(VMs)) // Create a channel to collect results + + for _, vm := range VMs { + go func(vm string) { + // Step 1: get the VM hostname fa23-cs425-02xx.cs.illinois.edu and IP address + hostname := fmt.Sprintf("fa23-cs425-02%s.cs.illinois.edu", vm) + ip := getIP(hostname) + + // Step 2: connect to the VM and send PUT request to the clients on those VMs + requestMessage := fmt.Sprintf("PUT %s %s", localFileName, sdfsFileName) + result := c.sendRequest(requestMessage, ip) + + // Send the result back to the main goroutine via the channel + resultChannel <- result + }(vm) + } + + // Collect results from the goroutines + for range VMs { + result := <-resultChannel + // You can process or log the result here + log.Printf("CLIENT: MULTIWRITE Result: %s\n", result) + } +} + +func (c *Client) handleGet(conn net.Conn, sdfsFileName, localFileName string) { + log.Printf("CLIENT: Handle Multireader: %s -> %s \n", sdfsFileName, localFileName) + defer conn.Close() + var message string + err := c.Get(sdfsFileName, localFileName) + if err != nil { + log.Printf("CLIENT: MULTIREAD: GET %s %s failed: %v", sdfsFileName, localFileName, err) + message = "FAILED" + } else { + message = "SUCCESS" + } + _, err = fmt.Fprint(conn, message) + if err != nil { + log.Printf("CLIENT: Fail to send back multiread request: %v \n", err) + } +} + +func (c *Client) handlePUT(conn net.Conn, localFileName, sdfsFileName string) { + defer conn.Close() + var message string + err := c.Put(localFileName, sdfsFileName) + if err != nil { + log.Printf("CLIENT: MULTIWRITE: PUT %s %s failed: %v", localFileName, sdfsFileName, err) + message = "FAILED" + } else { + message = "SUCCESS" + } + _, err = fmt.Fprint(conn, message) + if err != nil { + log.Printf("CLIENT: Fail to send back MULTIWRITE request: %v \n", err) + } +} + +func (c *Client) Put(localFileName, sdfsFileName string) error { + // Step 1: Connect to the NameNode using TCP + nameNodeConn, err := net.Dial("tcp", c.NamenodeAddr) + if err != nil { + log.Println("CLIENT: PUT : Fail to connect to namenode", err) + return err + } + defer nameNodeConn.Close() + + // Step 2: Send a PUT request to the NameNode + messageID := time.Now().Unix() + err = c.writeMessage(nameNodeConn, PUT, sdfsFileName, messageID) + if err != nil { + log.Println("CLIENT: PUT : Fail to send PUT request to namenode", err) + return err + } + // step 3: get a list of datanode from the server + message, err := c.readMessage(nameNodeConn) + if err != nil { + log.Println("CLIENT: PUT : fail to read message from server", err) + return err + } + assignment, err := payloadToAssignment(message.Payload) + if err != nil { + log.Println("CLIENT: PUT : Fail to parse PUT payload to assignment", err) + return err + } + + // Step 4: For each datanode, upload file to the datanode + sucDatanode := []string{} + for _, datanode := range assignment.Datanodes { + remote := datanode + ":cs425mp3/sdfs/" + assignment.Filename + "-" + strconv.Itoa(assignment.Version) + // cmd := exec.Command("scp", localFileName, remote) + cmd := exec.Command("scp", "-l", strconv.Itoa(Limit), localFileName, remote) + cmd.Stdout = os.Stderr + cmd.Stderr = os.Stderr + + if err := cmd.Run(); err != nil { + log.Printf("CLIENT: PUT : Fail to put data from datanode %s: %v\n", datanode, err) + continue + } + sucDatanode = append(sucDatanode, datanode) + } + log.Printf("CLIENT: PUT : Successfully put data from datanode\n") + // Step 5: acknowledge the server + assignment.Datanodes = sucDatanode + err = c.writeMessage(nameNodeConn, PUTCONFIRM, assignment, messageID) + if err != nil { + log.Println("CLIENT: PUT : Fail to send PUTCONFIRM request to namenode", err) + return err + } + // Step 6: Receive server's PUTCONFIRM message + message, err = c.readMessage(nameNodeConn) + if err != nil { + log.Println("CLIENT: PUT : Fail to parse PUTCONFIRM payload to assignment", err) + return err + } + + if len(sucDatanode) > 0 && message.Type == PUTCONFIRM { + return nil + } else { + return fmt.Errorf("failed to PUT from datanodes") + } +} + +func (c *Client) Delete(sdfsFileName string) error { + // Step 1: Connect to the NameNode using TCP + nameNodeConn, err := net.Dial("tcp", c.NamenodeAddr) + if err != nil { + log.Println("CLIENT: PUT : Fail to connect to namenode", err) + return err + } + defer nameNodeConn.Close() + + // Step 2: Send a DELETE request to the NameNode + messageID := time.Now().Unix() + err = c.writeMessage(nameNodeConn, DELETE, sdfsFileName, messageID) + if err != nil { + log.Println("CLIENT: DELETE : Fail to send DELETE request to namenode", err) + return err + } + // step 3: get a list of datanode from the server + message, err := c.readMessage(nameNodeConn) + if err != nil { + log.Println("CLIENT: DELETE : fail to read message from server", err) + return err + } + assignment, err := payloadToAssignment(message.Payload) + if err != nil { + log.Println("CLIENT: DELETE : Fail to parse DELETE payload to assignment", err) + return err + } + + // Step 4: For each datanode, DELETE file to the datanode + sucDatanode := []string{} + for _, datanode := range assignment.Datanodes { + remote := "cs425mp3/sdfs/" + assignment.Filename + "-*" + cmd := exec.Command("ssh", datanode, "rm "+remote) + cmd.Stdout = os.Stderr + cmd.Stderr = os.Stderr + + if err := cmd.Run(); err != nil { + log.Printf("CLIENT: DELETE : Fail to delete data from datanode %s: %v\n", datanode, err) + continue + } + sucDatanode = append(sucDatanode, datanode) + } + log.Printf("CLIENT: DELETE : Successfully delete data from datanode\n") + // Step 5: acknowledge the server + assignment.Datanodes = sucDatanode + err = c.writeMessage(nameNodeConn, DELCONFIRM, assignment, messageID) + if err != nil { + log.Println("CLIENT: DELETE : Fail to send DELCONFIRM request to namenode", err) + return err + } + // Step 6: Receive server's PUTCONFIRM message + message, err = c.readMessage(nameNodeConn) + if err != nil { + log.Println("CLIENT: DELETE : Fail to parse DELCONFIRM payload to assignment", err) + return err + } + + if len(sucDatanode) > 0 && message.Type == DELCONFIRM { + return nil + } else { + return fmt.Errorf("failed to DELETE from datanodes") + } +} + +func (c *Client) ListReplica(sdfsFileName string) { + // Step 1: Connect to the NameNode using TCP + nameNodeConn, err := net.Dial("tcp", c.NamenodeAddr) + if err != nil { + log.Println("CLIENT: LISTREPLICA :Fail to connect to namenode", err) + return + } + defer nameNodeConn.Close() + + // Step 2: Send a PUT request to the NameNode + messageID := time.Now().Unix() + err = c.writeMessage(nameNodeConn, LISTREPLICA, sdfsFileName, messageID) + if err != nil { + log.Println("CLIENT: LISTREPLICA :Fail to send LISTREPLICA request to namenode", err) + return + } + // step 3: get a list of datanode from the server + message, err := c.readMessage(nameNodeConn) + if err != nil { + log.Println("CLIENT: LISTREPLICA : fail to read message from server", err) + return + } + assignment, err := payloadToAssignment(message.Payload) + if err != nil { + log.Println("CLIENT: LISTREPLICA : Fail to parse LISTREPLICA payload to assignment", err) + return + } + + // step 4: print the assignment + for _, datanode := range assignment.Datanodes { + fmt.Print("IP:", datanode, "Hostname:") + hostnames, err := net.LookupAddr(datanode) + if err != nil { + log.Printf("CLIENT: LISTREPLICA : Look up address Error: %v\n", err) + return + } + + fmt.Println(hostnames[0]) + } +} + +func (c *Client) writeMessage(conn net.Conn, msgType MsgType, payload interface{}, id int64) error { + message := Message{ + ID: id, + Sender: c.ID, + Type: msgType, + Payload: payload, + } + request, err := json.Marshal(message) + if err != nil { + return err + } + conn.Write([]byte(request)) + return nil +} + +func (c *Client) readMessage(conn net.Conn) (Message, error) { + var message Message + buffer := make([]byte, 1024) + numBytes, err := conn.Read(buffer) + if err != nil { + return message, err + } + data := buffer[:numBytes] + // Decode the incoming data into a Message struct + err = json.Unmarshal(data, &message) + if err != nil { + return message, err + } + return message, nil +} + +func (c *Client) readString(conn net.Conn) (string, error) { + var receivedString string + buffer := make([]byte, 1024) + numBytes, err := conn.Read(buffer) + if err != nil { + return receivedString, err + } + receivedString = string(buffer[:numBytes]) + return receivedString, nil +} + +func (c *Client) sendRequest(requestMessage, ip string) string { + var message string + addr := ip + ":" + c.Port + conn, err := net.Dial("tcp", addr) + if err != nil { + log.Printf("CLIENT: Fail to connect to ip %s: %v\n", ip, err) + return message + } + defer conn.Close() + + _, err = fmt.Fprint(conn, requestMessage) + if err != nil { + log.Printf("CLIENT: Fail to send request to ip %s: %v\n", ip, err) + return message + } + + message, err = c.readString(conn) + if err != nil { + log.Printf("CLIENT: Fail to read message from ip %s: %v\n", ip, err) + return message + } + return message +} + +func parseIP(id string) string { + components := strings.Split(id, ":") + return components[1] +} + +func getIP(hostname string) string { + ips, err := net.LookupIP(hostname) + if err != nil { + log.Println("CLIENT: Fail to resolve hostname:", err) + os.Exit(1) + } + return ips[0].String() +} diff --git a/sdfs/datanode.go b/sdfs/datanode.go new file mode 100644 index 0000000..947f27d --- /dev/null +++ b/sdfs/datanode.go @@ -0,0 +1,168 @@ +/* Datanode.go +Data: {filename, version} sdfs/localfile_version +Func: + Handle put/get/delete +*/ + +package sdfs + +import ( + "encoding/json" + "log" + "net" + "os" + "strconv" + "strings" + "time" +) + +type Datanode struct { + IP string + Port string + Files map[string]int // filename, latest version +} + +func InitDatanode(ip string) *Datanode { + d := new(Datanode) + d.IP = ip + d.Port = "5005" + d.Files = make(map[string]int) + return d +} + +func (n *Datanode) Run() { + //TODO + go n.Listener() +} + +func (n *Datanode) Listener() { + // Create a TCP listener for Datanodes. + address := "0.0.0.0" + ":" + n.Port + listener, err := net.Listen("tcp", address) + if err != nil { + log.Fatalf("Datanode: Failed to start Datanode listener: %v", err) + } + defer listener.Close() + + log.Println("Datanode: datanode is listening...") + + // Listen for incoming messages via network communication + for { + // Accept incoming connections + conn, err := listener.Accept() + if err != nil { + log.Println("Datanode: Error accepting connection:", err) + continue + } + + receivedMessage, err := n.readMessage(conn) + if err != nil { + log.Printf("Datanode: Error received message: %v\n", err) + continue + } + log.Printf("Datanode: Received message %+v\n", receivedMessage) + + go n.handleMessage(receivedMessage, conn) + } +} + +func (dn *Datanode) handleMessage(receivedMessage Message, conn net.Conn) { + // Listen for Datanode requests and process them (e.g., put/get/delete operations) + sender := receivedMessage.Sender + messageType := receivedMessage.Type + + switch messageType { + case RECOVER: + dn.HandleRecover(conn, sender) + default: + log.Println("Datanode: wrong message type") + } +} + +func (n *Datanode) HandleRecover(conn net.Conn, sender string) { + defer conn.Close() + // Step 1: find the latest version for each file in /sdfs + replicaMap, err := generateReplicaMap("/sdfs") + if err != nil { + log.Println("Datanode: Failed to find replicas for files in /sdfs", err) + } + // Step 2: write back the latest version + n.writeMessage(conn, RECOVER, replicaMap, time.Now().Unix()) +} + +/* Helper */ +func (c *Datanode) writeMessage(conn net.Conn, msgType MsgType, payload interface{}, id int64) error { + message := Message{ + ID: id, + Sender: c.IP, + Type: msgType, + Payload: payload, + } + request, err := json.Marshal(message) + if err != nil { + return err + } + conn.Write([]byte(request)) + return nil +} + +func (c *Datanode) readMessage(conn net.Conn) (Message, error) { + var message Message + buffer := make([]byte, 1024) + numBytes, err := conn.Read(buffer) + if err != nil { + return message, err + } + data := buffer[:numBytes] + // Decode the incoming data into a Message struct + err = json.Unmarshal(data, &message) + if err != nil { + return message, err + } + return message, nil +} + +func generateReplicaMap(directoryPath string) (map[string]Replica, error) { + replicaMap := make(map[string]Replica) + + files, err := os.ReadDir(directoryPath) + if err != nil { + return nil, err + } + + for _, file := range files { + if !file.IsDir() { + filename := file.Name() + parts := strings.Split(filename, "-") + if len(parts) != 2 { + continue // Skip files with invalid filenames + } + + address := parts[0] + versionStr := parts[1] + version, err := strconv.Atoi(versionStr) + if err != nil { + continue // Skip files with invalid version numbers + } + + // Assuming you already have a replicaMap with existing entries + if existingReplica, exists := replicaMap[filename]; exists { + if version > existingReplica.Version { + // Update the version only if it's greater + replicaMap[filename] = Replica{ + Address: address, + Version: version, + } + } + } else { + // If the filename doesn't exist in the map, create a new entry + replicaMap[filename] = Replica{ + Address: address, + Version: version, + } + } + } + } + + return replicaMap, nil +} diff --git a/sdfs/go.mod b/sdfs/go.mod new file mode 100644 index 0000000..fbc288c --- /dev/null +++ b/sdfs/go.mod @@ -0,0 +1,3 @@ +module cs425mp4/sdfs + +go 1.21.0 diff --git a/sdfs/namenode.go b/sdfs/namenode.go new file mode 100644 index 0000000..57b4826 --- /dev/null +++ b/sdfs/namenode.go @@ -0,0 +1,966 @@ +/* namenode.go +Data: {filename, {datanode address, version }} +Functions: + handle put/get/delete + Replication + Recover + Token tokenControl.go +*/ + +package sdfs + +import ( + "encoding/json" + "fmt" + "log" + "math/rand" + "net" + "os" + "os/exec" + "strconv" + "strings" + "sync" + "time" +) + +type Replica struct { + Address string + Version int +} + +type ReplicaSlice []Replica + +type Namenode struct { + IP string + ReplicationFactor int + MetaData map[string][]Replica + FileToken map[string]*File + MemberList []string + mu sync.Mutex +} + +const logFlag = "Namenode: " + +const MembershipCheckInterval = 1 * time.Second + +func InitNamenode() *Namenode { + // memberList, err := getMeberList("membership.json") + memberList := []string{} + // if err != nil { + // log.Println(logFlag, "failed to read membership.json") + // return nil + // } + IP, err := GetLocalIPAddress() + if err != nil { + log.Println(logFlag, "failed to get local ip address") + return nil + } + nameNode := Namenode{ + IP: IP, + ReplicationFactor: 4, + FileToken: make(map[string]*File), + MetaData: make(map[string][]Replica), + MemberList: memberList, + mu: sync.Mutex{}, + } + nameNode.Recover() + // replicate by using metadata + nameNode.Replication() + + return &nameNode +} + +func (n *Namenode) Run() { + /* + firstly replicate by using metadata since we need to figure out which replicas dropped after election + listening message from client to handle put/get/delete + re-raplication detector (read membership list file to find failure, then replicate) + */ + // Start a goroutine for listening messages. + go n.listenClientMessage() + + // Start a goroutine for the replication detector. + go n.replicationDetector() + +} + +func (n *Namenode) replicationDetector() { + // check the membership file periodically if detect datanode fails, call relication() and update current alive data nodes list + ticker := time.NewTicker(MembershipCheckInterval) + defer ticker.Stop() + for { + select { + case <-ticker.C: + newMemberList, err := getMeberList("membership.json") + if err != nil { + log.Println(logFlag, "failed to read membership.json") + return + } + n.mu.Lock() + replicas := n.MemberList + log.Printf("Namenode: replicas: %v and newMemberList: %v", replicas, newMemberList) + // find joined data nodes to update metadata + joinedMembers := difference(replicas, newMemberList) + + // find failed data nodes and get its stored information(files and versions) + failedMembers := difference(newMemberList, replicas) + n.mu.Unlock() + if len(joinedMembers) > 0 { + log.Printf("Namenode: replicationDetector: there are some new nodes %v", joinedMembers) + for _, joinedMember := range joinedMembers { + n.MemberList = append(replicas, joinedMember) + } + } + if len(failedMembers) > 0 { + log.Printf("Namenode: replicationDetector: there are some failed nodes %v", failedMembers) + n.removeFailedNodes(failedMembers) + n.clearMetadataForFailedNodes(failedMembers) + exectime := MeasureExecutionTime(func() { + n.mu.Lock() + n.Replication() + n.mu.Unlock() + }) + log.Println("Namenode: MeasureExecutionTime REPLICATION : ", exectime) + } else { + // do replication + n.mu.Lock() + n.Replication() + n.mu.Unlock() + } + + } + } +} + +func (n *Namenode) listenClientMessage() { + // Create a TCP listener for clients. + address := "0.0.0.0" + ":" + "5002" + listener, err := net.Listen("tcp", address) + if err != nil { + log.Fatalf("Failed to start client listener: %v", err) + } + defer listener.Close() + log.Println(logFlag, "name node is listening client...") + // Buffer to store incoming data + buffer := make([]byte, 1024) + + // Listen for incoming messages via network communication + for { + // Accept incoming connections + conn, err := listener.Accept() + if err != nil { + // fmt.Println("Error accepting connection:", err) + continue + } + numBytes, err := conn.Read(buffer) + if err != nil { + log.Printf("Namenode: Error reading from connection: %v\n", err) + continue + } + + data := buffer[:numBytes] + + // Decode the incoming data into a Message struct + var receivedMessage Message + err = json.Unmarshal(data, &receivedMessage) + if err != nil { + log.Printf("Namenode: Error decoding incoming data: %v\n", err) + continue + } + + log.Printf("Namenode: Received message %+v\n", receivedMessage) + + go n.handleMessage(receivedMessage, conn) + } +} + +func (nn *Namenode) handleMessage(receivedMessage Message, conn net.Conn) { + // Listen for client requests and process them (e.g., put/get/delete operations) + sender := receivedMessage.Sender + messageType := receivedMessage.Type + msg := receivedMessage.Payload + + switch messageType { + case PUT: + comp := strings.Fields(msg.(string)) + sdfsfilename := comp[0] + id := receivedMessage.ID + nn.HandlePut(sdfsfilename, id, sender, conn) + case GET: + comp := strings.Fields(msg.(string)) + sdfsfilename := comp[0] + id := receivedMessage.ID + nn.HandleGet(sdfsfilename, id, sender, conn) + case DELETE: + comp := strings.Fields(msg.(string)) + sdfsfilename := comp[0] + id := receivedMessage.ID + nn.HandleDelete(sdfsfilename, id, sender, conn) + case LISTREPLICA: + comp := strings.Fields(msg.(string)) + sdfsfilename := comp[0] + id := receivedMessage.ID + nn.HandleLS(sdfsfilename, id, sender, conn) + default: + log.Println("namenode: wrong message type") + } +} + +func findLatestVersions(replicas []Replica) (int, []string) { + latestVersion := -1 // Initialize with a lower version number + var latestAddresses []string + + // Iterate through the list of replicas + for _, replica := range replicas { + if replica.Version > latestVersion { + // Found a new latest version + latestVersion = replica.Version + latestAddresses = []string{replica.Address} + } else if replica.Version == latestVersion { + // Found another replica with the same latest version + latestAddresses = append(latestAddresses, replica.Address) + } + } + + return latestVersion, latestAddresses +} + +func (n *Namenode) HandlePut(sdfsfilename string, id int64, sender string, conn net.Conn) { + /* + 1. use scheduler to get ReplicationFactor distinct alive data nodes' hostnames + 2. use sdfs filename to get write token + if new file, update fileToken firstly + if update, also send delete message + 3. send the sdfs filename and hostnames back client + 4. keep listening client's comfirmation message, update metadata, delete older versions in datanodes, use sdfs filename to release token + 5. send confirmation back + */ + // 1. use sdfs filename to get write token + file, ok := n.FileToken[sdfsfilename] + if !ok { + n.FileToken[sdfsfilename] = GetFile(sdfsfilename) + n.FileToken[sdfsfilename].GetWriterToken() + } else { + file.GetWriterToken() + } + defer n.FileToken[sdfsfilename].ReleaseWriterToken() + var payload Assignment + payload.Filename = sdfsfilename + _, ok = n.MetaData[sdfsfilename] + if !ok { + // put + log.Printf("Namenode: put %s to SDFS\n", sdfsfilename) + // 2. use scheduler to get ReplicationFactor distinct alive data nodes' hostnames + datanodes := getDistDatanode(n.ReplicationFactor, []string{n.IP}) + payload.Version = 1 + payload.Datanodes = datanodes + log.Printf("Namenode: put to %v\n", payload.Datanodes) + } else { + // update + log.Printf("Namenode: update %s to SDFS\n", sdfsfilename) + + n.mu.Lock() + replicas := n.MetaData[sdfsfilename] + n.mu.Unlock() + latestVersion, latestAddresses := findLatestVersions(replicas) + payload.Version = latestVersion + 1 + payload.Datanodes = latestAddresses + log.Printf("Namenode: update to %v\n", payload.Datanodes) + // log.Println("Namenode: should also delete older versions here") + } + + // 3. send the sdfs filename and hostnames back client + err := n.writeMessage(conn, PUT, payload, id) + if err != nil { + log.Println(logFlag, "Failed to send PUT to client") + return + } else { + log.Printf("Namenode: Success to send PUT to client with payload: %+v\n", payload) + } + + // 4. keep listening client's comfirmation message, use sdfs filename to release token + buffer := make([]byte, 1024) + numBytes, err := conn.Read(buffer) + if err != nil { + log.Printf("Namenode: Error reading PUTCINFIRMATION Message from connection when: %v\n", err) + return + } + data := buffer[:numBytes] + // Decode the incoming data into a Message struct + var receivedMessage Message + err = json.Unmarshal(data, &receivedMessage) + if err != nil { + log.Printf("Namenode: Error decoding PUTCINFIRMATION Message data: %v\n", err) + return + } + log.Printf("Namenode: Received message %+v\n", receivedMessage) + assignment, err := payloadToAssignment(receivedMessage.Payload) + if err != nil { + // Payload is not an Assignment + log.Println("Namenode: Payload is not an Assignment") + return + } + afilename := assignment.Filename + aversion := assignment.Version + adatanodes := assignment.Datanodes + + n.mu.Lock() + // delete older version + // update version if update + for i := range n.MetaData[afilename] { + if n.MetaData[afilename][i].Version < aversion { + deleteFileV(n.MetaData[afilename][i].Address, afilename, n.MetaData[afilename][i].Version) + n.MetaData[afilename][i].Version += 1 + } + } + // update metadata(delete old replicas and add new replica) if put + if aversion == 1 { + // delete(n.MetaData, afilename) + for _, adatanode := range adatanodes { + newReplica := Replica{ + Address: adatanode, + Version: aversion, // Set the version number as needed + } + n.MetaData[afilename] = append(n.MetaData[afilename], newReplica) + } + } + n.mu.Unlock() + + // 5. send confirmation back + newPayload := Assignment{ + Filename: sdfsfilename, + Version: 0, + Datanodes: []string{}, + } + err = n.writeMessage(conn, PUTCONFIRM, newPayload, id) + if err != nil { + log.Println(logFlag, "Failed to send PUTCONFIRM to client") + } else { + log.Printf("Namenode: Success to send PUTCONFIRM to client with payload: %+v\n", payload) + } +} + +func (n *Namenode) HandleLS(sdfsfilename string, id int64, sender string, conn net.Conn) { + /* + 1. check if file exists + 2. get the list of nodes who has the file + 3. send the sdfs filename and hostnames back client + */ + // 1. check if file exists + _, ok := n.MetaData[sdfsfilename] + if !ok { + // The key (sdfsfilename) does not exist in the map + fmt.Printf("Namenode: %s doesn't exist in SDFS\n", sdfsfilename) + payload := Assignment{ + Filename: sdfsfilename, + Version: 0, + Datanodes: []string{}, + } + err := n.writeMessage(conn, LISTREPLICA, payload, id) + if err != nil { + log.Println(logFlag, "Failed to send LISTREPLICA to client") + } else { + log.Printf("Namenode: Success to send LISTREPLICA to client with payload: %+v\n", payload) + } + return + } + // 2. get the list of nodes who has the file + n.mu.Lock() + replicas := n.MetaData[sdfsfilename] + n.mu.Unlock() + latestVersion, latestAddresses := findLatestVersions(replicas) + log.Printf("Namenode: GET %s with version %d in %v\n", sdfsfilename, latestVersion, latestAddresses) + // 3. send the sdfs filename and hostnames back client + payload := Assignment{ + Filename: sdfsfilename, + Version: latestVersion, + Datanodes: latestAddresses, + } + err := n.writeMessage(conn, LISTREPLICA, payload, id) + if err != nil { + log.Println(logFlag, "Failed to send LISTREPLICA to client") + return + } else { + log.Printf("Namenode: Success to send LISTREPLICA to client with payload: %+v\n", payload) + } +} + +func (n *Namenode) HandleGet(sdfsfilename string, id int64, sender string, conn net.Conn) { + /* + 1. use sdfs filename to get read token + 2. get the list of nodes who has the file + 3. send the sdfs filename and hostnames back client + 4. keeping listening GETCONFIRM from client(why) + */ + // 1. use sdfs filename to get read token + file, ok := n.FileToken[sdfsfilename] + if !ok { + // The key (sdfsfilename) does not exist in the map + fmt.Printf("%s doesn't exist in SDFS", sdfsfilename) + payload := Assignment{ + Filename: sdfsfilename, + Version: 0, + Datanodes: []string{}, + } + err := n.writeMessage(conn, GET, payload, id) + if err != nil { + log.Println(logFlag, "Failed to send GET to client") + } else { + log.Printf("Namenode: Success to send GET to client with payload: %+v\n", payload) + } + return + } + file.GetReaderToken() + defer file.ReleaseReaderToken() + // 2. get the list of nodes who has the file + n.mu.Lock() + replicas := n.MetaData[sdfsfilename] + n.mu.Unlock() + latestVersion, latestAddresses := findLatestVersions(replicas) + log.Printf("Namenode: GET %s with version %d in %v\n", sdfsfilename, latestVersion, latestAddresses) + // 3. send the sdfs filename and hostnames back client + payload := Assignment{ + Filename: sdfsfilename, + Version: latestVersion, + Datanodes: latestAddresses, + } + err := n.writeMessage(conn, GET, payload, id) + if err != nil { + log.Println(logFlag, "Failed to send GET to client") + return + } else { + log.Printf("Namenode: Success to send GET to client with payload: %+v\n", payload) + } + // 4. keeping listening GETCONFIRM from client + buffer := make([]byte, 1024) + numBytes, err := conn.Read(buffer) + if err != nil { + log.Printf("Namenode: Error reading GETCINFIRMATION Message from connection when: %v\n", err) + return + } + data := buffer[:numBytes] + // Decode the incoming data into a Message struct + var receivedMessage Message + err = json.Unmarshal(data, &receivedMessage) + if err != nil { + log.Printf("Namenode: Error decoding GETCINFIRMATION Message data: %v\n", err) + return + } + log.Printf("Namenode: Received message %+v\n", receivedMessage) +} + +func (n *Namenode) HandleDelete(sdfsfilename string, id int64, sender string, conn net.Conn) { + /* + 1. use sdfs filename to get write token + 2. update metadata and get the list of nodes who has the file + 3. send the sdfs filename and hostnames back client + 4. keep listening client's comfirmation message, use sdfs filename to release token + 5. send DELETECONFIRMATION back + */ + file, ok := n.FileToken[sdfsfilename] + if ok { + // there exsit a file that should be deleted + // 1. use sdfs filename to get write token + file.GetWriterToken() + defer file.ReleaseWriterToken() + // 2. update metadata and get the list of nodes who has the file + datanodes := n.differenceMetaData(sdfsfilename) + // 3. send the sdfs filename and hostnames back client + payload := Assignment{ + Filename: sdfsfilename, + Version: 0, + Datanodes: datanodes, + } + err := n.writeMessage(conn, DELETE, payload, id) + if err != nil { + log.Println(logFlag, "Failed to send DELETE to client") + } else { + log.Printf("Namenode: Success to send DELETE to client with payload: %+v\n", payload) + } + // 4. keep listening client's comfirmation message, use sdfs filename to release token + buffer := make([]byte, 1024) + numBytes, err := conn.Read(buffer) + if err != nil { + log.Printf("Namenode: Error reading DELCONFIRM Message from connection when: %v\n", err) + return + } + data := buffer[:numBytes] + // Decode the incoming data into a Message struct + var receivedMessage Message + err = json.Unmarshal(data, &receivedMessage) + if err != nil { + log.Printf("Namenode: Error decoding DELCONFIRM Message data: %v\n", err) + return + } + log.Printf("Namenode: Received message %+v\n", receivedMessage) + // 5. send DELETECONFIRMATION back + newPayload := Assignment{ + Filename: sdfsfilename, + Version: 0, + Datanodes: []string{}, + } + err = n.writeMessage(conn, DELCONFIRM, newPayload, id) + if err != nil { + log.Println(logFlag, "Failed to send DELCONFIRM to client") + } else { + log.Printf("Namenode: Success to send DELCONFIRM to client with payload: %+v\n", payload) + } + } +} + +func (n *Namenode) Replication() { + // Iterate over each file in metadata + for filename, replicas := range n.MetaData { + if len(replicas) == 0 { + log.Printf("Namenode: No replicas found for file '%s'\n", filename) + continue + } + /* + find the latest version, to check if the number of latest versions equal to factor. get the number of replicas and get list of datanodes that doesn't store this file + get the address of datanodes who currently store this file, send to datanode and let it do replication and wait for confirmation(do not need to get write token) + if receive confirmation(filename), update metadata and check if there are older versions of the filename in metadata. If so, send DELETE message to data nodes who hold the older version. + if not receive confirmation, just pass + wait for DELETE confirmation + */ + latestVersion, latestAddresses := findLatestVersions(replicas) + log.Printf("Namenode: %s, and its latest version is %d, latestAddresses are %v", filename, latestVersion, latestAddresses) + if len(latestAddresses) < n.ReplicationFactor { + numReplicasNeedtoBeReplicated := n.ReplicationFactor - len(latestAddresses) + log.Printf("Namenode: %s need to be replicated %d copies", filename, numReplicasNeedtoBeReplicated) + var addresses []string + addresses = append(latestAddresses, n.IP) + datanodes := getDistDatanode(numReplicasNeedtoBeReplicated, addresses) + log.Printf("Namenode: %s be assigned to %v", filename, datanodes) + // Send to datanode who has the file to ask it do replication until receive confirmation + masterDatanode := latestAddresses[1] + log.Printf("Namenode: %s send to master datanode %s", filename, masterDatanode) + // Acquire a write token + file := n.FileToken[filename] + file.GetWriterToken() + for _, datanode := range datanodes { + remote1 := masterDatanode + ":cs425mp3/sdfs/" + filename + "-" + strconv.Itoa(latestVersion) + remote2 := datanode + ":cs425mp3/sdfs/" + filename + "-" + strconv.Itoa(latestVersion) + //cmd := exec.Command("scp", remote1, remote2) + + cmd := exec.Command("scp", "-l", strconv.Itoa(Limit), remote1, remote2) + cmd.Stdout = os.Stderr + cmd.Stderr = os.Stderr + + if err := cmd.Run(); err != nil { + log.Printf("Namenode: Replication : Fail to replicate data from datanode %s to %s: %v\n", masterDatanode, datanode, err) + continue + } + newReplica := Replica{ + Address: datanode, + Version: latestVersion, + } + n.MetaData[filename] = append(n.MetaData[filename], newReplica) + log.Printf("Namenode: Replication : Success to replicate data from datanode %s to %s", masterDatanode, datanode) + } + log.Printf("Namenode: Replication: replicate suceessfuly") + file.ReleaseWriterToken() + } else if len(latestAddresses) > n.ReplicationFactor { + // there are duplicate replicas need to be deleted + // get duplicate datanodes addresses + // delete duplicate datanodes(get write token) + + numDeleted := len(latestAddresses) - n.ReplicationFactor + log.Printf("Namenode: %s has %d duplicate replicas", filename, numDeleted) + datanodes := randomSelection(latestAddresses, numDeleted) + log.Printf("Namenode: %s assigned %v to delete", filename, datanodes) + // Acquire a write token + file := n.FileToken[filename] + file.GetWriterToken() + for _, datanode := range datanodes { + deleteFileV(datanode, filename, latestVersion) + log.Printf("Namenode: Replication: delete %s from %s", filename, datanode) + } + log.Printf("Namenode: Replication: delete duplicate suceessfuly") + file.ReleaseWriterToken() + } + } +} + +func (n *Namenode) Recover() { + // update fileToken + // update Metadata + // get alive members' IP addresses + datanodeAddrs, err := getMeberList("membership.json") + log.Printf("Namenode: number of members: %d\n", len(datanodeAddrs)) + if len(datanodeAddrs) == 10 { + log.Printf("first leader, don't need Recover") + return + } + if err != nil { + log.Println(logFlag, "failed to read membership.json") + return + } + // find the local IP + localIP, err := GetLocalIPAddress() + log.Printf("Namenode: local IP is %s\n", localIP) + if err != nil { + log.Println(logFlag, "Error:", err) + return + } + nonLocalDatanodeAddrs := []string{} + for i := 0; i < len(datanodeAddrs); i++ { + // remove name node self's IP address + if datanodeAddrs[i] == localIP { + log.Printf("Namenode: don't send to local IP %s\n", localIP) + continue + } + datanodeAddr := fmt.Sprintf("%s:%s", datanodeAddrs[i], "5005") + log.Println(logFlag, "server no.", i, "'s server address: ", datanodeAddr) + nonLocalDatanodeAddrs = append(nonLocalDatanodeAddrs, datanodeAddr) + } + log.Printf("nonLocalDatanodeAddrs addresses are %v\n", nonLocalDatanodeAddrs) + // Create a WaitGroup to wait for all goroutines to finish + var wg sync.WaitGroup + // send "RECOVER" message to all data nodes + for _, vmAddress := range nonLocalDatanodeAddrs { + // Increment the WaitGroup counter for each goroutine + wg.Add(1) + go func(address string) { + // Decrement the counter when the goroutine completes + defer wg.Done() + message := Message{ + Sender: n.IP, + Type: RECOVER, + Payload: nil, + } + messageJSON, err := json.Marshal(message) + if err != nil { + log.Println(logFlag, "Failed to serialize MembershipList to JSON:", err) + return + } + // Create a TCP connection to the VM. + conn, err := net.Dial("tcp", address) + if err != nil { + log.Printf("Namenode: Failed to connect to VM at %s: %v\n", address, err) + return + } + defer conn.Close() + + // Send a recovery request to the VM. + _, err = conn.Write([]byte(messageJSON)) + if err != nil { + log.Printf("Namenode: Failed to send recovery request to VM at %s: %v\n", address, err) + return + } + + // Receive and process the metadata from the VM (replace this logic with your protocol). + buffer := make([]byte, 1024) + num, err := conn.Read(buffer) + if err != nil { + log.Printf("Namenode: Failed to read metadata from VM at %s: %v\n", address, err) + return + } + + metadata := string(buffer[:num]) + // Parse the received metadata into a map[string]int. + var receivedMetadata Message + err = json.Unmarshal([]byte(metadata), &receivedMetadata) + if err != nil { + log.Printf("Namenode: Failed to parse received metadata: %v\n", err) + return + } + if receivedMetadata.Type != RECCONFIRM { + log.Println(logFlag, "ERROR: not confirmation for recover") + return + } + //update the Namenode's metadata and file token using the received data. + n.combineMataData(receivedMetadata) + n.updataFileToken(receivedMetadata) + }(vmAddress) + } + // Wait for all goroutines to finish + wg.Wait() +} + +func (n *Namenode) updataFileToken(receivedMetadata Message) { + msg := receivedMetadata.Payload + metadata, ok := msg.(map[string][]Replica) + if !ok { + log.Println(logFlag, "Invalid metadata format.") + return + } + n.mu.Lock() + defer n.mu.Unlock() + // Update the Namenode's file tokens based on the received data. + for filename, _ := range metadata { + // Check if the filename already exists in the metadata. + if _, exists := n.FileToken[filename]; exists { + log.Printf("Namenode: %s already get tokens\n", filename) + } else { + // If the filename doesn't exist in metadata, assign tokens to this file. + n.FileToken[filename] = GetFile(filename) + } + } + + log.Println(logFlag, "Updated FileToken:") + for filename, File := range n.FileToken { + log.Printf("Namenode: Filename: %s\n", filename) + log.Printf("Namenode: File: %p\n", File) + } +} + +func (n *Namenode) combineMataData(receivedMetadata Message) { + msg := receivedMetadata.Payload + metadata, ok := msg.(map[string][]Replica) + if !ok { + log.Println(logFlag, "Invalid metadata format.") + return + } + n.mu.Lock() + defer n.mu.Unlock() + // Update the Namenode's metadata based on the received data. + for filename, replicas := range metadata { + // Check if the filename already exists in the metadata. + if existingReplicas, exists := n.MetaData[filename]; exists { + // Append the new Replica to the list of replicas for the filename. + n.MetaData[filename] = append(replicas, existingReplicas...) + } else { + // If the filename doesn't exist in metadata, create a new entry with a slice of Replica and assign tokens to this file. + n.MetaData[filename] = replicas + //n.FileToken[filename] = GetFile(filename) + } + } + + log.Println(logFlag, "Updated Metadata:") + for filename, replicas := range n.MetaData { + log.Printf("Namenode: Filename: %s\n", filename) + for _, replica := range replicas { + log.Printf("Namenode: Address: %s, Version: %d\n", replica.Address, replica.Version) + } + } +} + +func (n *Namenode) differenceMetaData(filename string) []string { + n.mu.Lock() + defer n.mu.Unlock() + + // Check if the filename already exists in the metadata. + if existingReplicas, exists := n.MetaData[filename]; exists { + // get which data nodes store this file and delete file from metadata and fileToken + addresses := []string{} + for _, replica := range existingReplicas { + addresses = append(addresses, replica.Address) + } + delete(n.MetaData, filename) + delete(n.FileToken, filename) + log.Println(logFlag, "Differenced Metadata:") + for filename, replicas := range n.MetaData { + log.Printf("Namenode: Filename: %s\n", filename) + for _, replica := range replicas { + log.Printf("Namenode: Address: %s, Version: %d\n", replica.Address, replica.Version) + } + } + return addresses + } else { + // If the filename doesn't exist in metadata, return nil which means there is no such file can delete. + return nil + + } +} + +func getMeberList(filePath string) ([]string, error) { + jsonData, err := readJSONFile(filePath) + if err != nil { + return nil, err + } + // Extract the membershiplist as an array of interface{} + membershiplistInterface, ok := jsonData["membershiplist"].([]interface{}) + if !ok { + return nil, fmt.Errorf("Failed to extract membershiplist") + } + + // Convert the interface{} elements to strings + var membershiplist []string + for _, member := range membershiplistInterface { + if s, ok := member.(string); ok { + // Split the input string by colons + parts := strings.Split(s, ":") + + // Extract the part we need and trim any leading/trailing spaces + address := strings.TrimSpace(parts[1]) + membershiplist = append(membershiplist, address) + } + } + + return membershiplist, nil + +} + +func readJSONFile(filePath string) (map[string]interface{}, error) { + // Read the JSON file + data, err := os.ReadFile(filePath) + if err != nil { + return nil, err + } + + // Parse the JSON data into a generic map + var jsonData map[string]interface{} + if err := json.Unmarshal(data, &jsonData); err != nil { + return nil, err + } + return jsonData, nil +} + +func GetLocalIPAddress() (string, error) { + interfaces, err := net.Interfaces() + if err != nil { + return "", err + } + + for _, iface := range interfaces { + addrs, err := iface.Addrs() + if err != nil { + return "", err + } + + for _, addr := range addrs { + ipnet, ok := addr.(*net.IPNet) + if ok && !ipnet.IP.IsLoopback() && ipnet.IP.To4() != nil { + return ipnet.IP.String(), nil + } + } + } + + return "", fmt.Errorf("No suitable IP address found") +} + +// Get the address of a data node that stores this version of the file +func getDistDatanode(numReplicas int, versionAddress []string) []string { + memberList, err := getMeberList("membership.json") + if err != nil { + log.Println(logFlag, "Faill to get distinct datanode") + } + result := difference(versionAddress, memberList) + return randomSelection(result, numReplicas) +} + +// find the elements that exist on B but not exist on A +func difference(A, B []string) []string { + // Create a map to store elements from A + elements := make(map[string]bool) + + // Add elements from A to the map + for _, element := range A { + elements[element] = true + } + + var result []string + + // Iterate through B and check if an element is not in the map + for _, element := range B { + if !elements[element] { + result = append(result, element) + } + } + return result +} + +// Randomly select n elements from a slice. +func randomSelection(slice []string, n int) []string { + length := len(slice) + if n >= length { + return slice + } + + // Shuffle the slice randomly. + rand.Shuffle(length, func(i, j int) { + slice[i], slice[j] = slice[j], slice[i] + }) + + return slice[:n] +} + +func (n *Namenode) removeFailedNodes(failedNodes []string) { + n.mu.Lock() + defer n.mu.Unlock() + log.Printf("Namenode: remove failed nodes\n") + updatedMemberList := make([]string, 0, len(n.MemberList)) + // Create a map to efficiently check for failed nodes + failedNodesMap := make(map[string]bool) + for _, node := range failedNodes { + failedNodesMap[node] = true + } + + for _, node := range n.MemberList { + if !failedNodesMap[node] { + updatedMemberList = append(updatedMemberList, node) + } + } + log.Printf("Namenode: updated memberlist: %v\n", updatedMemberList) + n.MemberList = updatedMemberList +} + +func (n *Namenode) clearMetadataForFailedNodes(failedNodes []string) { + n.mu.Lock() + defer n.mu.Unlock() + log.Printf("Namenode: clear metadata for failed nodes\n") + for filename, replicas := range n.MetaData { + var updatedReplicas []Replica + + for _, replica := range replicas { + if !containsString(failedNodes, replica.Address) { + updatedReplicas = append(updatedReplicas, replica) + } + } + n.MetaData[filename] = updatedReplicas + } +} + +func containsString(slice []string, target string) bool { + for _, item := range slice { + if item == target { + return true + } + } + return false +} + +func (n *Namenode) writeMessage(conn net.Conn, msgType MsgType, payload interface{}, id int64) error { + message := Message{ + ID: id, + Sender: n.IP, + Type: msgType, + Payload: payload, + } + request, err := json.Marshal(message) + if err != nil { + log.Println("CLIENT: Fail to write message", err) + return err + } + conn.Write([]byte(request)) + return nil +} + +func deleteFile(datanode, filename string) { + remote := "cs425mp3/sdfs/" + filename + "-*" + cmd := exec.Command("ssh", datanode, "rm ", remote) + cmd.Stdout = os.Stderr + cmd.Stderr = os.Stderr + + if err := cmd.Run(); err != nil { + log.Printf("CLIENT: DELETE : Fail to delete data from datanode %s: %v\n", datanode, err) + } +} + +func deleteFileV(datanode, filename string, version int) { + remote := "cs425mp3/sdfs/" + filename + "-" + strconv.Itoa(version) + cmd := exec.Command("ssh", datanode, "rm ", remote) + cmd.Stdout = os.Stderr + cmd.Stderr = os.Stderr + + if err := cmd.Run(); err != nil { + log.Printf("CLIENT: DELETE : Fail to delete data from datanode %s: %v\n", datanode, err) + } +} + +func MeasureExecutionTime(fn func()) time.Duration { + startTime := time.Now() + fn() + endTime := time.Now() + return endTime.Sub(startTime) +} diff --git a/sdfs/server.go b/sdfs/server.go new file mode 100644 index 0000000..8fb919a --- /dev/null +++ b/sdfs/server.go @@ -0,0 +1,53 @@ +// server.go +// sdfs node that can join the sdfs, keep track of current leader/leader re-election, switch between datanode and Namenode + +package sdfs + +import ( + "context" + "log" + "strings" +) + +type Server struct { + ID string // hostname : ip address : port : timestamp + Namenode string // id of current Namenode + Role string // Namenode or datanode +} + +func InitializeServer(ID, Namenode string) *Server { + s := new(Server) + s.ID = ID + s.Namenode = Namenode + s.Role = "datanode" + if Namenode == parseHostname(ID) { + s.Role = "Namenode" + } + return s +} + +func (s *Server) SetNamenode(Namenode string) { + s.Namenode = Namenode + if Namenode == parseHostname(s.ID) { + log.Println("Server: set role as Namenode") + s.Role = "Namenode" + } +} + +func (s *Server) Run(ctx context.Context) { + dn := InitDatanode(parseAddress(s.ID)) + nn := InitNamenode() + dn.Run() + nn.Run() +} + +/* Helper functions */ +func parseHostname(id string) string { + components := strings.Split(id, ":") + return components[0] +} + +func parseAddress(id string) string { + components := strings.Split(id, ":") + return components[1] + components[2] +} diff --git a/sdfs/token.go b/sdfs/token.go new file mode 100644 index 0000000..d23844c --- /dev/null +++ b/sdfs/token.go @@ -0,0 +1,80 @@ +package sdfs + +import ( + "sync" +) + +type File struct { + filename string + rwMutex sync.RWMutex + readSem chan struct{} + writeSem chan struct{} + readers int + writers int + readerCond *sync.Cond + writerCond *sync.Cond + mux sync.Mutex +} + +func GetFile(filename string) *File { + file := &File{ + filename: filename, + rwMutex: sync.RWMutex{}, + readSem: make(chan struct{}, 2), + writeSem: make(chan struct{}, 1), + readers: 0, + writers: 0, + mux: sync.Mutex{}, + } + file.readerCond = sync.NewCond(&file.mux) + file.writerCond = sync.NewCond(&file.mux) + return file +} + +func (f *File) GetReaderToken() { + // increment semaphore + f.readSem <- struct{}{} + // limit at most 4 successive readers + + f.mux.Lock() + for f.readers >= 4 && len(f.writeSem) > 0 { + f.readerCond.Wait() + } + f.readers++ + f.mux.Unlock() + // get reader token, any write cannot get token + f.rwMutex.RLock() + f.writers = 0 + // Signal waiting writer that they may be able to proceed. + f.writerCond.Broadcast() +} + +func (f *File) ReleaseReaderToken() { + // unlock WRMutex so that write can get WRMutex + f.rwMutex.RUnlock() + // descrease the semaphore. + <-f.readSem +} + +func (f *File) GetWriterToken() { + // increment semaphore + f.writeSem <- struct{}{} + for f.writers >= 4 && len(f.readSem) > 0 { + // Wait until the conditions are met for a writer token. + f.writerCond.Wait() + } + f.writers++ + // get writer token, any other write and read cannot get lock + f.rwMutex.Lock() + f.readers = 0 + // Signal waiting readers that they may be able to proceed. + f.readerCond.Broadcast() +} + +func (f *File) ReleaseWriterToken() { + // unlock WRMutex so that reader and writer can get WRMutex + f.rwMutex.Unlock() + // descreses semophore + <-f.writeSem + +} diff --git a/sdfs/types.go b/sdfs/types.go new file mode 100644 index 0000000..860ae74 --- /dev/null +++ b/sdfs/types.go @@ -0,0 +1,77 @@ +package sdfs + +import ( + "fmt" + "log" +) + +const Limit = 9000 + +type MsgType int + +const ( + PUT MsgType = iota + GET + DELETE + RECOVER + REPLICATION + REPCONFIRM + PUTCONFIRM + DELCONFIRM + GETCONFIRM + RECCONFIRM + PUTACK + DELETEACK + LISTREPLICA +) + +type Assignment struct { + Filename string + Version int + Datanodes []string +} + +type Message struct { + ID int64 + Sender string + Type MsgType + Payload interface{} +} + +func payloadToAssignment(payload interface{}) (Assignment, error) { + var assignment Assignment + + data, ok := payload.(map[string]interface{}) + if !ok { + // Payload is not an Assignment + log.Println("CLIENT: Fail to parse payload to map") + return assignment, fmt.Errorf("failed to parse payload as map") + } + + // Check if the required fields exist in the map + if filename, ok := data["Filename"].(string); ok { + assignment.Filename = filename + } else { + return assignment, fmt.Errorf("Missing or invalid 'Filename' field") + } + + if version, ok := data["Version"].(float64); ok { + assignment.Version = int(version) + } else { + return assignment, fmt.Errorf("Missing or invalid 'Version' field") + } + + if datanodes, ok := data["Datanodes"].([]interface{}); ok { + for _, node := range datanodes { + if strNode, ok := node.(string); ok { + assignment.Datanodes = append(assignment.Datanodes, strNode) + } else { + return assignment, fmt.Errorf("Invalid datanode value in 'Datanodes' field") + } + } + } else { + return assignment, fmt.Errorf("Missing or invalid 'Datanodes' field") + } + + return assignment, nil +} diff --git a/utils/gen_scp_keys.sh b/utils/gen_scp_keys.sh new file mode 100755 index 0000000..3af698d --- /dev/null +++ b/utils/gen_scp_keys.sh @@ -0,0 +1,27 @@ +#!/bin/bash + +# Generate an SSH key +ssh-keygen -t rsa -b 4096 -C "cs425mp3scp" + +# List of remote VMs +remote_vms=( + "fa23-cs425-0201.cs.illinois.edu" + "fa23-cs425-0202.cs.illinois.edu" + "fa23-cs425-0203.cs.illinois.edu" + "fa23-cs425-0204.cs.illinois.edu" + "fa23-cs425-0205.cs.illinois.edu" + "fa23-cs425-0206.cs.illinois.edu" + "fa23-cs425-0207.cs.illinois.edu" + "fa23-cs425-0208.cs.illinois.edu" + "fa23-cs425-0209.cs.illinois.edu" + "fa23-cs425-0210.cs.illinois.edu" +) + +# Get the current user and hostname +current_user=$(whoami) +current_hostname=$(hostname) + +# Loop through the remote VMs and copy the SSH key if it's not the current VM +for vm in "${remote_vms[@]}"; do + ssh-copy-id "$current_user@$vm" +done diff --git a/utils/go.mod b/utils/go.mod new file mode 100644 index 0000000..527745c --- /dev/null +++ b/utils/go.mod @@ -0,0 +1,3 @@ +module cs425mp4/utils + +go 1.21.0 diff --git a/utils/update_go_mod_version.sh b/utils/update_go_mod_version.sh new file mode 100755 index 0000000..3970186 --- /dev/null +++ b/utils/update_go_mod_version.sh @@ -0,0 +1,18 @@ +#!/bin/bash + +# Specify the desired Go version +desired_go_version=1.19 # Replace with your desired version (e.g., "1.19" or "1.23.0") + +# Find all go.mod files in the current directory and its subdirectories +go_mod_files=$(find . -type f -name "go.mod") + +for go_mod_file in $go_mod_files; do + # Check if the file exists and is readable + if [ -r "$go_mod_file" ]; then + # Replace the Go version in the go.mod file + sed -i "s/go [0-9]\+\(\.[0-9]\+\)*\(\.0\)\?/go $desired_go_version/" "$go_mod_file" + echo "Modified $go_mod_file to use Go version $desired_go_version" + else + echo "Error: Cannot read $go_mod_file" + fi +done diff --git a/utils/utils.go b/utils/utils.go new file mode 100644 index 0000000..71f0a90 --- /dev/null +++ b/utils/utils.go @@ -0,0 +1,47 @@ +package utils + +import ( + "log" + "net" + "os" + "strconv" + "strings" +) + +func parseHostname(id string) string { + components := strings.Split(id, ":") + return components[0] +} + +func parseAddress(id string) string { + components := strings.Split(id, ":") + return components[1] + ":" + components[2] +} + +func parseTimestamp(id string) int64 { + components := strings.Split(id, ":") + t, _ := strconv.ParseInt(components[3], 10, 64) + return t +} + +// resolve hosts' name to IP address +func getIP(hostname string) net.IP { + ips, err := net.LookupIP(hostname) + if err != nil { + log.Println("Fail to resolve hostname:", err) + os.Exit(1) + } + for _, ip := range ips { + log.Println("IP Address:", ip.String()) + } + return ips[0] +} + +func getLocalIP() net.IP { + hostname, err := os.Hostname() + if err != nil { + log.Println("Fail to get hostname:", err) + os.Exit(1) + } + return getIP(hostname) +} -- GitLab