-
Tingyin Ding authoredTingyin Ding authored
main.go 6.40 KiB
package main
import (
"bufio"
"cs425mp4/maplejuice"
"cs425mp4/sdfs"
"cs425mp4/sql"
"cs425mp4/utils"
"fmt"
"log"
"os"
"strconv"
"strings"
)
func getLogfilename() string {
// log file is named "machine.i.log"
machineName, _ := os.Hostname()
parts := strings.Split(machineName, ".")
i := parts[0][len(parts[0])-2:]
return fmt.Sprintf("machine.%s.log", i)
}
func isHadoop() bool {
fmt.Println("Do you want to use hadoop?[y/n]")
dpScanner := bufio.NewScanner(os.Stdin)
for dpScanner.Scan() {
input := dpScanner.Text()
input = strings.TrimSpace(input)
switch input {
case "y":
return true
case "n":
return false
default:
fmt.Println("Invalid input. Please enter 'y' or 'n'.")
}
}
// Handle the case when the loop ends without returning
fmt.Println("Invalid input. Defaulting to not using Hadoop.")
return false
}
func waitHadoopCommand() {
scanner := bufio.NewScanner(os.Stdin)
for scanner.Scan() {
input := scanner.Text()
input = strings.TrimSpace(input)
parts := strings.Fields(input)
if len(parts) == 0 {
continue
}
switch parts[0] {
case "SELECT":
var parser *sql.Parser
if parts[4] == "WHERE" {
parser = sql.InitParser(parts, "f", nil, nil)
} else {
parser = sql.InitParser(parts, "j", nil, nil)
}
if parser == nil {
fmt.Println("Error: invalid sql")
continue
}
e := parser.UploadDatasetToHDFS()
if e != nil {
fmt.Println("Error upload dataset to SDFS", e)
} else {
fmt.Println("Dataset uploaded successfully.")
}
t := utils.MeasureExecutionTime(func() {
e = parser.RunQueryUsingHadoop()
if e != nil {
fmt.Println("Error parse sql:", e)
} else {
fmt.Println("sql parsed successfully.")
}
})
fmt.Println("Execution time:", t)
default:
log.Println("Invalid Command:", input)
}
}
}
func main() {
logfilename := getLogfilename()
logFile, err := os.Create(logfilename)
if err != nil {
fmt.Println("Failed to open log file:", err)
return
}
defer logFile.Close()
// log.SetOutput(logFile)
fmt.Println("Type 'SELECT ALL FROM Dataset WHERE <regex condition>'")
fmt.Println("Type 'SELECT ALL FROM D1, D2 WHERE <one specific field’s value in a line of D1 = one specific field’s value in a line of D2>'")
if isHadoop() {
for {
waitHadoopCommand()
}
}
hostname, _ := os.Hostname()
ip := utils.GetIP(hostname).String()
id := hostname + ":" + ip + ":" + "5555" + ":" + "0000"
leader := "fa23-cs425-0201.cs.illinois.edu"
sdfs_client := sdfs.InitializeClient(
id,
"5555",
leader+":5002", // leader
)
// init maplejuice
am := maplejuice.NewApplicationManager(ip, leader)
// start listening for messages
if hostname != "fa23-cs425-0201.cs.illinois.edu" {
nm := maplejuice.NewNodeManager(ip, leader, sdfs_client)
go nm.Run()
} else {
// Start the resource manager
rm := maplejuice.NewResourceManager(ip, sdfs_client)
go rm.Run()
}
fmt.Println("Type 'maple <maple_exe> <num_maples> <sdfs_intermediate_filename_prefix> <sdfs_src_directory>'")
fmt.Println("Type 'juice <juice_exe> <num_juices> <sdfs_intermediate_filename_prefix> <sdfs_dest_filename> delete_input={0,1}'")
var parser *sql.Parser
scanner := bufio.NewScanner(os.Stdin)
for scanner.Scan() {
input := scanner.Text()
input = strings.TrimSpace(input)
parts := strings.Fields(input)
if len(parts) == 0 {
continue
}
switch parts[0] {
case "maple":
if len(parts) >= 5 {
maple_exe := parts[1]
num_maples, _ := strconv.Atoi(parts[2])
sdfs_intermediate_filename_prefix := parts[3]
sdfs_src_directory := parts[4]
args := parts[5:]
fmt.Printf("Processing 'maple' command with maple_exe: %s, num_maples: %d, sdfs_intermediate_filename_prefix: %s, sdfs_src_directory: %s\n", maple_exe, num_maples, sdfs_intermediate_filename_prefix, sdfs_src_directory)
job := maplejuice.Job{
Type: maplejuice.MAPLE,
Executable: maple_exe,
NumTasks: num_maples,
IntermediateFilePrefix: sdfs_intermediate_filename_prefix,
SrcDir: sdfs_src_directory,
Args: args,
}
err := am.SubmitJob(job)
if err != nil {
fmt.Println("Error submitting job:", err)
} else {
fmt.Println("Job completed successfully.")
}
} else {
fmt.Println("Invalid 'maple' command. Usage: 'maple <maple_exe> <num_maples> <sdfs_intermediate_filename_prefix> <sdfs_src_directory>'")
}
case "juice":
if len(parts) >= 7 {
juice_exe := parts[1]
num_juices, _ := strconv.Atoi(parts[2])
sdfs_intermediate_filename_prefix := parts[3]
sdfs_dest_filename := parts[4]
delete_input := true
if parts[5] == "0" {
delete_input = false
}
partitionMethod := parts[6]
args := parts[7:]
fmt.Printf("Processing 'juice' command with juice_exe: %s, num_juices: %d, sdfs_intermediate_filename_prefix: %s, sdfs_dest_filename: %s, delete_input: %v\n", juice_exe, num_juices, sdfs_intermediate_filename_prefix, sdfs_dest_filename, delete_input)
job := maplejuice.Job{
Type: maplejuice.JUICE,
Executable: juice_exe,
NumTasks: num_juices,
IntermediateFilePrefix: sdfs_intermediate_filename_prefix,
DestFile: sdfs_dest_filename,
DeleteFlag: delete_input,
PartitionMethod: partitionMethod,
Args: args,
}
err := am.SubmitJob(job)
if err != nil {
fmt.Println("Error submitting job:", err)
} else {
fmt.Println("Job completed successfully.")
}
} else {
fmt.Println("Invalid 'juice' command. Usage: 'juice <juice_exe> <num_juices> <sdfs_intermediate_filename_prefix> <sdfs_dest_filename> delete_input={0,1}'")
}
case "SELECT":
parser = nil
if parts[4] == "WHERE" {
parser = sql.InitParser(parts, "f", sdfs_client, am)
} else {
parser = sql.InitParser(parts, "j", sdfs_client, am)
}
if parser == nil {
fmt.Println("Error: invalid sql")
continue
}
e := parser.UploadDataset()
if e != nil {
fmt.Println("Error upload dataset to SDFS", e)
} else {
fmt.Println("Dataset uploaded successfully.")
}
t := utils.MeasureExecutionTime(func() {
e = parser.RunQuery()
if e != nil {
fmt.Println("Error parse sql:", e)
} else {
fmt.Println("sql parsed successfully.")
}
})
fmt.Println("Execution time:", t)
default:
log.Println("Invalid Command:", input)
}
}
}