Skip to content
Snippets Groups Projects
Commit d0c28e73 authored by Tingyin Ding
Browse files

Alternative design: send result with scp instead of message

parent e6c548ab
No related branches found
No related tags found
No related merge requests found
......@@ -72,28 +72,30 @@ func (nm *NodeManager) handleMessage(receivedMessage Message) {
switch messageType {
case ASSIGNM:
nm.maple(taskPayload, sender)
// nm.maple(taskPayload, sender)
nm.processTask(taskPayload, sender, RESULTM)
case ASSIGNJ:
nm.juice(taskPayload, sender)
// nm.juice(taskPayload, sender)
nm.processTask(taskPayload, sender, RESULTJ)
default:
log.Println("nodeManager: wrong message type")
}
}
func (nm *NodeManager) maple(task Task, sender string) {
func (nm *NodeManager) processTask(task Task, sender string, t MsgType) {
// cleanupIntermediate()
// Implement logic to run maple task on the node
log.Println("nodeManager: Executing Maple Task on Node", task)
// Create a slice to store results for each line
results := make(map[string][]string)
log.Println("nodeManager: Executing Task on Node", task)
resultFile := IntermediateFileDir + task.ID + ".result"
status := FAILED
defer func() {
// Send the TaskResult to the leader
taskResult := TaskResult{
TaskID: task.ID,
Results: results,
Status: status,
TaskID: task.ID,
ResultFile: resultFile,
Status: status,
}
err := nm.sendTaskResultToLeader(sender, RESULTM, taskResult)
if err != nil {
......@@ -103,7 +105,6 @@ func (nm *NodeManager) maple(task Task, sender string) {
log.Printf("nodeManager: Sent TaskResult to leader taskid: %s status: %v\n", task.ID, status)
}()
// Approach 1: get from sdfs with filename
// Iterate over each line in the Data field
for _, file := range task.Data {
// Execute maple_exe for each line
......@@ -111,123 +112,17 @@ func (nm *NodeManager) maple(task Task, sender string) {
log.Print("nodeManager: local task file", file)
// localfilename := IntermediateFileDir + file
// nm.sdfs.Get(file, localfilename)
output, err := executeExe(task.Executable, localfilename, task.Args)
_, err := executeExe(task.Executable, localfilename, resultFile, task.Args)
if err != nil {
// Handle the error (you may choose to continue or break the loop)
log.Printf("nodeManager: Error executing maple_exe: %v\n", err)
return
}
sendTaskFile(nm.ResourceManagerAddress, resultFile)
// Append the result to the results slice
results = parseOutput(results, output)
log.Println("nodeManager: current result length:", len(results))
}
// Approach 2: read the received partitions
// // Iterate over each line in the Data field
// localfilename := "data/maplejuice/intermediate/intertemp"
// // Open or create the local file for appending data
// localFile, err := os.OpenFile(localfilename, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666)
// if err != nil {
// log.Fatalf("Failed to open or create local file: %v", err)
// }
// defer localFile.Close()
// // Iterate over each chunk and append it to the local file
// for _, chunk := range task.Data {
// // Append the chunk to the local file
// _, err := localFile.WriteString(chunk + "\n") // Adding a newline for separation
// if err != nil {
// log.Fatalf("Failed to write chunk to local file: %v", err)
// }
// }
// output, err := executeExe(task.Executable, localfilename, task.Args)
// if err != nil {
// // Handle the error (you may choose to continue or break the loop)
// log.Printf("nodeManager: Error executing maple_exe: %v\n", err)
// return
// }
// // Append the result to the results slice
// results = parseOutput(results, output)
// log.Println("nodeManager: current result length:", len(results))
// Create a TaskResult with the collected results
status = COMPLETED
}
// juice runs a Juice task on this node and reports the outcome to sender.
//
// Each entry of task.Data is used as the path of an input file: task.Executable
// is run against it through executeExe and the output is folded into an
// in-memory result map by parseOutput. A TaskResult is always sent back via
// sendTaskResultToLeader with message type RESULTJ. Its status is COMPLETED
// only when every input was processed; on the first executable error the
// function stops and the status stays FAILED (results gathered so far are
// still included in the report).
func (nm *NodeManager) juice(task Task, sender string) {
	log.Println("Executing Juice Task on Node")

	// Map threaded through parseOutput for every input file.
	results := make(map[string][]string)
	// Pessimistic default so that any early return reports FAILED.
	status := FAILED

	// Report to the leader on every exit path, including the early return below.
	defer func() {
		taskResult := TaskResult{
			TaskID:  task.ID,
			Results: results,
			Status:  status,
		}
		if err := nm.sendTaskResultToLeader(sender, RESULTJ, taskResult); err != nil {
			log.Printf("nodeManager: Error sending TaskResult to leader: %v\n", err)
			// Do not claim success when the send failed.
			return
		}
		log.Println("nodeManager: Sent TaskResult to leader", task.ID, status)
	}()

	// Approach 1: run the executable once per entry of task.Data.
	// NOTE(review): each entry is used as-is as a local path; the steps that
	// fetched it from the resource manager / SDFS were commented out in the
	// original, so the file is presumably already on local disk — confirm.
	for _, file := range task.Data {
		log.Print("nodeManager: local task file", file)

		output, err := executeExe(task.Executable, file, task.Args)
		if err != nil {
			// Abort the whole task on the first failing input.
			log.Printf("nodeManager: Error executing juice_exe: %v\n", err)
			return
		}

		// Merge this file's output into the accumulated results.
		results = parseOutput(results, output)
		log.Println("nodeManager: current result length:", len(results))
	}

	// Approach 2 (disabled in the original): append every entry of task.Data
	// as a line to "data/maplejuice/intermediate/intertempjuice" and run the
	// executable once on that file.

	status = COMPLETED
}
......@@ -255,7 +150,7 @@ func parseOutput(results map[string][]string, output string) map[string][]string
}
// executeExe runs the specified executable with input from a file and returns the output.
func executeExe(executable, input string, args []string) (string, error) {
func executeExe(executable, input, outputfile string, args []string) (string, error) {
// Construct the path to the executable file
executablePath := "data/maplejuice/executable/" + executable
......@@ -273,7 +168,7 @@ func executeExe(executable, input string, args []string) (string, error) {
// Run the executable with the input file as stdin
joinedArgs := strings.Join(args, " ")
cmd = exec.Command("sh", "-c", executablePath+" "+joinedArgs+" < "+input)
cmd = exec.Command("sh", "-c", executablePath+" "+joinedArgs+" < "+input+"1>"+outputfile)
log.Println("nodeManager: Executing command:", cmd.String())
// Execute the command and capture its output
......@@ -308,19 +203,18 @@ func (nm *NodeManager) sendTaskResultToLeader(sender string, t MsgType, taskResu
return nil
}
// func (nm *NodeManager) fetchTaskFile(filePath string) error {
// // Get the file from SDFS
// // err := nm.sdfs.Get(filename, filename)
// log.Println("fetching task from ResourceManager", nm.ResourceManagerAddress)
// destination := fmt.Sprintf("%s:cs425mp4/data/maplejuice/executable/", nm.ResourceManagerAddress)
// log.Println("scp", filePath, destination)
// cmd := exec.Command("scp", filePath, destination)
// sendResultFile copies the file at filePath to the resource manager host by
// invoking scp. The remote target is "<ResourceManager>:cs425mp4/" followed by
// IntermediateFileDir. It returns nil when the scp command runs successfully
// and a wrapped error when cmd.Run reports a failure.
func sendResultFile(ResourceManager, filePath string) error {
	log.Println("Sending taskfile to resource manager", ResourceManager)

	remoteDir := fmt.Sprintf("%s:cs425mp4/"+IntermediateFileDir, ResourceManager)
	scp := exec.Command("scp", filePath, remoteDir)
	log.Println("scp", scp.String())

	if runErr := scp.Run(); runErr != nil {
		return fmt.Errorf("scp command failed: %w", runErr)
	}

	log.Println("scp command success")
	return nil
}
......@@ -14,6 +14,7 @@ import (
"cs425mp4/utils"
"fmt"
"hash/fnv"
"io"
"log"
"net"
"os"
......@@ -195,21 +196,52 @@ func (rm *ResourceManager) handleMapleResult(message Message, conn net.Conn) {
log.Println(rm.logFlag, "handleMapleResult", "Task completed", taskResult.TaskID, rm.tasks[taskResult.TaskID].Status)
// if completed, append result to data/maplejuice/intermediate/<job.FilePrefix>_<key>
for key, values := range taskResult.Results {
// Open the taskResult file for reading
resultFile, err := os.Open(taskResult.ResultFile)
if err != nil {
log.Println(rm.logFlag, "handleMapleResult", "Fail to open task result file", taskResult.ResultFile, err)
rm.tasks[taskResult.TaskID].Status = FAILED
return
}
defer resultFile.Close()
scanner := bufio.NewScanner(resultFile)
for scanner.Scan() {
line := scanner.Text()
parts := strings.SplitN(line, "\t", 2) // Split the line into key and value
if len(parts) != 2 {
rm.tasks[taskResult.TaskID].Status = FAILED
log.Println(rm.logFlag, "handleMapleResult", "Invalid line format", line)
continue
}
key := parts[0]
modkey := genIntermediateFileKey(key)
filename := IntermediateFileDir + rm.currentJob.IntermediateFilePrefix + "_" + modkey
file, err := os.OpenFile(filename, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
if err != nil {
// log.Println(rm.logFlag, "handleMapleResult", "Writing to file", filename)
rm.tasks[taskResult.TaskID].Status = FAILED
log.Println(rm.logFlag, "handleMapleResult", "Fail to open file", filename, err)
file.Close()
continue
}
for _, value := range values {
file.WriteString(value + "\n")
_, err = file.WriteString(line + "\n")
if err != nil {
log.Println(rm.logFlag, "handleMapleResult", "Error writing to file", filename, err)
}
file.Close()
}
log.Println(rm.logFlag, "handleMapleResult", "All results written to local files")
if err := scanner.Err(); err != nil {
rm.tasks[taskResult.TaskID].Status = FAILED
log.Println(rm.logFlag, "handleMapleResult", "Error reading task result file", taskResult.ResultFile, err)
}
}
......@@ -243,18 +275,29 @@ func (rm *ResourceManager) handleJuiceResult(message Message, conn net.Conn) {
filename := IntermediateFileDir + rm.currentJob.DestFile
file, err := os.OpenFile(filename, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
if err != nil {
log.Println(rm.logFlag, "handleJuiceResult", "Fail to open file", err)
log.Println(rm.logFlag, "handleJuiceResult", "Fail to open destination file", err)
rm.tasks[taskResult.TaskID].Status = FAILED
return
}
defer file.Close()
// the value is key value
for _, values := range taskResult.Results {
for _, value := range values {
file.WriteString(value + "\n")
}
// Open the source file for reading
resultFile, err := os.Open(taskResult.ResultFile)
if err != nil {
log.Println(rm.logFlag, "handleJuiceResult", "Fail to open source file", err)
rm.tasks[taskResult.TaskID].Status = FAILED
return
}
defer resultFile.Close()
// Copy the contents of the result file to the destination file
_, err = io.Copy(file, resultFile)
if err != nil {
log.Println(rm.logFlag, "handleJuiceResult", "Error appending result file", err)
rm.tasks[taskResult.TaskID].Status = FAILED
return
}
}
func (rm *ResourceManager) SubmitJob(job Job) error {
......
......@@ -63,9 +63,9 @@ type Task struct {
}
// TaskResult is what a worker node reports back to the leader for one task.
// NOTE(review): this listing comes from a diff view and shows the removed and
// the added field sets together (TaskID and Status appear twice); confirm
// which set is current before building.
type TaskResult struct {
TaskID string // ID of the task this result belongs to (copied from task.ID)
Status TaskStatus // outcome of the task, e.g. FAILED or COMPLETED
Results map[string][]string // in-memory output of the task, built by parseOutput
TaskID string
Status TaskStatus
ResultFile string // path of the file holding the task output (IntermediateFileDir + task ID + ".result")
}
type Message struct {
......@@ -258,7 +258,6 @@ func PayloadToTask(payload interface{}) (Task, error) {
func PayloadToTaskResult(payload interface{}) (TaskResult, error) {
taskResult := TaskResult{}
taskResult.Results = make(map[string][]string)
data, ok := payload.(map[string]interface{})
if !ok {
......@@ -273,20 +272,8 @@ func PayloadToTaskResult(payload interface{}) (TaskResult, error) {
taskResult.Status = TaskStatus(status)
}
if results, ok := data["Results"].(map[string]interface{}); ok {
for key, value := range results {
if str, ok := value.([]interface{}); ok {
for _, s := range str {
if str, ok := s.(string); ok {
taskResult.Results[key] = append(taskResult.Results[key], str)
} else {
return taskResult, fmt.Errorf("Invalid value in 'Results' field")
}
}
} else {
return taskResult, fmt.Errorf("Invalid value in 'Results' field")
}
}
if results, ok := data["ResultFile"].(string); ok {
taskResult.ResultFile = results
} else {
return taskResult, fmt.Errorf("Missing or invalid 'Results' field")
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment