Skip to content
Snippets Groups Projects
main.go 6.40 KiB
package main

import (
	"bufio"
	"cs425mp4/maplejuice"
	"cs425mp4/sdfs"
	"cs425mp4/sql"
	"cs425mp4/utils"
	"fmt"
	"log"
	"os"
	"strconv"
	"strings"
)

func getLogfilename() string {
	// log file is named "machine.i.log"
	machineName, _ := os.Hostname()
	parts := strings.Split(machineName, ".")
	i := parts[0][len(parts[0])-2:]
	return fmt.Sprintf("machine.%s.log", i)
}

func isHadoop() bool {
	fmt.Println("Do you want to use hadoop?[y/n]")
	dpScanner := bufio.NewScanner(os.Stdin)
	for dpScanner.Scan() {
		input := dpScanner.Text()
		input = strings.TrimSpace(input)
		switch input {
		case "y":
			return true
		case "n":
			return false
		default:
			fmt.Println("Invalid input. Please enter 'y' or 'n'.")
		}
	}
	// Handle the case when the loop ends without returning
	fmt.Println("Invalid input. Defaulting to not using Hadoop.")
	return false
}

func waitHadoopCommand() {
	scanner := bufio.NewScanner(os.Stdin)

	for scanner.Scan() {
		input := scanner.Text()
		input = strings.TrimSpace(input)
		parts := strings.Fields(input)
		if len(parts) == 0 {
			continue
		}
		switch parts[0] {

		case "SELECT":
			var parser *sql.Parser
			if parts[4] == "WHERE" {
				parser = sql.InitParser(parts, "f", nil, nil)
			} else {
				parser = sql.InitParser(parts, "j", nil, nil)
			}
			if parser == nil {
				fmt.Println("Error: invalid sql")
				continue
			}
			e := parser.UploadDatasetToHDFS()
			if e != nil {
				fmt.Println("Error upload dataset to SDFS", e)
			} else {
				fmt.Println("Dataset uploaded successfully.")
			}
			t := utils.MeasureExecutionTime(func() {
				e = parser.RunQueryUsingHadoop()
				if e != nil {
					fmt.Println("Error parse sql:", e)
				} else {
					fmt.Println("sql parsed successfully.")
				}
			})
			fmt.Println("Execution time:", t)
		default:
			log.Println("Invalid Command:", input)
		}
	}

}

func main() {
	logfilename := getLogfilename()
	logFile, err := os.Create(logfilename)
	if err != nil {
		fmt.Println("Failed to open log file:", err)
		return
	}
	defer logFile.Close()
	// log.SetOutput(logFile)

	fmt.Println("Type 'SELECT ALL FROM Dataset WHERE <regex condition>'")
	fmt.Println("Type 'SELECT ALL FROM D1, D2 WHERE <one specific field’s value in a line of D1 = one specific field’s value in a line of D2>'")

	if isHadoop() {
		for {
			waitHadoopCommand()
		}
	}

	hostname, _ := os.Hostname()
	ip := utils.GetIP(hostname).String()
	id := hostname + ":" + ip + ":" + "5555" + ":" + "0000"
	leader := "fa23-cs425-0201.cs.illinois.edu"
	sdfs_client := sdfs.InitializeClient(
		id,
		"5555",
		leader+":5002", // leader
	)
	// init maplejuice
	am := maplejuice.NewApplicationManager(ip, leader)

	// start listening for messages
	if hostname != "fa23-cs425-0201.cs.illinois.edu" {
		nm := maplejuice.NewNodeManager(ip, leader, sdfs_client)
		go nm.Run()
	} else {
		// Start the resource manager
		rm := maplejuice.NewResourceManager(ip, sdfs_client)
		go rm.Run()
	}

	fmt.Println("Type 'maple <maple_exe> <num_maples> <sdfs_intermediate_filename_prefix> <sdfs_src_directory>'")
	fmt.Println("Type 'juice <juice_exe> <num_juices> <sdfs_intermediate_filename_prefix> <sdfs_dest_filename> delete_input={0,1}'")
	var parser *sql.Parser
	scanner := bufio.NewScanner(os.Stdin)
	for scanner.Scan() {
		input := scanner.Text()
		input = strings.TrimSpace(input)
		parts := strings.Fields(input)
		if len(parts) == 0 {
			continue
		}
		switch parts[0] {
		case "maple":
			if len(parts) >= 5 {
				maple_exe := parts[1]
				num_maples, _ := strconv.Atoi(parts[2])
				sdfs_intermediate_filename_prefix := parts[3]
				sdfs_src_directory := parts[4]
				args := parts[5:]
				fmt.Printf("Processing 'maple' command with maple_exe: %s, num_maples: %d, sdfs_intermediate_filename_prefix: %s, sdfs_src_directory: %s\n", maple_exe, num_maples, sdfs_intermediate_filename_prefix, sdfs_src_directory)

				job := maplejuice.Job{
					Type:                   maplejuice.MAPLE,
					Executable:             maple_exe,
					NumTasks:               num_maples,
					IntermediateFilePrefix: sdfs_intermediate_filename_prefix,
					SrcDir:                 sdfs_src_directory,
					Args:                   args,
				}

				err := am.SubmitJob(job)
				if err != nil {
					fmt.Println("Error submitting job:", err)
				} else {
					fmt.Println("Job completed successfully.")
				}

			} else {
				fmt.Println("Invalid 'maple' command. Usage: 'maple <maple_exe> <num_maples> <sdfs_intermediate_filename_prefix> <sdfs_src_directory>'")
			}

		case "juice":
			if len(parts) >= 7 {
				juice_exe := parts[1]
				num_juices, _ := strconv.Atoi(parts[2])
				sdfs_intermediate_filename_prefix := parts[3]
				sdfs_dest_filename := parts[4]
				delete_input := true
				if parts[5] == "0" {
					delete_input = false
				}
				partitionMethod := parts[6]
				args := parts[7:]

				fmt.Printf("Processing 'juice' command with juice_exe: %s, num_juices: %d, sdfs_intermediate_filename_prefix: %s, sdfs_dest_filename: %s, delete_input: %v\n", juice_exe, num_juices, sdfs_intermediate_filename_prefix, sdfs_dest_filename, delete_input)

				job := maplejuice.Job{
					Type:                   maplejuice.JUICE,
					Executable:             juice_exe,
					NumTasks:               num_juices,
					IntermediateFilePrefix: sdfs_intermediate_filename_prefix,
					DestFile:               sdfs_dest_filename,
					DeleteFlag:             delete_input,
					PartitionMethod:        partitionMethod,
					Args:                   args,
				}
				err := am.SubmitJob(job)
				if err != nil {
					fmt.Println("Error submitting job:", err)
				} else {
					fmt.Println("Job completed successfully.")
				}

			} else {
				fmt.Println("Invalid 'juice' command. Usage: 'juice <juice_exe> <num_juices> <sdfs_intermediate_filename_prefix> <sdfs_dest_filename> delete_input={0,1}'")
			}
		case "SELECT":
			parser = nil
			if parts[4] == "WHERE" {
				parser = sql.InitParser(parts, "f", sdfs_client, am)
			} else {
				parser = sql.InitParser(parts, "j", sdfs_client, am)
			}
			if parser == nil {
				fmt.Println("Error: invalid sql")
				continue
			}

			e := parser.UploadDataset()
			if e != nil {
				fmt.Println("Error upload dataset to SDFS", e)
			} else {
				fmt.Println("Dataset uploaded successfully.")
			}
			t := utils.MeasureExecutionTime(func() {
				e = parser.RunQuery()
				if e != nil {
					fmt.Println("Error parse sql:", e)
				} else {
					fmt.Println("sql parsed successfully.")
				}
			})
			fmt.Println("Execution time:", t)

		default:
			log.Println("Invalid Command:", input)

		}
	}
}