Skip to content
Snippets Groups Projects
Commit 1f6f9acf authored by Tingyin Ding's avatar Tingyin Ding
Browse files

Init repo with basic code structure.

parent 1b74c622
No related branches found
No related tags found
No related merge requests found
Pipeline #198047 canceled
Showing with 1998 additions and 0 deletions
{
"datanode":5001,
"namenode": 5002,
"client":5003
}
\ No newline at end of file
{
"Introducor": "fa23-cs425-0201.cs.illinois.edu",
"Port": "8000",
"T_gossip": 0.8,
"Fanout": 3,
"T_fail": 3,
"T_cleanup": 3,
"T_suspicion": 2,
"P_ml": 0,
"Mode": "Gossip"
}
/* election.go
Leader election: given the current list of candidate hosts, determine the
hostname of the leader using a bully-style algorithm.
*/
package election
import (
"encoding/json"
"fmt"
"log"
"net"
"os"
"strings"
"sync"
"time"
)
var ALLHOST = []string{"fa23-cs425-0201.cs.illinois.edu", "fa23-cs425-0202.cs.illinois.edu", "fa23-cs425-0203.cs.illinois.edu", "fa23-cs425-0204.cs.illinois.edu", "fa23-cs425-0205.cs.illinois.edu", "fa23-cs425-0206.cs.illinois.edu", "fa23-cs425-0207.cs.illinois.edu", "fa23-cs425-0208.cs.illinois.edu", "fa23-cs425-0209.cs.illinois.edu", "fa23-cs425-0210.cs.illinois.edu"}
const PORT = "8080"
const leaderfile = "leader.txt"
// MsgType identifies the kind of election message exchanged between nodes.
type MsgType int
const (
// COORDINATOR announces the sender as the leader; receivers adopt it.
COORDINATOR MsgType = iota
// ELECTION tells the recipient that the sender has started an election.
ELECTION
// ANSWER acknowledges a received ELECTION message.
ANSWER
)
// Message is the JSON-encoded unit sent over TCP between election participants.
type Message struct {
// Sender is the hostname of the node that sent the message.
Sender string
// Type is the kind of message (COORDINATOR, ELECTION or ANSWER).
Type MsgType
}
// Election holds the state of the leader-election protocol for one node.
type Election struct {
leader string // hostname of the current leader; guarded by mu in GetLeader/SetLeader
startNode string // hostname of this node
candidates []string // hostnames taking part in the election
timeout time.Duration // how long StartElection waits for an ANSWER before electing itself
state bool // true if electing
// answerReceived carries ANSWER messages from handleMessage to StartElection.
answerReceived chan Message
// ElectionReceived carries ELECTION messages from handleMessage to Run.
ElectionReceived chan Message
// onLeaderChange is signalled after a leader is set; Run reacts by calling workfunc.
onLeaderChange chan struct{}
mu sync.RWMutex // protects leader
electionMu sync.Mutex // serializes StartElection calls
}
// InitializeElection builds the election state for the node named startNode.
//
// The candidate list is read from membership.json; when that file cannot be
// read or parsed, the static ALLHOST list is used instead. timeout is the
// number of seconds StartElection waits for an ANSWER. The initial leader is
// hard-coded to fa23-cs425-0201.
func InitializeElection(startNode string, timeout int) *Election {
	candidates, err := getMeberList("membership.json")
	if err != nil {
		candidates = ALLHOST
	}
	return &Election{
		leader:           "fa23-cs425-0201.cs.illinois.edu",
		startNode:        startNode,
		candidates:       candidates,
		timeout:          time.Duration(timeout) * time.Second,
		state:            false,
		answerReceived:   make(chan Message),
		ElectionReceived: make(chan Message),
		onLeaderChange:   make(chan struct{}),
	}
}
// GetLeader returns the hostname of the current leader (empty when none is
// set), reading it under the read lock.
func (e *Election) GetLeader() string {
	e.mu.RLock()
	current := e.leader
	e.mu.RUnlock()
	return current
}
// SetLeader records leader as the current leader and persists it to the
// leader file. When leader equals the current value nothing is written and
// only a "doesn't change" message is logged.
//
// A failure to write the leader file is logged; the in-memory leader is still
// updated in that case.
func (e *Election) SetLeader(leader string) {
	e.mu.Lock()
	defer e.mu.Unlock()

	if e.leader == leader {
		log.Println("ELECTION: Leader doesn't change.")
		return
	}

	log.Println("ELECTION: Set leader", leader)
	e.leader = leader
	// Persist the leader so it can be read back from leaderfile.
	if err := os.WriteFile(leaderfile, []byte(leader), 0644); err != nil {
		log.Println("ELECTION: Error writing to the file:", err)
	}
}
// Run starts the election service and blocks forever.
//
// It launches the TCP listener and a watchdog goroutine that re-runs the
// election once per second while no leader is known. The calling goroutine
// then waits for events: workfunc is invoked on every leader-change signal,
// and an incoming ELECTION message triggers a local election unless one is
// already in progress.
//
// The select blocks until an event arrives; there is no default branch, so
// the loop does not spin a CPU core while idle.
//
// NOTE(review): e.state is read here without synchronization while
// StartElection writes it — confirm whether this needs a lock or an atomic.
func (e *Election) Run(workfunc func()) {
	go e.ElectionListener()

	// Watchdog: re-elect whenever the leader is unknown.
	go func() {
		for {
			if (!e.state) && (e.GetLeader() == "") {
				log.Println("ELECTION: No leader, re elect")
				e.StartElection()
			}
			time.Sleep(1 * time.Second)
		}
	}()

	for {
		select {
		case <-e.onLeaderChange:
			log.Println("ELECTION: Leader changed, work")
			workfunc()
		case msg := <-e.ElectionReceived:
			if (!e.state) && msg.Type == ELECTION {
				log.Println("ELECTION: Received election msg, elect")
				e.StartElection()
			}
		}
	}
}
// func (e *Election) Run(workfunc func()) {
// go e.ElectionListener()
// // e.StartElection()
// // for msg := range e.ElectionReceived {
// // if msg.Type == ELECTION {
// // e.StartElection()
// // }
// // }
// for {
// select {
// case <-e.onLeaderChange:
// log.Println("ELECTION: leader changes...run workfunc()")
// workfunc()
// case <-e.ElectionReceived:
// log.Println("ELECTION: Process ELECTION")
// e.StartElection()
// }
// }
// }
// StartElection runs one election round for this node.
//
// Nodes are ordered by hostname string and the smallest hostname wins:
//   - If this node has the smallest hostname among the candidates it becomes
//     leader immediately and sends COORDINATOR to every larger hostname.
//   - Otherwise it sends ELECTION to every smaller hostname and waits up to
//     e.timeout for an ANSWER. On an ANSWER it returns and leaves the election
//     to the answering node; on timeout it becomes leader, signals
//     onLeaderChange and sends COORDINATOR to every larger hostname.
//
// Calls are serialized by electionMu; e.state is true for the duration.
//
// NOTE(review): the self-is-smallest branch does not signal onLeaderChange,
// matching the previous behavior — confirm whether workfunc should run there.
func (e *Election) StartElection() {
	e.electionMu.Lock()
	defer e.electionMu.Unlock()
	e.state = true
	defer func() { e.state = false }()

	// Refresh the candidates. On failure keep the previous list: replacing it
	// with nil would make this node trivially the smallest and self-elect.
	memberList, err := getMeberList("membership.json")
	if err != nil {
		log.Println("ELECTION: Faill to get distinct datanode")
	} else {
		e.candidates = memberList
	}

	// At most one ANSWER per candidate is expected; the buffer lets ANSWERs
	// that arrive after the first one be delivered without blocking the sender.
	e.answerReceived = make(chan Message, len(e.candidates))

	log.Printf("ELECTION: Node %s initiates an election.\n", e.startNode)
	lowestHost := e.startNode
	for _, candidate := range e.candidates {
		if candidate < lowestHost {
			lowestHost = candidate
		}
	}
	log.Printf("ELECTION: lowest host is %s, node is %s", lowestHost, e.startNode)

	if lowestHost == e.startNode {
		// Go through SetLeader so the write is locked and the leader file is
		// updated, as in the timeout branch below.
		e.SetLeader(e.startNode)
		log.Printf("ELECTION: Node %s is elected as the coordinator.\n", e.startNode)
		// Announce to every node with a larger hostname.
		for _, candidate := range e.candidates {
			if candidate > e.startNode {
				e.sendMessage(candidate, COORDINATOR)
			}
		}
		return
	}

	// Challenge every node with a smaller hostname.
	log.Printf("ELECTION: Node %s sends ELECTION to smaller IDs \n", e.startNode)
	for _, candidate := range e.candidates {
		if candidate < e.startNode {
			e.sendMessage(candidate, ELECTION)
		}
	}

	select {
	case answerMessage := <-e.answerReceived:
		// A smaller node is alive and takes over the election.
		log.Printf("ELECTION: Node %s received an ANSWER from %s.\n", e.startNode, answerMessage.Sender)
		//TODO: maybe bug here, how to wait timeout seconds?
		return
	case <-time.After(e.timeout):
		// Nobody smaller answered in time: declare this node leader.
		log.Println("ELECTION: Wait timeout. No answer received.")
		e.SetLeader(e.startNode)
		// Signal asynchronously: when StartElection is called from Run's own
		// select loop there is no receiver until this function returns, so a
		// blocking send here would deadlock.
		go func() { e.onLeaderChange <- struct{}{} }()
		log.Printf("ELECTION: Node %s is elected as the coordinator (no answer received).\n", e.startNode)
		// Announce to every node with a larger hostname.
		for _, candidate := range e.candidates {
			if candidate > e.startNode {
				log.Println("ELECTION: Send COORDINATOR to ", candidate)
				e.sendMessage(candidate, COORDINATOR)
			}
		}
		return
	}
}
// ElectionListener accepts TCP connections on PORT and dispatches each
// received election message to handleMessage in its own goroutine.
//
// Each connection carries a single JSON-encoded Message of at most 1024
// bytes, read with one Read call. The connection is closed as soon as that
// read completes so accepted sockets are not leaked. The function never
// returns under normal operation; it terminates the process if the port
// cannot be opened.
func (election *Election) ElectionListener() {
	address := ":" + PORT
	listener, err := net.Listen("tcp", address)
	if err != nil {
		log.Fatalf("ELECTION: Error listening on port: %v\n", err)
	}
	defer listener.Close()
	log.Println("ELECTION: Server is listening...")

	// Reused across connections; the data is decoded before the next read.
	buffer := make([]byte, 1024)

	for {
		conn, err := listener.Accept()
		if err != nil {
			continue
		}

		numBytes, err := conn.Read(buffer)
		// One message per connection: release the socket right away, whether
		// or not the read succeeded.
		conn.Close()
		if err != nil {
			log.Printf("ELECTION: Error reading from connection: %v\n", err)
			continue
		}

		var receivedMessage Message
		if err := json.Unmarshal(buffer[:numBytes], &receivedMessage); err != nil {
			log.Printf("ELECTION: Error decoding incoming data: %v\n", err)
			continue
		}
		go election.handleMessage(&receivedMessage)
	}
}
// handleMessage reacts to one decoded election message.
//
//   - COORDINATOR: adopt the sender as leader and signal the leader change.
//   - ELECTION: reply with ANSWER and, unless an election is already running,
//     forward the message on ElectionReceived.
//   - ANSWER: hand the message to the goroutine waiting on answerReceived.
//
// Unknown types are only logged. The channel sends block until a receiver
// takes the value.
func (election *Election) handleMessage(receivedMessage *Message) {
	sender := receivedMessage.Sender

	switch kind := receivedMessage.Type; kind {
	case ANSWER:
		log.Printf("ELECTION: Received message from %s: ANSWER\n", sender)
		election.answerReceived <- *receivedMessage

	case ELECTION:
		log.Printf("ELECTION: Received message from %s: ELECTION\n", sender)
		// Always acknowledge so the challenger knows this node is alive.
		election.sendMessage(sender, ANSWER)
		if election.state {
			return
		}
		election.ElectionReceived <- *receivedMessage

	case COORDINATOR:
		log.Printf("ELECTION: Received message from %s: COORDINATOR\n", sender)
		election.SetLeader(sender)
		election.onLeaderChange <- struct{}{}

	default:
		log.Printf("ELECTION: Node %s received an unknown message type: %d\n", election.startNode, kind)
	}
}
// sendMessage delivers a message of the given type to recipient (a hostname)
// over a short-lived TCP connection to PORT.
//
// Delivery is best effort: serialization, resolution and write failures are
// logged, a failed dial is silently ignored, and nothing is returned.
func (election *Election) sendMessage(recipient string, messageType MsgType) {
	encoded, err := json.Marshal(Message{Sender: election.startNode, Type: messageType})
	if err != nil {
		log.Println("ELECTION: Failed to serialize:", err)
		return
	}

	ip, err := getIP(recipient)
	if err != nil {
		log.Printf("ELECTION: Fail to send Message to %s because can't resolve IP\n", recipient)
		return
	}

	conn, err := net.Dial("tcp", ip+":"+PORT)
	if err != nil {
		// Unreachable peers are expected during an election; stay quiet.
		return
	}
	defer conn.Close()

	if _, err = conn.Write(encoded); err != nil {
		log.Println("ELECTION: Error sending tcp data:", err)
	}
}
// getIP resolves hostname and returns the first address of the lookup result
// in string form. A lookup failure is logged and returned with an empty string.
func getIP(hostname string) (string, error) {
	addrs, lookupErr := net.LookupIP(hostname)
	if lookupErr == nil {
		return addrs[0].String(), nil
	}
	log.Println("ELECTION: Fail to resolve hostname:", lookupErr)
	return "", lookupErr
}
// test
// func main() {
// hostname, _ := os.Hostname()
// selfID := hostname
// election := InitializeElection(selfID, 5)
// go func() {
// for i := 0; i < 30; i++ {
// time.Sleep(1 * time.Second)
// fmt.Println("Current leader: ", election.GetLeader(), election.state)
// if i == 15 {
// // election.SetLeader("")
// election.StartElection()
// }
// }
// }()
// election.Run()
// select {}
// }
// getMeberList reads the JSON file at filePath and returns the host part of
// every string entry in its "membershiplist" array.
//
// Each entry is cut at its first colon and the leading part is trimmed of
// surrounding whitespace. Non-string entries are skipped. An error is returned
// when the file cannot be read or parsed, or when "membershiplist" is missing
// or not an array.
func getMeberList(filePath string) ([]string, error) {
	doc, err := readJSONFile(filePath)
	if err != nil {
		return nil, err
	}

	rawMembers, isList := doc["membershiplist"].([]interface{})
	if !isList {
		return nil, fmt.Errorf("Failed to extract membershiplist")
	}

	var hosts []string
	for _, raw := range rawMembers {
		entry, isString := raw.(string)
		if !isString {
			continue
		}
		// Only the text before the first ':' is wanted.
		head, _, _ := strings.Cut(entry, ":")
		hosts = append(hosts, strings.TrimSpace(head))
	}
	return hosts, nil
}
// readJSONFile loads the file at filePath and decodes it as a JSON object
// into a generic map. Read and decode errors are returned unchanged, with a
// nil map.
func readJSONFile(filePath string) (map[string]interface{}, error) {
	raw, readErr := os.ReadFile(filePath)
	if readErr != nil {
		return nil, readErr
	}

	var parsed map[string]interface{}
	decodeErr := json.Unmarshal(raw, &parsed)
	if decodeErr != nil {
		return nil, decodeErr
	}
	return parsed, nil
}
module cs425mp4/election
go 1.21.0
go.mod 0 → 100644
module cs425mp4
go 1.21.0
replace cs425mp4/election => ./election
replace cs425mp4/membership => ./membership
replace cs425mp4/sdfs => ./sdfs
replace cs425mp4/maplejuice => ./maplejuice
replace cs425mp4/utils => ./utils
fa23-cs425-0203.cs.illinois.edu
\ No newline at end of file
package main
// main is a placeholder entry point; its body is currently empty.
func main() {
}
package maplejuice
// ApplicationManager is the application-level coordinator of a MapleJuice
// run. At present it only holds the node managers it was constructed with.
type ApplicationManager struct {
	// NodeManagers are the node managers handed to NewApplicationManager.
	NodeManagers []*NodeManager
	// Other application-level fields...
}

// NewApplicationManager returns an ApplicationManager that refers to the
// given node managers (the slice is stored as-is, not copied).
func NewApplicationManager(nodeManagers []*NodeManager) *ApplicationManager {
	manager := new(ApplicationManager)
	manager.NodeManagers = nodeManagers
	// Initialize other fields...
	return manager
}

// StartMapleJuiceProcess is a stub: it accepts the tasks and does nothing yet.
func (am *ApplicationManager) StartMapleJuiceProcess(tasks []Task) {
	// Logic to start and manage the MapleJuice process...
}

// GatherResults is a stub: it accepts the task IDs and does nothing yet.
func (am *ApplicationManager) GatherResults(taskIDs []string) {
	// Logic to gather results of tasks...
}
// Other application management methods...
module cs425mp4/maplejuice
go 1.21.0
package maplejuice
// NodeManager is the per-node component of MapleJuice. At present it only
// holds the resource manager it was constructed with.
type NodeManager struct {
	// ResourceManager is the resource manager handed to NewNodeManager.
	ResourceManager *ResourceManager
	// Other node-specific fields...
}

// NewNodeManager returns a NodeManager bound to resourceManager.
func NewNodeManager(resourceManager *ResourceManager) *NodeManager {
	manager := new(NodeManager)
	manager.ResourceManager = resourceManager
	// Initialize other fields...
	return manager
}

// ExecuteTask is a stub: it accepts the task and does nothing yet.
func (nm *NodeManager) ExecuteTask(task Task) {
	// Logic to execute a given task...
}

// HandleFailure is a stub: it accepts the failed task and does nothing yet.
func (nm *NodeManager) HandleFailure(task Task) {
	// Logic to handle task failure...
}
// Other node management methods...
package maplejuice
// ResourceManager is a placeholder for resource tracking; it has no fields yet.
type ResourceManager struct {
	// Resource tracking and management fields...
}

// NewResourceManager returns a new, empty ResourceManager.
func NewResourceManager() *ResourceManager {
	// Initialize resource manager...
	return new(ResourceManager)
}

// AllocateResources is a stub that currently reports success for every task.
func (rm *ResourceManager) AllocateResources(task Task) bool {
	// Logic to allocate resources for a task...
	return true
}

// ReleaseResources is a stub: it accepts the task and does nothing yet.
func (rm *ResourceManager) ReleaseResources(task Task) {
	// Logic to release resources after task completion...
}
// Other resource management methods...
package maplejuice
// Common types and utility functions...
// Task describes one unit of MapleJuice work.
// NOTE(review): the meanings of Data and Executable are inferred from their
// names only; nothing in view reads these fields — confirm once they are used.
type Task struct {
ID string
Type string // "maple" or "juice"
Data []string
Executable string
}
// TaskResult carries the outcome of a task.
// NOTE(review): presumably TaskID matches Task.ID and Error is nil on success;
// nothing in view produces or consumes this type yet — confirm.
type TaskResult struct {
TaskID string
Output []string
Error error
}
// Other necessary types and utilities...
{"id":"vpnpool-10-250-47-123.near.illinois.edu:76.10.46.206:8000:1698897035","membershiplist":["vpnpool-10-250-47-123.near.illinois.edu:76.10.46.206:8000:1698897035"]}
\ No newline at end of file
package membership
import "sync"
// FailedList is a mutex-protected list of unique member IDs.
type FailedList struct {
	mu   sync.Mutex
	list []string
}

// Add appends id to the list unless it is already present.
func (f *FailedList) Add(id string) {
	f.mu.Lock()
	defer f.mu.Unlock()

	for i := range f.list {
		if f.list[i] == id {
			return
		}
	}
	f.list = append(f.list, id)
}

// Exists reports whether id is in the list.
func (f *FailedList) Exists(id string) bool {
	f.mu.Lock()
	defer f.mu.Unlock()

	present := false
	for i := 0; i < len(f.list) && !present; i++ {
		present = f.list[i] == id
	}
	return present
}

// Delete removes the first occurrence of id; it does nothing when id is absent.
func (f *FailedList) Delete(id string) {
	f.mu.Lock()
	defer f.mu.Unlock()

	for pos, entry := range f.list {
		if entry != id {
			continue
		}
		f.list = append(f.list[:pos], f.list[pos+1:]...)
		return
	}
}
package membership
import (
"fmt"
"log"
"sync"
)
// Status is the state recorded for a member in the membership list.
type Status int

// Member states, numbered from 0 in declaration order.
const (
	ALIVE Status = iota
	SUSPECTED
	FAILED
	LEFT
)

// MembershipInfo is the record kept for one member: its heartbeat counter,
// the local Unix time (seconds) of the last update, and its Status.
type MembershipInfo struct {
	Heartbeat int
	Timestamp int64
	Status
}

// String renders the record as
// "Heartbeat: <n>, Timestamp: <n>, Status: <n>" with Status as its number.
func (info MembershipInfo) String() string {
	return fmt.Sprint(
		"Heartbeat: ", info.Heartbeat,
		", Timestamp: ", info.Timestamp,
		", Status: ", int(info.Status),
	)
}
// MembershipList is a mutex-protected map from member ID to its MembershipInfo.
type MembershipList struct {
mu sync.Mutex // guards ml
ml map[string]MembershipInfo // keyed by member ID string
}
// String lists every member, one "ID: <id> MembershipInfo: <info>" line per
// entry. Lines follow map iteration order, which is not deterministic.
func (ml *MembershipList) String() string {
	ml.mu.Lock()
	defer ml.mu.Unlock()

	out := ""
	for id, info := range ml.ml {
		out += fmt.Sprintf("ID: %s MembershipInfo: %s\n", id, info)
	}
	return out
}
// Get looks up id and returns a pointer to a copy of its record together with
// true, or (nil, false) when id is unknown. Changing the returned record does
// not change the list.
func (ml *MembershipList) Get(id string) (*MembershipInfo, bool) {
	ml.mu.Lock()
	defer ml.mu.Unlock()

	if record, found := ml.ml[id]; found {
		return &record, true
	}
	return nil, false
}
// ExistHost reports whether some member that is neither FAILED nor LEFT has
// an ID whose parsed hostname equals hostname. Every inspected ID is logged.
func (ml *MembershipList) ExistHost(hostname string) bool {
	ml.mu.Lock()
	defer ml.mu.Unlock()

	for id, info := range ml.ml {
		// Members that are gone cannot satisfy the query.
		switch info.Status {
		case FAILED, LEFT:
			continue
		}
		log.Println("Membership: parse id", id, parseHostname(id))
		if parseHostname(id) == hostname {
			log.Println("Membership: leader exists: ", parseHostname(id))
			return true
		}
	}
	return false
}
// GetList returns a snapshot of the membership map taken under the lock.
//
// The result is a copy: the internal map is only safe to touch while mu is
// held, so handing it out directly would let callers read it while other
// goroutines write to it. Changes to the returned map do not affect the list.
// A nil internal map yields a nil result.
func (ml *MembershipList) GetList() map[string]MembershipInfo {
	ml.mu.Lock()
	defer ml.mu.Unlock()

	if ml.ml == nil {
		return nil
	}
	snapshot := make(map[string]MembershipInfo, len(ml.ml))
	for id, info := range ml.ml {
		snapshot[id] = info
	}
	return snapshot
}
module cs425mp4/membership
go 1.21.0
package membership
import (
"context"
"encoding/json"
"fmt"
"log"
"math/rand"
"net"
"os"
"time"
)
// define message
// MsgType identifies the kind of membership message carried in a Message.
type MsgType int
const (
// JOIN is dispatched to handleJoin.
JOIN MsgType = iota
// LEAVE is dispatched to handleLeave.
LEAVE
// REJOIN is dispatched to handleRejoin.
REJOIN
// GOSSIP is dispatched to handleGossip.
GOSSIP
// GOSSIPS is dispatched to handleGossipS.
GOSSIPS
// CLIENT carries configuration values and is dispatched to handleClient.
CLIENT
)
// Message is the JSON-encoded unit sent over UDP between members.
type Message struct {
// Sender is the ID string of the sending node.
Sender string
// Type selects the handler that processes the message.
Type MsgType
// Payload is type-specific; after JSON decoding it holds generic values
// (string, map[string]interface{}, ...).
Payload interface{}
}
// sendMessage JSON-encodes a Message with this node's ID as sender and sends
// it to address ("host:port") as a single UDP datagram.
//
// Delivery is best effort: every failure is logged and nothing is returned.
func (n *Node) sendMessage(address string, msgType MsgType, Payload interface{}) {
	encoded, err := json.Marshal(Message{
		Sender:  n.ID.String(),
		Type:    msgType,
		Payload: Payload,
	})
	if err != nil {
		log.Println("GOSSIP messageHandler.go: Failed to serialize MembershipList to JSON:", err)
		return
	}

	conn, err := net.Dial("udp", address)
	if err != nil {
		log.Println("GOSSIP messageHandler.go: Failed to connect to introducer:", err)
		return
	}
	defer conn.Close()

	if _, err = conn.Write(encoded); err != nil {
		log.Println("GOSSIP messageHandler.go: Error sending UDP data:", err)
	}
}
// StartMessageHandler listens for UDP datagrams on the node's port and hands
// each decoded Message to handleMessage in its own goroutine. It returns when
// ctx is cancelled and terminates the process if the port cannot be opened.
//
// ReadFrom blocks until a datagram arrives, so cancellation is implemented by
// closing the socket from a helper goroutine; the failed read then sees the
// cancelled context and the loop exits promptly.
func (n *Node) StartMessageHandler(ctx context.Context) {
	address := "0.0.0.0:" + n.ID.Port
	listener, err := net.ListenPacket("udp", address)
	if err != nil {
		log.Fatalf("Error listening on port: %v\n", err)
	}
	defer listener.Close()
	log.Println("GOSSIP messageHandler.go: Server is listening on port", n.ID.Port)

	// Unblock the pending ReadFrom as soon as the context is cancelled.
	go func() {
		<-ctx.Done()
		listener.Close()
	}()

	// Large enough for any UDP datagram, so a membership list is never
	// truncated into invalid JSON.
	buffer := make([]byte, 65535)

	for {
		numBytes, _, err := listener.ReadFrom(buffer)
		if err != nil {
			if ctx.Err() != nil {
				log.Println("GOSSIP messageHandler.go: startMessageHandler received stop signal")
				return
			}
			log.Printf("GOSSIP messageHandler.go: Error reading from connection: %v\n", err)
			continue
		}

		// Decoding copies everything out of buffer before the next read.
		var receivedMessage Message
		if err := json.Unmarshal(buffer[:numBytes], &receivedMessage); err != nil {
			log.Printf("GOSSIP messageHandler.go: Error decoding incoming data: %v\n", err)
			continue
		}
		go handleMessage(n, receivedMessage)
	}
}
// handleMessage routes one received message to the handler for its type.
//
// A message whose sender is in the failed list is not processed; the sender
// is asked to rejoin instead. GOSSIP and GOSSIPS messages are dropped with
// probability Configuration.P_ml before being handled.
func handleMessage(n *Node, receivedMessage Message) {
	sender, payload := receivedMessage.Sender, receivedMessage.Payload

	// A "failed" node that still talks was a false positive: ask it to rejoin.
	if n.FailedList.Exists(sender) {
		n.sendRejoin(sender)
		return
	}

	switch kind := receivedMessage.Type; kind {
	case CLIENT:
		n.handleClient(payload)
	case JOIN:
		n.handleJoin(payload)
	case LEAVE:
		n.handleLeave(payload)
	case GOSSIP, GOSSIPS:
		// Simulated message loss.
		if rand.Float64() < n.Configuration.P_ml {
			log.Println("GOSSIP messageHandler.go: Drop Message", kind, payload)
			return
		}
		if kind == GOSSIP {
			n.handleGossip(payload)
		} else {
			n.handleGossipS(payload)
		}
	case REJOIN:
		log.Printf("GOSSIP messageHandler.go: False Positive: sender ID: %s \n", sender)
		fmt.Printf("False Positive: sender ID: %s \n", sender)
		n.handleRejoin(payload)
	default:
		log.Printf("GOSSIP messageHandler.go: Unknown message type: %d", kind)
	}
}
// handleRejoin reacts to a REJOIN request. payload is expected to be the ID
// string the requester holds for this node.
//
// If the timestamp in that ID is older than this node's current timestamp the
// request is stale and ignored. Otherwise the node takes a fresh timestamp
// (and therefore a new ID) and joins again. A payload that is not a string
// is logged and ignored instead of panicking, since it comes off the network.
func (node *Node) handleRejoin(payload interface{}) {
	id, ok := payload.(string)
	if !ok {
		log.Printf("GOSSIP messageHandler.go: Handle Rejoin: unexpected payload type %T", payload)
		return
	}

	t := parseTimestamp(id)
	// if the rejoin version is lower than current, just ignore
	if t < node.ID.Timestamp {
		return
	}

	// update current timestamp
	node.ID.Timestamp = time.Now().Unix()
	node.Join()
	log.Println("GOSSIP messageHandler.go: Handle Rejoin: Update ID:", node.ID.String())
}
// handleJoin processes a JOIN message. payload is expected to be a JSON
// object keyed by the joining node's ID.
//
// The entries are merged into the local membership list, a change is
// signalled on onChange, and the current gossip list (GOSSIP or GOSSIPS,
// depending on the configured mode) is sent back to every ID in the payload.
// A payload of the wrong shape is logged and ignored, so no change is
// signalled for it.
func (n *Node) handleJoin(payload interface{}) {
	log.Printf("GOSSIP messageHandler.go: Handle Join: %v", payload)
	membershipList, ok := payload.(map[string]interface{})
	if !ok {
		log.Println("GOSSIP messageHandler.go: Fail to parse payload...")
		return
	}

	// update self's membershiplist
	n.Combine(&membershipList)
	n.onChange <- struct{}{}
	log.Println("GOSSIP messageHandler.go: Successfully update self's membershiplist")

	// send the current membership list to the new node
	if n.Configuration.Mode == "Gossip" {
		msg := n.GetGossipList()
		for id := range membershipList {
			n.sendMessage(parseAddress(id), GOSSIP, msg)
		}
	} else {
		msg := n.GetGossipSList()
		for id := range membershipList {
			n.sendMessage(parseAddress(id), GOSSIPS, msg)
		}
	}
	log.Printf("GOSSIP messageHandler.go: Successfully handle JOIN, current MembershipList:\n%s", &n.MembershipList)
}
// handleLeave processes a LEAVE message. payload is expected to be the ID
// string of the leaving node.
//
// Nothing happens if the ID is already in the failed list or already marked
// LEFT. Otherwise the member (if known) is marked LEFT and the LEAVE message
// is forwarded to a random set of alive candidates. A payload that is not a
// string is logged and ignored instead of panicking.
func (n *Node) handleLeave(payload interface{}) {
	log.Printf("GOSSIP messageHandler.go: handling leave message...")

	leaveNodeID, ok := payload.(string)
	if !ok {
		log.Printf("GOSSIP messageHandler.go: Handle Leave: unexpected payload type %T", payload)
		return
	}
	log.Printf("GOSSIP messageHandler.go: leaveNodeID: %s", leaveNodeID)

	// if node has already been removed, return
	if n.FailedList.Exists(leaveNodeID) {
		log.Println("GOSSIP messageHandler.go: Already removed leave node")
		return
	}

	// Mark the leaving node LEFT; a direct lookup replaces the full scan.
	n.MembershipList.mu.Lock()
	if memInfo, known := n.MembershipList.ml[leaveNodeID]; known {
		if memInfo.Status == LEFT {
			// Already processed: stop here so the message is not re-forwarded.
			n.MembershipList.mu.Unlock()
			log.Println("GOSSIP messageHandler.go: Already set leave node LEFT")
			return
		}
		memInfo.Status = LEFT
		n.MembershipList.ml[leaveNodeID] = memInfo
		log.Println("GOSSIP messageHandler.go: Mark leave node LEFT")
	}
	n.MembershipList.mu.Unlock()

	// send LEAVE msg to neighbors.
	for _, candidate := range n.GetAliveCandidates() {
		n.sendMessage(parseAddress(candidate), LEAVE, leaveNodeID)
		log.Println("GOSSIP messageHandler.go: Send candidate LeaveNode")
	}
	log.Println("GOSSIP messageHandler.go: Successfully handle Leave")
}
// handleGossip processes a GOSSIP message. payload is expected to be a JSON
// object mapping member IDs to their gossiped values.
//
// The entries are merged with combineGossip and onChange is signalled when
// the merge reports a modification. A payload of the wrong shape is logged
// and ignored.
func (n *Node) handleGossip(payload interface{}) {
	membershipList, ok := payload.(map[string]interface{})
	if !ok {
		log.Println("GOSSIP messageHandler.go: Fail to parse payload...")
		return
	}

	// update self's membershiplist
	if n.combineGossip(&membershipList) {
		n.onChange <- struct{}{}
	}
}
// handleGossipS processes a GOSSIPS message. payload is expected to be a JSON
// object mapping member IDs to their gossiped values.
//
// The entries are merged with combineGossipS and onChange is signalled when
// the merge reports a modification. A payload of the wrong shape is logged
// and ignored.
func (n *Node) handleGossipS(payload interface{}) {
	membershipList, ok := payload.(map[string]interface{})
	if !ok {
		log.Println("GOSSIP messageHandler.go: Fail to parse payload...")
		return
	}

	// update self's membershiplist
	if n.combineGossipS(&membershipList) {
		n.onChange <- struct{}{}
	}
}
// handleClient applies configuration values received in a CLIENT message.
//
// config is expected to be a JSON object; each recognized key of the right
// type overwrites the matching setting, anything else is ignored. T_suspicion
// is applied only when the mode (after a possible update by this message) is
// "Gossip+S". The resulting configuration is always logged.
func (n *Node) handleClient(config interface{}) {
	if cfg, isObject := config.(map[string]interface{}); isObject {
		// setFloat copies cfg[key] into dst when it is present as a number.
		setFloat := func(key string, dst *float64) {
			if v, ok := cfg[key].(float64); ok {
				*dst = v
			}
		}

		setFloat("T_gossip", &n.T_gossip)
		if fanout, ok := cfg["Fanout"].(float64); ok {
			n.Fanout = int(fanout)
		}
		if mode, ok := cfg["Mode"].(string); ok {
			n.Mode = mode
		}
		setFloat("P_ml", &n.P_ml)
		setFloat("T_cleanup", &n.T_cleanup)
		setFloat("T_fail", &n.T_fail)
		if n.Mode == "Gossip+S" {
			setFloat("T_suspicion", &n.T_suspicion)
		}
	}
	log.Printf("GOSSIP messageHandler.go: Successfully handle CLIENT, current configuration: Fanout: %d, T_gossip: %.2f, T_fail: %.2f, T_cleanup:%.2f, T_suspicion: %.2f, P_ml: %.2f, Mode: %s", n.Fanout, n.T_gossip, n.T_fail, n.T_cleanup, n.T_suspicion, n.P_ml, n.Mode)
}
// Gossip runs the periodic gossip loop until ctx is cancelled.
//
// Every T_gossip seconds it refreshes the local membership list (signalling
// onChange when that reports a modification) and sends the gossip list to a
// random set of alive candidates, using GOSSIP in "Gossip" mode and GOSSIPS
// otherwise. The wait between rounds is interruptible, so cancellation is
// honored without waiting out the interval.
func (n *Node) Gossip(ctx context.Context) {
	for {
		// Stop before starting a new round if already cancelled.
		select {
		case <-ctx.Done():
			log.Println("GOSSIP messageHandler.go: Gossip received stop signal")
			return
		default:
		}

		if n.Configuration.Mode == "Gossip" {
			// update self membership list
			if n.updateMembershipGossip() {
				n.onChange <- struct{}{}
			}
			gossipList := n.GetGossipList()
			for _, cand := range n.GetAliveCandidates() {
				n.sendMessage(parseAddress(cand), GOSSIP, gossipList)
			}
		} else {
			// update self membership list
			if n.updateMembershipGossipS() {
				n.onChange <- struct{}{}
			}
			gossipList := n.GetGossipSList()
			for _, cand := range n.GetAliveCandidates() {
				n.sendMessage(parseAddress(cand), GOSSIPS, gossipList)
			}
		}

		// T_gossip is in seconds and may be fractional.
		interval := time.Duration(n.Configuration.T_gossip*1000) * time.Millisecond
		select {
		case <-ctx.Done():
			log.Println("GOSSIP messageHandler.go: Gossip received stop signal")
			return
		case <-time.After(interval):
		}
	}
}
// Join announces this node to the introducer.
//
// The introducer hostname is resolved (the process exits with status 1 if
// that fails), the membership list is saved to disk, and a JOIN message
// carrying this node's ID with heartbeat 0 and status ALIVE is sent to the
// first resolved address on this node's own port.
func (node *Node) Join() {
	addrs, lookupErr := net.LookupIP(node.Configuration.Introducor)
	if lookupErr != nil {
		log.Println("GOSSIP messageHandler.go: Fail to resolve hostname:", lookupErr)
		os.Exit(1)
	}
	for i := range addrs {
		log.Println("GOSSIP messageHandler.go: Introducer IP Address:", addrs[i].String())
	}

	// The introducer also sends itself a JOIN; it is not special-cased.
	target := addrs[0].String() + ":" + node.ID.Port
	joinMsg := map[string][]int{
		node.ID.String(): {0, int(ALIVE)},
	}
	node.SaveMembershipList()
	node.sendMessage(target, JOIN, joinMsg)
}
// Leave sends a LEAVE message carrying this node's ID to a random set of
// alive candidates.
func (node *Node) Leave() {
	for _, peer := range node.GetAliveCandidates() {
		node.sendMessage(parseAddress(peer), LEAVE, node.ID.String())
	}
	log.Println("GOSSIP messageHandler.go: Sent Leave message")
}
// sendRejoin sends a REJOIN message to the member identified by id, at the
// address parsed from that id, with the id itself as payload.
func (node *Node) sendRejoin(id string) {
	node.sendMessage(parseAddress(id), REJOIN, id)
}
package membership
import (
"context"
"encoding/json"
"fmt"
"log"
"math/rand"
"net"
"os"
"sync"
"time"
)
// define node
// ID identifies one incarnation of a node: where it runs and when it was
// created (Unix seconds).
type ID struct {
	Hostname  string
	IP        string
	Port      string
	Timestamp int64
}

// String renders the ID as "hostname:ip:port:timestamp".
func (id ID) String() string {
	return id.Hostname + ":" + id.IP + ":" + id.Port + ":" + fmt.Sprintf("%d", id.Timestamp)
}
// Configuration holds the tunable parameters of the membership protocol.
type Configuration struct {
Introducor string // hostname of the introducer, resolved in Join
Port string // UDP port used for listening and for reaching the introducer
Fanout int // maximum number of candidates returned by GetAliveCandidates
T_gossip float64 // seconds between gossip rounds (may be fractional)
T_fail float64 // seconds without an update before a member is marked FAILED
T_cleanup float64 // NOTE(review): presumably seconds before a failed member is removed — not used in the visible code
T_suspicion float64 // NOTE(review): presumably the suspicion timeout for "Gossip+S" — not used in the visible code
P_ml float64 // probability of dropping a received GOSSIP/GOSSIPS message
Mode string // "Gossip" selects plain gossip; any other value takes the GOSSIPS path
}
// Node is one participant in the membership protocol. It embeds its own ID,
// the protocol Configuration, the MembershipList it maintains and the
// FailedList consulted when handling messages.
type Node struct {
ID
Configuration
MembershipList
FailedList
// onChange is signalled when the membership changes; Run reacts by calling workfunc.
onChange chan struct{}
}
// InitializeNode creates the Node for this process from config.
//
// The node's ID is built from the OS hostname, a local non-loopback IPv4
// address, the configured port and the current Unix time. The membership list
// starts with a single ALIVE entry for the node itself. The process exits
// with status 1 if the interface addresses cannot be listed.
func InitializeNode(config Configuration) *Node {
	ifaceAddrs, err := net.InterfaceAddrs()
	if err != nil {
		fmt.Println(err)
		os.Exit(1)
	}

	localIP := ""
	for _, a := range ifaceAddrs {
		ipnet, isIPNet := a.(*net.IPNet)
		if !isIPNet || ipnet.IP.IsLoopback() || ipnet.IP.To4() == nil {
			continue
		}
		// No break: when several addresses qualify, the last one is kept.
		localIP = ipnet.IP.String()
	}

	hostname, _ := os.Hostname()
	id := ID{
		Hostname:  hostname,
		IP:        localIP,
		Port:      config.Port,
		Timestamp: time.Now().Unix(),
	}
	selfInfo := MembershipInfo{
		Heartbeat: 0,
		Timestamp: time.Now().Unix(),
		Status:    ALIVE,
	}

	self := &Node{
		ID:            id,
		Configuration: config,
		MembershipList: MembershipList{
			mu: sync.Mutex{},
			ml: map[string]MembershipInfo{id.String(): selfInfo},
		},
		FailedList: FailedList{
			mu:   sync.Mutex{},
			list: []string{},
		},
		onChange: make(chan struct{}),
	}
	log.Printf("GOSSIP node.go: Successfully initialize node: %s", self.ID)
	return self
}
// Run joins the group, starts the message handler and gossip goroutines, and
// then calls workfunc once for every signal on onChange. It returns only if
// onChange is closed.
func (node *Node) Run(ctx context.Context, workfunc func()) {
	node.Join()
	go node.StartMessageHandler(ctx)
	go node.Gossip(ctx)

	for {
		if _, open := <-node.onChange; !open {
			return
		}
		log.Println("GOSSIP node.go: Handle membership change")
		workfunc()
	}
}
// GetMembershipList returns a printable listing of every ALIVE or SUSPECTED
// member, one "ID: <id> MembershipInfo: <info>" line per entry, in map
// iteration order.
func (node *Node) GetMembershipList() string {
	node.MembershipList.mu.Lock()
	defer node.MembershipList.mu.Unlock()

	listing := ""
	for id, memInfo := range node.MembershipList.ml {
		switch memInfo.Status {
		case ALIVE, SUSPECTED:
			listing += fmt.Sprintf("ID: %s MembershipInfo: %s\n", id, memInfo)
		}
	}
	return listing
}
// SaveMembershipList writes this node's ID and the IDs of all ALIVE or
// SUSPECTED members to membership.json as
// {"id": "...", "membershiplist": [...]}.
// Marshal and write failures are logged; nothing is returned.
func (node *Node) SaveMembershipList() {
	node.MembershipList.mu.Lock()
	defer node.MembershipList.mu.Unlock()

	// Left nil when nothing qualifies, so the JSON field encodes as null.
	var liveIDs []string
	for id, memInfo := range node.MembershipList.ml {
		switch memInfo.Status {
		case ALIVE, SUSPECTED:
			liveIDs = append(liveIDs, id)
		}
	}

	type snapshot struct {
		ID            string   `json:"id"`
		MembershipIDs []string `json:"membershiplist"`
	}
	jsonData, err := json.Marshal(snapshot{ID: node.ID.String(), MembershipIDs: liveIDs})
	if err != nil {
		log.Println("GOSSIP node.go: Error marshaling data:", err)
		return
	}

	const filePath = "membership.json"
	if err = os.WriteFile(filePath, jsonData, 0644); err != nil {
		log.Println("GOSSIP node.go: Error writing JSON data to the file:", err)
		return
	}
	log.Printf("GOSSIP node.go: Membership data saved to '%s'.\n", filePath)
}
// Print writes the listing of every ALIVE or SUSPECTED member to standard
// output, one "ID: <id> MembershipInfo: <info>" line per entry, followed by a
// newline.
func (node *Node) Print() {
	node.MembershipList.mu.Lock()
	defer node.MembershipList.mu.Unlock()

	listing := ""
	for id, memInfo := range node.MembershipList.ml {
		if memInfo.Status != ALIVE && memInfo.Status != SUSPECTED {
			continue
		}
		listing += fmt.Sprintf("ID: %s MembershipInfo: %s\n", id, memInfo)
	}
	fmt.Println(listing)
}
// GetAliveCandidates returns up to Fanout member IDs chosen at random among
// the ALIVE or SUSPECTED members, never including this node itself. The
// result is an empty (non-nil) slice when no member qualifies.
func (node *Node) GetAliveCandidates() []string {
	node.MembershipList.mu.Lock()
	defer node.MembershipList.mu.Unlock()

	limit := node.Configuration.Fanout
	selfID := node.ID.String()

	peers := []string{}
	for id, memInfo := range node.MembershipList.ml {
		if id == selfID {
			// don't send to self
			continue
		}
		switch memInfo.Status {
		case ALIVE, SUSPECTED:
			peers = append(peers, id)
		}
	}

	// Shuffle, then keep the first `limit` entries.
	rand.Shuffle(len(peers), func(a, b int) {
		peers[a], peers[b] = peers[b], peers[a]
	})
	if len(peers) < limit {
		limit = len(peers)
	}
	return peers[:limit]
}
// Combine merges a received membership map into the local list using the
// merge routine for the configured mode. Modes other than "Gossip" and
// "Gossip+S" are ignored.
func (node *Node) Combine(other_ml *map[string]interface{}) {
	switch node.Configuration.Mode {
	case "Gossip":
		node.combineGossip(other_ml)
	case "Gossip+S":
		node.combineGossipS(other_ml)
	}
}
// func (n *Node) UpdateMembershipList() {
// if n.Configuration.Mode == "Gossip" {
// n.updateMembershipGossip()
// } else {
// n.updateMembershipGossipS()
// }
// }
// combineGossip merges a gossiped membership map into the local list (plain
// gossip mode) and reports whether a new member was added.
//
// Each value of other_ml is expected to be a JSON array whose first element
// is the member's heartbeat; a missing array or element counts as heartbeat 0.
// For a known member that is neither FAILED nor LEFT, a higher heartbeat
// replaces the stored one and refreshes its timestamp. An unknown member is
// added as ALIVE unless it is in the failed list.
//
// The map comes off the network, so an entry whose heartbeat is not a number
// is logged and skipped rather than panicking, and extra array elements are
// ignored. A nil other_ml is treated as empty.
func (node *Node) combineGossip(other_ml *map[string]interface{}) bool {
	if other_ml == nil {
		return false
	}
	node.MembershipList.mu.Lock()
	defer node.MembershipList.mu.Unlock()

	modified := false
	ml := node.MembershipList.ml
	current_time := time.Now().Unix()

	for id, info := range *other_ml {
		fields, _ := info.([]interface{})
		heartbeat := 0
		if len(fields) > 0 {
			hb, ok := fields[0].(float64)
			if !ok {
				log.Printf("GOSSIP node.go: Skip malformed gossip entry: ID: %s\n", id)
				continue
			}
			heartbeat = int(hb)
		}

		currentMem, exists := ml[id]
		if exists {
			// Update heartbeat if the current member is not Failed or Left
			if (heartbeat > currentMem.Heartbeat) && (currentMem.Status != FAILED) && (currentMem.Status != LEFT) {
				currentMem.Heartbeat = heartbeat
				currentMem.Timestamp = current_time
				ml[id] = currentMem
			}
			continue
		}

		// Unknown member: add it unless it was already declared failed.
		if !node.FailedList.Exists(id) {
			log.Printf("GOSSIP node.go: Add new member: ID: %s timestamp: %v\n", id, current_time)
			ml[id] = MembershipInfo{Heartbeat: heartbeat, Timestamp: current_time, Status: ALIVE}
			modified = true
		}
	}
	return modified
}
// combineGossipS merges a gossiped membership map into the local list
// (suspicion mode) and reports whether a member was added or newly marked
// FAILED/LEFT.
//
// Each value of other_ml is expected to be a JSON array [heartbeat, status];
// missing elements count as 0. Rules, for a member that is locally neither
// FAILED nor LEFT:
//   - a gossiped FAILED/LEFT status is adopted and the ID is added to the
//     failed list;
//   - otherwise a higher heartbeat replaces heartbeat and status;
//   - otherwise an equal heartbeat with SUSPECTED downgrades a local ALIVE.
//
// An unknown member is added with the gossiped status unless it is in the
// failed list.
//
// The map comes off the network, so an entry containing a non-numeric value
// is logged and skipped rather than panicking, and elements beyond the second
// are ignored. A nil other_ml is treated as empty.
func (node *Node) combineGossipS(other_ml *map[string]interface{}) bool {
	if other_ml == nil {
		return false
	}
	node.MembershipList.mu.Lock()
	defer node.MembershipList.mu.Unlock()

	modified := false
	ml := node.MembershipList.ml
	current_time := time.Now().Unix()

	for id, info := range *other_ml {
		fields, _ := info.([]interface{})
		var arr [2]int
		malformed := false
		for i, v := range fields {
			if i >= len(arr) {
				break
			}
			num, ok := v.(float64)
			if !ok {
				malformed = true
				break
			}
			arr[i] = int(num)
		}
		if malformed {
			log.Printf("GOSSIP node.go: Skip malformed gossip entry: ID: %s\n", id)
			continue
		}
		heartbeat := arr[0]
		status := Status(arr[1])

		currentMem, exists := ml[id]
		if !exists {
			if !node.FailedList.Exists(id) {
				ml[id] = MembershipInfo{Heartbeat: heartbeat, Timestamp: current_time, Status: status}
				log.Printf("GOSSIP node.go: Add new member: ID: %s timestamp: %v\n", id, current_time)
				modified = true
			}
		} else if (currentMem.Status != FAILED) && (currentMem.Status != LEFT) {
			if (status == FAILED) || (status == LEFT) {
				currentMem.Status = status
				currentMem.Timestamp = current_time
				log.Printf("GOSSIP node.go: COMBINE FAILED: ID: %s timestamp: %v\n", id, current_time)
				fmt.Printf("COMBINE FAILED: ID: %s timestamp: %v\n", id, current_time)
				node.FailedList.Add(id)
				modified = true
			} else if heartbeat > currentMem.Heartbeat {
				currentMem.Status = status
				currentMem.Heartbeat = heartbeat
				currentMem.Timestamp = current_time
				log.Printf("GOSSIP node.go: COMBINE: higher heartbeat. ID: %s status: %d, timestamp: %v\n", id, status, current_time)
			} else if (heartbeat == currentMem.Heartbeat) && (status == SUSPECTED) && (currentMem.Status == ALIVE) {
				currentMem.Status = status
				currentMem.Timestamp = current_time
				log.Printf("GOSSIP node.go: COMBINE SUSPECTED:ID: %s status: %d, timestamp: %v\n", id, status, current_time)
				fmt.Printf("COMBINE SUSPECTED:ID: %s status: %d, timestamp: %v\n", id, status, current_time)
			}
			ml[id] = currentMem
		}
	}
	return modified
}
// updateMembershipGossip performs one local maintenance pass over the
// membership list in plain gossip mode.
//
// The node's own entry gets its heartbeat incremented, its timestamp refreshed
// and its status set to ALIVE. Any other entry is aged against its timestamp:
// a member that is not FAILED/LEFT and older than T_fail is marked FAILED and
// recorded in FailedList; a FAILED/LEFT member older than T_cleanup is removed
// from the list.
//
// It returns true when at least one member was newly marked FAILED.
func (n *Node) updateMembershipGossip() bool {
	n.MembershipList.mu.Lock()
	defer n.MembershipList.mu.Unlock()

	members := n.MembershipList.ml
	now := time.Now().Unix()
	changed := false
	// Removals are deferred until after the iteration.
	expired := make([]string, 0, len(members))

	for id, entry := range members {
		if id == n.ID.String() {
			// Our own entry: bump the heartbeat and stay ALIVE.
			entry.Heartbeat++
			entry.Timestamp = now
			entry.Status = Status(ALIVE)
			members[id] = entry
			continue
		}

		age := float64(now - entry.Timestamp)
		gone := entry.Status == Status(FAILED) || entry.Status == Status(LEFT)
		switch {
		case !gone && age > float64(n.Configuration.T_fail):
			// No update for too long: declare the member failed.
			entry.Status = Status(FAILED)
			entry.Timestamp = now
			log.Printf("GOSSIP node.go: UPDATE FAILED: ID: %s timestamp: %v\n", id, now)
			fmt.Printf("UPDATE FAILED: ID: %s timestamp: %v\n", id, now)
			n.FailedList.Add(id)
			changed = true
		case gone && age > float64(n.Configuration.T_cleanup):
			expired = append(expired, id)
		}
		members[id] = entry
	}

	// FailedList is intentionally left untouched here (the original author
	// notes this avoids ghost members).
	for _, id := range expired {
		delete(members, id)
		log.Printf("GOSSIP node.go: CLEANUP: ID: %s timestamp: %v\n", id, now)
	}
	return changed
}
// updateMembershipGossipS performs one local maintenance pass over the
// membership list in gossip-with-suspicion mode.
//
// The node's own entry gets its heartbeat incremented, its timestamp refreshed
// and its status set to ALIVE. Any other entry is aged against its timestamp
// and moves through the following steps, checked in this order:
//   - FAILED/LEFT and older than T_cleanup: removed from the list;
//   - ALIVE and older than T_suspicion: marked SUSPECTED (timestamp reset);
//   - SUSPECTED and older than T_fail: marked FAILED and recorded in
//     FailedList.
//
// It returns true when at least one member was newly marked FAILED.
func (n *Node) updateMembershipGossipS() bool {
	n.MembershipList.mu.Lock()
	defer n.MembershipList.mu.Unlock()

	members := n.MembershipList.ml
	now := time.Now().Unix()
	changed := false
	// Removals are deferred until after the iteration.
	expired := make([]string, 0, len(members))

	for id, entry := range members {
		if id == n.ID.String() {
			// Our own entry: bump the heartbeat and stay ALIVE.
			entry.Heartbeat++
			entry.Timestamp = now
			entry.Status = Status(ALIVE)
			members[id] = entry
			continue
		}

		age := float64(now - entry.Timestamp)
		switch {
		case (entry.Status == Status(FAILED) || entry.Status == Status(LEFT)) && age > float64(n.Configuration.T_cleanup):
			expired = append(expired, id)
		case entry.Status == Status(ALIVE) && age > float64(n.Configuration.T_suspicion):
			entry.Status = Status(SUSPECTED)
			entry.Timestamp = now
			log.Printf("GOSSIP node.go: UPDATE SUSPECTED: ID: %s timestamp: %v\n", id, now)
			fmt.Printf("UPDATE SUSPECTED: ID: %s timestamp: %v\n", id, now)
		case entry.Status == Status(SUSPECTED) && age > float64(n.Configuration.T_fail):
			entry.Status = Status(FAILED)
			entry.Timestamp = now
			log.Printf("GOSSIP node.go: UPDATE FAILED: ID: %s timestamp: %v\n", id, now)
			fmt.Printf("UPDATE FAILED: ID: %s timestamp: %v\n", id, now)
			n.FailedList.Add(id)
			changed = true
		}
		members[id] = entry
	}

	// FailedList is intentionally left untouched here (the original author
	// notes this avoids ghost members).
	for _, id := range expired {
		delete(members, id)
		log.Printf("GOSSIP node.go: CLEANUP: ID: %s timestamp: %v\n", id, now)
	}
	return changed
}
// GetGossipList builds the payload gossiped in plain gossip mode: a map from
// member ID to [heartbeat, status] containing only the members whose status is
// ALIVE. The membership list is locked while the copy is made.
func (n *Node) GetGossipList() map[string][]int {
	n.MembershipList.mu.Lock()
	defer n.MembershipList.mu.Unlock()

	alive := make(map[string][]int)
	for id, entry := range n.MembershipList.ml {
		if entry.Status != Status(ALIVE) {
			continue
		}
		alive[id] = []int{entry.Heartbeat, int(entry.Status)}
	}
	return alive
}
// GetGossipSList builds the payload gossiped in gossip-with-suspicion mode: a
// map from member ID to [heartbeat, status] for every member in the list,
// whatever its status. The membership list is locked while the copy is made.
func (n *Node) GetGossipSList() map[string][]int {
	n.MembershipList.mu.Lock()
	defer n.MembershipList.mu.Unlock()

	snapshot := make(map[string][]int)
	for id, entry := range n.MembershipList.ml {
		pair := []int{entry.Heartbeat, int(entry.Status)}
		snapshot[id] = pair
	}
	return snapshot
}
package membership
import (
"log"
"net"
"os"
"strconv"
"strings"
)
// parseHostname returns the first ':'-separated field of id, i.e. everything
// before the first colon. An id without a colon is returned unchanged.
func parseHostname(id string) string {
	if colon := strings.IndexByte(id, ':'); colon >= 0 {
		return id[:colon]
	}
	return id
}
// parseAddress returns the second and third ':'-separated fields of id joined
// by a colon (for an id shaped like "host:ip:port:timestamp" this is
// "ip:port").
//
// An id with fewer than three fields yields the empty string instead of
// panicking with an index-out-of-range error.
func parseAddress(id string) string {
	components := strings.Split(id, ":")
	if len(components) < 3 {
		return ""
	}
	return components[1] + ":" + components[2]
}
// parseTimestamp returns the fourth ':'-separated field of id parsed as a
// base-10 int64.
//
// It returns 0 when id has fewer than four fields or when the field is not a
// valid 64-bit integer, instead of panicking or silently ignoring the parse
// error.
func parseTimestamp(id string) int64 {
	components := strings.Split(id, ":")
	if len(components) < 4 {
		return 0
	}
	t, err := strconv.ParseInt(components[3], 10, 64)
	if err != nil {
		return 0
	}
	return t
}
// getIP resolves hostname with net.LookupIP, logs every address that was
// returned, and hands back the first one. If the lookup fails, the error is
// logged and the process exits with status 1.
func getIP(hostname string) net.IP {
	addrs, lookupErr := net.LookupIP(hostname)
	if lookupErr != nil {
		log.Println("Fail to resolve hostname:", lookupErr)
		os.Exit(1)
	}
	for i := range addrs {
		log.Println("IP Address:", addrs[i].String())
	}
	first := addrs[0]
	return first
}
// getLocalIP looks up this machine's hostname with os.Hostname and resolves it
// through getIP. If the hostname cannot be obtained, the error is logged and
// the process exits with status 1.
func getLocalIP() net.IP {
	name, hostErr := os.Hostname()
	if hostErr == nil {
		return getIP(name)
	}
	log.Println("Fail to get hostname:", hostErr)
	os.Exit(1)
	return getIP(name)
}
// Client.go
// Handle get/put/delete request functions
package sdfs
import (
"encoding/json"
"fmt"
"log"
"net"
"os"
"os/exec"
"strconv"
"strings"
"time"
)
// Client is the SDFS client endpoint. It issues requests to the namenode and
// also listens for GET/PUT requests forwarded by other clients.
type Client struct {
	// ID is the sender identity attached to every outgoing Message;
	// InitializeClient sets it to "<ip>:<port>".
	ID string

	// Port is the TCP port Run listens on and sendRequest dials on peers.
	Port string

	// NamenodeAddr is the address dialed over TCP to reach the namenode.
	NamenodeAddr string
}
// InitializeClient builds a Client for the given node. The client ID is the IP
// extracted from id by parseIP joined with port; port and namenodeAddr are
// stored as given.
func InitializeClient(id, port, namenodeAddr string) *Client {
	return &Client{
		ID:           parseIP(id) + ":" + port,
		Port:         port,
		NamenodeAddr: namenodeAddr,
	}
}
// Run listens on TCP port c.Port and serves incoming requests until the
// process ends. It terminates the process via log.Fatalf if the port cannot
// be opened.
//
// Each accepted connection is handled in its own goroutine: the request line
// is read there and passed to handleMessage. Reading inside the goroutine
// keeps a slow or silent peer from blocking the accept loop, and the
// connection is closed when the read fails so it does not leak.
func (c *Client) Run() {
	listener, err := net.Listen("tcp", ":"+c.Port)
	if err != nil {
		log.Fatalf("CLIENT: Error listening on port: %v\n", err)
	}
	defer listener.Close()
	log.Println("CLIENT: Server is listening...")

	for {
		conn, err := listener.Accept()
		if err != nil {
			log.Println("CLIENT: Error accepting connection:", err)
			continue
		}

		go func(conn net.Conn) {
			message, err := c.readString(conn)
			if err != nil {
				log.Printf("CLIENT: Error decoding incoming data: %v\n", err)
				// No handler will take ownership of conn, so close it here.
				conn.Close()
				return
			}
			log.Printf("CLIENT: Received message %+v\n", message)
			c.handleMessage(conn, message)
		}(conn)
	}
}
// handleMessage dispatches one space-separated request received on conn.
//
// Supported requests:
//   - "GET <sdfsFilename> <localFilename>" is forwarded to handleGet;
//   - "PUT <localFilename> <sdfsFilename>" is forwarded to handlePUT.
//
// handleGet and handlePUT close conn themselves. For a request with the wrong
// number of fields or an unknown verb, nobody else owns conn, so it is closed
// here; this also unblocks the peer waiting for a reply.
func (c *Client) handleMessage(conn net.Conn, receivedMessage string) {
	components := strings.Split(receivedMessage, " ")
	switch components[0] {
	case "GET":
		log.Println("CLIENT: Received GET request")
		if len(components) != 3 {
			log.Printf("CLIENT: Received invalid GET request %s\n", receivedMessage)
			conn.Close()
			return
		}
		sdfsFilename, localFilename := components[1], components[2]
		c.handleGet(conn, sdfsFilename, localFilename)
	case "PUT":
		log.Println("CLIENT: Received PUT request")
		if len(components) != 3 {
			log.Printf("CLIENT: Received invalid PUT request %s\n", receivedMessage)
			conn.Close()
			return
		}
		localFilename, sdfsFilename := components[1], components[2]
		c.handlePUT(conn, localFilename, sdfsFilename)
	default:
		log.Printf("CLIENT: received an unknown message: %s\n", receivedMessage)
		conn.Close()
	}
}
// Get downloads sdfsFileName from SDFS into localFileName.
//
// Protocol, as implemented below:
//  1. dial the namenode over TCP;
//  2. send a GET message carrying the SDFS file name;
//  3. read the reply and decode its payload into an assignment (file name,
//     version, datanodes);
//  4. try the datanodes in order with scp until one copy succeeds;
//  5. send GETCONFIRM to the namenode, whether or not a copy succeeded.
//
// It returns an error if any namenode exchange fails or if no datanode could
// deliver the file.
func (c *Client) Get(sdfsFileName, localFileName string) error {
	log.Println("CLIENT: GET : Get", sdfsFileName, "from SDFS")

	// Step 1: connect to the namenode using TCP.
	nameNodeConn, err := net.Dial("tcp", c.NamenodeAddr)
	if err != nil {
		log.Println("CLIENT: GET :Fail to connect to namenode", err)
		return err
	}
	defer nameNodeConn.Close()
	log.Println("CLIENT: GET :Connect to NameNode ", c.NamenodeAddr)

	// Step 2: send a GET request to the namenode.
	messageID := time.Now().Unix()
	err = c.writeMessage(nameNodeConn, GET, sdfsFileName, messageID)
	if err != nil {
		log.Println("CLIENT: GET :Fail to send GET request to namenode", err)
		return err
	}
	log.Println("CLIENT: GET :Send GET request to namenode")

	// Step 3: receive the assignment (which datanodes hold the file).
	message, err := c.readMessage(nameNodeConn)
	if err != nil {
		log.Println("CLIENT: GET : fail to read message from server", err)
		return err
	}
	assignment, err := payloadToAssignment(message.Payload)
	if err != nil {
		log.Println("CLIENT: GET : Fail to parse GET payload to assignment", err)
		return err
	}
	log.Println("CLIENT: GET :Get assignment from namenode")

	// Step 4: fetch the file from the first datanode that answers.
	suc := false
	for _, datanode := range assignment.Datanodes {
		remote := datanode + ":cs425mp3/sdfs/" + assignment.Filename + "-" + strconv.Itoa(assignment.Version)
		cmd := exec.Command("scp", "-l", strconv.Itoa(Limit), remote, localFileName)
		// scp's own output goes to stderr so it does not mix with stdout.
		cmd.Stdout = os.Stderr
		cmd.Stderr = os.Stderr
		if err := cmd.Run(); err != nil {
			log.Printf("CLIENT: GET : Fail to get data from datanode %s: %v\n", datanode, err)
			continue
		}
		suc = true
		break
	}
	if suc {
		log.Printf("CLIENT: GET : Successfully get data from datanode\n")
	} else {
		log.Printf("CLIENT: GET : Fail to get data from any datanode\n")
	}

	// Step 5: acknowledge the namenode. This is sent even after a failed
	// download, as in the original flow.
	err = c.writeMessage(nameNodeConn, GETCONFIRM, assignment, messageID)
	if err != nil {
		log.Println("CLIENT: GET : Fail to send GETCONFIRM request to namenode", err)
		return err
	}

	if !suc {
		return fmt.Errorf("failed to GET from datanodes")
	}
	return nil
}
// Multiread asks the clients running on several VMs to GET sdfsFileName into
// localFileName concurrently, then logs each VM's reply.
//
// VMs holds two-digit VM numbers ("01" ... "10") that are substituted into the
// hostname pattern fa23-cs425-02xx.cs.illinois.edu. One goroutine per VM
// resolves the host, sends the request over TCP via sendRequest and reports
// the reply on a buffered channel; the function returns after one reply per VM
// has been collected.
func (c *Client) Multiread(sdfsFileName, localFileName string, VMs []string) {
	replies := make(chan string, len(VMs))
	request := fmt.Sprintf("GET %s %s", sdfsFileName, localFileName)

	for _, vm := range VMs {
		go func(vm string) {
			log.Println("multiread: send GET request to", vm)
			host := fmt.Sprintf("fa23-cs425-02%s.cs.illinois.edu", vm)
			replies <- c.sendRequest(request, getIP(host))
		}(vm)
	}

	// Wait for exactly one reply per VM.
	for i := 0; i < len(VMs); i++ {
		log.Printf("CLIENT: MULTIREAD Result: %s\n", <-replies)
	}
}
// Multiwrite asks the clients running on several VMs to PUT localFileName as
// sdfsFileName concurrently, then logs each VM's reply.
//
// VMs holds two-digit VM numbers ("01" ... "10") that are substituted into the
// hostname pattern fa23-cs425-02xx.cs.illinois.edu. One goroutine per VM
// resolves the host, sends the request over TCP via sendRequest and reports
// the reply on a buffered channel; the function returns after one reply per VM
// has been collected.
func (c *Client) Multiwrite(localFileName, sdfsFileName string, VMs []string) {
	replies := make(chan string, len(VMs))
	request := fmt.Sprintf("PUT %s %s", localFileName, sdfsFileName)

	for _, vm := range VMs {
		go func(vm string) {
			host := fmt.Sprintf("fa23-cs425-02%s.cs.illinois.edu", vm)
			replies <- c.sendRequest(request, getIP(host))
		}(vm)
	}

	// Wait for exactly one reply per VM.
	for i := 0; i < len(VMs); i++ {
		log.Printf("CLIENT: MULTIWRITE Result: %s\n", <-replies)
	}
}
// handleGet serves a forwarded GET request: it runs Get locally and writes
// "SUCCESS" or "FAILED" back on conn. conn is closed when the function
// returns.
func (c *Client) handleGet(conn net.Conn, sdfsFileName, localFileName string) {
	log.Printf("CLIENT: Handle Multireader: %s -> %s \n", sdfsFileName, localFileName)
	defer conn.Close()

	reply := "SUCCESS"
	if getErr := c.Get(sdfsFileName, localFileName); getErr != nil {
		log.Printf("CLIENT: MULTIREAD: GET %s %s failed: %v", sdfsFileName, localFileName, getErr)
		reply = "FAILED"
	}

	if _, sendErr := fmt.Fprint(conn, reply); sendErr != nil {
		log.Printf("CLIENT: Fail to send back multiread request: %v \n", sendErr)
	}
}
// handlePUT serves a forwarded PUT request: it runs Put locally and writes
// "SUCCESS" or "FAILED" back on conn. conn is closed when the function
// returns.
func (c *Client) handlePUT(conn net.Conn, localFileName, sdfsFileName string) {
	defer conn.Close()

	reply := "SUCCESS"
	if putErr := c.Put(localFileName, sdfsFileName); putErr != nil {
		log.Printf("CLIENT: MULTIWRITE: PUT %s %s failed: %v", localFileName, sdfsFileName, putErr)
		reply = "FAILED"
	}

	if _, sendErr := fmt.Fprint(conn, reply); sendErr != nil {
		log.Printf("CLIENT: Fail to send back MULTIWRITE request: %v \n", sendErr)
	}
}
// Put uploads localFileName into SDFS under the name sdfsFileName.
//
// Protocol, as implemented below:
//  1. dial the namenode over TCP;
//  2. send a PUT message carrying the SDFS file name;
//  3. read the reply and decode its payload into an assignment (file name,
//     version, datanodes);
//  4. scp the local file to every assigned datanode, remembering which
//     copies succeeded;
//  5. send PUTCONFIRM with the list of datanodes that now hold the file;
//  6. wait for the namenode's PUTCONFIRM reply.
//
// It returns an error if any namenode exchange fails, if no datanode accepted
// the file, or if the final reply is not a PUTCONFIRM.
func (c *Client) Put(localFileName, sdfsFileName string) error {
	// Step 1: connect to the namenode using TCP.
	nameNodeConn, err := net.Dial("tcp", c.NamenodeAddr)
	if err != nil {
		log.Println("CLIENT: PUT : Fail to connect to namenode", err)
		return err
	}
	defer nameNodeConn.Close()

	// Step 2: send a PUT request to the namenode.
	messageID := time.Now().Unix()
	err = c.writeMessage(nameNodeConn, PUT, sdfsFileName, messageID)
	if err != nil {
		log.Println("CLIENT: PUT : Fail to send PUT request to namenode", err)
		return err
	}

	// Step 3: receive the assignment (which datanodes should store the file).
	message, err := c.readMessage(nameNodeConn)
	if err != nil {
		log.Println("CLIENT: PUT : fail to read message from server", err)
		return err
	}
	assignment, err := payloadToAssignment(message.Payload)
	if err != nil {
		log.Println("CLIENT: PUT : Fail to parse PUT payload to assignment", err)
		return err
	}

	// Step 4: upload the file to each assigned datanode.
	// Kept as an empty, non-nil slice: it is sent back inside the assignment,
	// and a nil slice would be JSON-encoded as null instead of [].
	sucDatanode := []string{}
	assigned := len(assignment.Datanodes)
	for _, datanode := range assignment.Datanodes {
		remote := datanode + ":cs425mp3/sdfs/" + assignment.Filename + "-" + strconv.Itoa(assignment.Version)
		cmd := exec.Command("scp", "-l", strconv.Itoa(Limit), localFileName, remote)
		// scp's own output goes to stderr so it does not mix with stdout.
		cmd.Stdout = os.Stderr
		cmd.Stderr = os.Stderr
		if err := cmd.Run(); err != nil {
			log.Printf("CLIENT: PUT : Fail to put data to datanode %s: %v\n", datanode, err)
			continue
		}
		sucDatanode = append(sucDatanode, datanode)
	}
	if len(sucDatanode) > 0 {
		log.Printf("CLIENT: PUT : Successfully put data to %d of %d datanodes\n", len(sucDatanode), assigned)
	} else {
		log.Printf("CLIENT: PUT : Fail to put data to any datanode\n")
	}

	// Step 5: tell the namenode which replicas were actually written.
	assignment.Datanodes = sucDatanode
	err = c.writeMessage(nameNodeConn, PUTCONFIRM, assignment, messageID)
	if err != nil {
		log.Println("CLIENT: PUT : Fail to send PUTCONFIRM request to namenode", err)
		return err
	}

	// Step 6: wait for the namenode's PUTCONFIRM reply.
	message, err = c.readMessage(nameNodeConn)
	if err != nil {
		log.Println("CLIENT: PUT : Fail to read PUTCONFIRM from namenode", err)
		return err
	}

	if len(sucDatanode) == 0 || message.Type != PUTCONFIRM {
		return fmt.Errorf("failed to PUT from datanodes")
	}
	return nil
}
// Delete removes every stored version of sdfsFileName from SDFS.
//
// Protocol, as implemented below:
//  1. dial the namenode over TCP;
//  2. send a DELETE message carrying the SDFS file name;
//  3. read the reply and decode its payload into an assignment (file name,
//     datanodes);
//  4. run "rm cs425mp3/sdfs/<name>-*" over ssh on every assigned datanode,
//     remembering which ones succeeded;
//  5. send DELCONFIRM with the list of datanodes that were cleaned;
//  6. wait for the namenode's DELCONFIRM reply.
//
// It returns an error if any namenode exchange fails, if no datanode could be
// cleaned, or if the final reply is not a DELCONFIRM.
func (c *Client) Delete(sdfsFileName string) error {
	// Step 1: connect to the namenode using TCP.
	nameNodeConn, err := net.Dial("tcp", c.NamenodeAddr)
	if err != nil {
		log.Println("CLIENT: DELETE : Fail to connect to namenode", err)
		return err
	}
	defer nameNodeConn.Close()

	// Step 2: send a DELETE request to the namenode.
	messageID := time.Now().Unix()
	err = c.writeMessage(nameNodeConn, DELETE, sdfsFileName, messageID)
	if err != nil {
		log.Println("CLIENT: DELETE : Fail to send DELETE request to namenode", err)
		return err
	}

	// Step 3: receive the assignment (which datanodes hold the file).
	message, err := c.readMessage(nameNodeConn)
	if err != nil {
		log.Println("CLIENT: DELETE : fail to read message from server", err)
		return err
	}
	assignment, err := payloadToAssignment(message.Payload)
	if err != nil {
		log.Println("CLIENT: DELETE : Fail to parse DELETE payload to assignment", err)
		return err
	}

	// Step 4: delete all versions of the file on each datanode.
	// ssh hands the command string to the remote shell, so the path is
	// single-quoted (embedded quotes escaped) to keep shell metacharacters in
	// the file name inert. The trailing -* stays outside the quotes so the
	// shell still expands it to every version.
	target := "'" + strings.ReplaceAll("cs425mp3/sdfs/"+assignment.Filename, "'", `'\''`) + "'-*"

	// Kept as an empty, non-nil slice: it is sent back inside the assignment,
	// and a nil slice would be JSON-encoded as null instead of [].
	sucDatanode := []string{}
	assigned := len(assignment.Datanodes)
	for _, datanode := range assignment.Datanodes {
		cmd := exec.Command("ssh", datanode, "rm "+target)
		cmd.Stdout = os.Stderr
		cmd.Stderr = os.Stderr
		if err := cmd.Run(); err != nil {
			log.Printf("CLIENT: DELETE : Fail to delete data from datanode %s: %v\n", datanode, err)
			continue
		}
		sucDatanode = append(sucDatanode, datanode)
	}
	if len(sucDatanode) > 0 {
		log.Printf("CLIENT: DELETE : Successfully delete data from %d of %d datanodes\n", len(sucDatanode), assigned)
	} else {
		log.Printf("CLIENT: DELETE : Fail to delete data from any datanode\n")
	}

	// Step 5: tell the namenode which replicas were actually removed.
	assignment.Datanodes = sucDatanode
	err = c.writeMessage(nameNodeConn, DELCONFIRM, assignment, messageID)
	if err != nil {
		log.Println("CLIENT: DELETE : Fail to send DELCONFIRM request to namenode", err)
		return err
	}

	// Step 6: wait for the namenode's DELCONFIRM reply.
	message, err = c.readMessage(nameNodeConn)
	if err != nil {
		log.Println("CLIENT: DELETE : Fail to read DELCONFIRM from namenode", err)
		return err
	}

	if len(sucDatanode) == 0 || message.Type != DELCONFIRM {
		return fmt.Errorf("failed to DELETE from datanodes")
	}
	return nil
}
// ListReplica asks the namenode which datanodes store sdfsFileName and prints
// one "IP: <addr> Hostname: <name>" line per replica to stdout.
//
// Failures talking to the namenode are logged and end the call. A failed
// reverse lookup for one replica is logged and that replica is printed with
// the hostname "unknown", so the remaining replicas are still listed.
func (c *Client) ListReplica(sdfsFileName string) {
	// Step 1: connect to the namenode using TCP.
	nameNodeConn, err := net.Dial("tcp", c.NamenodeAddr)
	if err != nil {
		log.Println("CLIENT: LISTREPLICA :Fail to connect to namenode", err)
		return
	}
	defer nameNodeConn.Close()

	// Step 2: send a LISTREPLICA request to the namenode.
	messageID := time.Now().Unix()
	err = c.writeMessage(nameNodeConn, LISTREPLICA, sdfsFileName, messageID)
	if err != nil {
		log.Println("CLIENT: LISTREPLICA :Fail to send LISTREPLICA request to namenode", err)
		return
	}

	// Step 3: receive the list of datanodes holding the file.
	message, err := c.readMessage(nameNodeConn)
	if err != nil {
		log.Println("CLIENT: LISTREPLICA : fail to read message from server", err)
		return
	}
	assignment, err := payloadToAssignment(message.Payload)
	if err != nil {
		log.Println("CLIENT: LISTREPLICA : Fail to parse LISTREPLICA payload to assignment", err)
		return
	}

	// Step 4: print every replica with its reverse-resolved hostname.
	for _, datanode := range assignment.Datanodes {
		hostname := "unknown"
		hostnames, err := net.LookupAddr(datanode)
		if err != nil {
			log.Printf("CLIENT: LISTREPLICA : Look up address Error: %v\n", err)
		} else if len(hostnames) > 0 {
			hostname = hostnames[0]
		}
		// Printf with explicit spaces: fmt.Print adds no separator between
		// string operands, which used to glue the fields together.
		fmt.Printf("IP: %s Hostname: %s\n", datanode, hostname)
	}
}
// writeMessage wraps payload in a Message (with the given id and type, and
// c.ID as sender), JSON-encodes it and writes it to conn.
//
// It returns the encoding error or the write error, if any, so that callers
// do not mistake a failed send for a successful one.
func (c *Client) writeMessage(conn net.Conn, msgType MsgType, payload interface{}, id int64) error {
	message := Message{
		ID:      id,
		Sender:  c.ID,
		Type:    msgType,
		Payload: payload,
	}
	request, err := json.Marshal(message)
	if err != nil {
		return err
	}
	_, err = conn.Write(request)
	return err
}
// readMessage reads one JSON-encoded Message from conn.
//
// The message is decoded with a streaming json.Decoder rather than a single
// fixed-size Read, so messages larger than 1024 bytes, or messages that arrive
// split across several TCP segments, are decoded completely instead of being
// truncated. On failure the zero Message and the error are returned.
//
// NOTE(review): the decoder may buffer bytes that follow the first JSON value.
// The callers in this file use a strict request/response exchange, so nothing
// should follow it; revisit if messages are ever pipelined on one connection.
func (c *Client) readMessage(conn net.Conn) (Message, error) {
	var message Message
	if err := json.NewDecoder(conn).Decode(&message); err != nil {
		return Message{}, err
	}
	return message, nil
}
// readString performs a single Read of up to 1024 bytes from conn and returns
// the bytes received as a string. On a read error it returns the empty string
// and that error.
func (c *Client) readString(conn net.Conn) (string, error) {
	chunk := make([]byte, 1024)
	n, readErr := conn.Read(chunk)
	if readErr != nil {
		return "", readErr
	}
	return string(chunk[:n]), nil
}
// sendRequest dials ip on c.Port over TCP, writes requestMessage and returns
// the peer's reply as read by readString. Any failure (dial, write or read) is
// logged and yields the empty string.
func (c *Client) sendRequest(requestMessage, ip string) string {
	conn, dialErr := net.Dial("tcp", ip+":"+c.Port)
	if dialErr != nil {
		log.Printf("CLIENT: Fail to connect to ip %s: %v\n", ip, dialErr)
		return ""
	}
	defer conn.Close()

	if _, writeErr := fmt.Fprint(conn, requestMessage); writeErr != nil {
		log.Printf("CLIENT: Fail to send request to ip %s: %v\n", ip, writeErr)
		return ""
	}

	reply, readErr := c.readString(conn)
	if readErr != nil {
		log.Printf("CLIENT: Fail to read message from ip %s: %v\n", ip, readErr)
	}
	return reply
}
// parseIP returns the second ':'-separated field of id. Like the indexing it
// performs, it panics when id contains no colon.
func parseIP(id string) string {
	// Only the first two separators matter; the rest of id stays in the tail.
	return strings.SplitN(id, ":", 3)[1]
}
// getIP resolves hostname with net.LookupIP and returns the first address in
// its textual form. If the lookup fails, the error is logged and the process
// exits with status 1.
func getIP(hostname string) string {
	addrs, lookupErr := net.LookupIP(hostname)
	if lookupErr != nil {
		log.Println("CLIENT: Fail to resolve hostname:", lookupErr)
		os.Exit(1)
	}
	first := addrs[0]
	return first.String()
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment