Skip to content
Snippets Groups Projects
Commit 09954e14 authored by Carol7102's avatar Carol7102
Browse files

add hadoop mod and update sql parser

parent d1f596dc
No related branches found
No related tags found
No related merge requests found
......@@ -13,11 +13,14 @@ replace cs425mp4/maplejuice => ./maplejuice
replace cs425mp4/utils => ./utils
require (
cs425mp4/hadoop v0.0.0-00010101000000-000000000000
cs425mp4/maplejuice v0.0.0-00010101000000-000000000000
cs425mp4/sdfs v0.0.0-00010101000000-000000000000
cs425mp4/utils v0.0.0-00010101000000-000000000000
)
require cs425mp4/sql v0.0.0-00010101000000-000000000000 // indirect
require cs425mp4/sql v0.0.0-00010101000000-000000000000
replace cs425mp4/sql => ./sql
replace cs425mp4/hadoop => ./hadoop
module cs425mp4/hadoop
go 1.21.0
package hadoop
import (
"fmt"
)
// Hadoop carries the argument list handed to InitHP for a Hadoop run.
type Hadoop struct {
	// args is stored exactly as supplied; the slice is not copied.
	args []string
}

// InitHP returns a new Hadoop whose args field refers to the given slice.
func InitHP(args []string) *Hadoop {
	return &Hadoop{args: args}
}

// RunHadoop prints a status line to standard output and returns nil.
// It does not yet consume the stored arguments.
func (hp *Hadoop) RunHadoop() error {
	fmt.Println("hadoop: running queries in Hadoop...")
	return nil
}
......@@ -6,6 +6,7 @@ import (
"cs425mp4/sdfs"
"cs425mp4/sql"
"cs425mp4/utils"
"cs425mp4/hadoop"
"fmt"
"log"
"os"
......@@ -21,6 +22,46 @@ func getLogfilename() string {
return fmt.Sprintf("machine.%s.log", i)
}
// isHadoop prompts on standard output and reads lines from os.Stdin until
// the user answers yes or no.
//
// "y"/"yes" return true and "n"/"no" return false; matching ignores case and
// surrounding whitespace. Any other line prints a hint and asks again. If
// input ends (or a read error occurs) before a valid answer, it reports that
// and returns false.
//
// NOTE(review): a fresh bufio.Scanner is created on every call; if another
// buffered reader is also consuming os.Stdin, read-ahead by either one may
// swallow lines meant for the other — confirm against the caller.
func isHadoop() bool {
	fmt.Println("Do you want to use hadoop?[y/n]")
	answers := bufio.NewScanner(os.Stdin)
	for answers.Scan() {
		switch strings.ToLower(strings.TrimSpace(answers.Text())) {
		case "y", "yes":
			return true
		case "n", "no":
			return false
		default:
			fmt.Println("Invalid input. Please enter 'y' or 'n'.")
		}
	}
	// Scan stopped: distinguish a real read failure from plain end of input.
	if err := answers.Err(); err != nil {
		fmt.Println("Error reading input:", err)
	}
	fmt.Println("Invalid input. Defaulting to not using Hadoop.")
	return false
}
// getHadoopArgs prompts on standard output and reads ONE line from os.Stdin,
// splitting it on whitespace into the Hadoop argument list.
//
// It returns the fields when the line contains exactly six of them. It
// returns nil (after printing a message) when the line has any other number
// of fields, when input ends before a line is read, or when reading fails.
func getHadoopArgs() []string {
	const usage = "'<hadoopStreamingJar> <input> <inputFile> <outputDir> <mapper> <reducer>'"

	fmt.Println("Type hadoop arguments " + usage)
	lines := bufio.NewScanner(os.Stdin)
	if !lines.Scan() {
		// No line available: report a read failure if there was one.
		if err := lines.Err(); err != nil {
			fmt.Println("Error reading input:", err)
		}
		fmt.Println("Invalid input. Return with no arguments")
		return nil
	}

	// strings.Fields already ignores leading and trailing whitespace.
	parts := strings.Fields(lines.Text())
	if len(parts) != 6 {
		fmt.Println("Invalid hadoop arguments. Usage: " + usage)
		return nil
	}
	return parts
}
func main() {
logfilename := getLogfilename()
logFile, err := os.Create(logfilename)
......@@ -128,7 +169,7 @@ func main() {
} else {
fmt.Println("Invalid 'juice' command. Usage: 'juice <juice_exe> <num_juices> <sdfs_intermediate_filename_prefix> <sdfs_dest_filename> delete_input={0,1}'")
}
case "SELECT":
case "SELECT":
if parts[4] == "WHERE" {
// Filter SQL
// get a sql parser
......@@ -138,13 +179,23 @@ func main() {
break
}
e := parser.ParseSQL()
if e != nil {
fmt.Println("Error parse sql:", e)
if isHadoop() {
args := getHadoopArgs()
hp := hadoop.InitHP(args)
if args != nil {
hp.RunHadoop()
} else {
fmt.Println("Wrong hadoop arguments")
}
} else {
fmt.Println("sql parsed successfully.")
e := parser.ParseSQL()
if e != nil {
fmt.Println("Error parse sql:", e)
} else {
fmt.Println("sql parsed successfully.")
}
}
} else {
// Join SQL
// get a sql parser
......@@ -153,13 +204,22 @@ func main() {
fmt.Println("Error: invalid sql")
break
}
e := parser.ParseSQL()
if e != nil {
fmt.Println("Error parse sql:", e)
if isHadoop() {
args := getHadoopArgs()
hp := hadoop.InitHP(args)
if args != nil {
hp.RunHadoop()
} else {
fmt.Println("Wrong hadoop arguments")
}
} else {
fmt.Println("sql parsed successfully.")
e := parser.ParseSQL()
if e != nil {
fmt.Println("Error parse sql:", e)
} else {
fmt.Println("sql parsed successfully.")
}
}
}
default:
......
This diff is collapsed.
#!/usr/bin/env python3
import sys
import re
def is_valid_line(value, pattern):
    """Report whether `pattern` matches at the beginning of `value`.

    Args:
        value: the string to test.
        pattern: a regular expression accepted by the `re` module.

    Returns:
        True if `re.match` finds a match, otherwise False.
    """
    found = re.match(pattern, value)
    return found is not None
def map_function(index, regex):
    """Emit every stdin line whose `index`-th field matches `regex`.

    Each input line is stripped and split on whitespace. When the selected
    field matches `regex` at its start, the stripped line is printed followed
    by a tab and the literal `1`.

    Args:
        index: integer position of the field to test (negative values count
            from the end, as with normal list indexing).
        regex: regular expression applied with `match` semantics.

    Lines that do not have a field at `index` (including blank lines) are
    skipped instead of raising IndexError.
    """
    # Compile once; the pattern does not change between lines.
    matcher = re.compile(regex)
    for line in sys.stdin:
        line = line.strip()
        parts = line.split()
        if not -len(parts) <= index < len(parts):
            continue
        if matcher.match(parts[index]) is not None:
            print(f'{line}\t1')
if __name__ == '__main__':
    # Expected invocation: <script> <field index> <regex>
    if len(sys.argv) < 3:
        sys.stderr.write(f"Usage: {sys.argv[0]} <field_index> <regex>\n")
        sys.exit(1)
    try:
        index = int(sys.argv[1])
    except ValueError:
        # Fail with a readable message instead of a bare traceback.
        sys.stderr.write("Error: The index argument must be a valid integer.\n")
        sys.exit(1)
    regex = sys.argv[2]
    map_function(index, regex)
#!/usr/bin/env python3
import sys
def reduce_function():
    """Print each stdin record with its trailing tab-separated field removed.

    Every non-blank line is stripped, everything after the LAST tab is
    discarded (presumably the `1` appended by the filter mapper — confirm),
    and the remainder is printed followed by a tab.

    The split is done on a tab, not a space, so space-separated data fields
    inside the record are preserved. Blank lines are skipped, and a line that
    contains no tab is printed whole rather than raising ValueError.
    """
    for line in sys.stdin:
        line = line.strip()
        if not line:
            continue
        record, sep, _count = line.rpartition('\t')
        if not sep:
            # No tab at all: nothing to drop, keep the full line.
            record = line
        print(record + '\t')
if __name__ == '__main__':
reduce_function()
#!/usr/bin/env python3
import sys
import re
def map_function(index, ds):
    """Re-key each stdin line on one of its fields and tag it with `ds`.

    Every line is split on whitespace. The field at `index` is removed and
    printed first, followed by a tab, the remaining fields joined by tabs,
    another tab, and the dataset tag `ds`.

    Args:
        index: position of the key field; anything `int()` accepts. Negative
            values count from the end.
        ds: dataset tag appended as the last output field.

    Exits with status 1 (message on stderr, so it cannot mix into the emitted
    records) when `index` is not a valid integer. Lines without a field at
    `index`, including blank lines, are skipped instead of raising IndexError.
    """
    # Validate once up front; the value is the same for every line.
    try:
        index = int(index)
    except ValueError:
        sys.stderr.write("Error: The index argument must be a valid integer.\n")
        sys.exit(1)

    for line in sys.stdin:
        parts = line.split()
        if not -len(parts) <= index < len(parts):
            continue
        key = parts.pop(index)
        rest = '\t'.join(parts)
        print(key + '\t' + rest + '\t' + ds)
if __name__ == '__main__':
    # Expected invocation: <script> <key field index> <dataset tag>
    if len(sys.argv) < 3:
        sys.stderr.write(f"Usage: {sys.argv[0]} <field_index> <dataset>\n")
        sys.exit(1)
    try:
        idx = int(sys.argv[1])
    except ValueError:
        # Report the bad index readably instead of a bare traceback.
        sys.stderr.write("Error: The index argument must be a valid integer.\n")
        sys.exit(1)
    ds = sys.argv[2]
    map_function(idx, ds)
#!/usr/bin/env python3
import sys
def map_function():
    """Copy tab-keyed records from stdin to stdout.

    Each non-blank line is stripped, split at its first tab into key and
    value, and printed as `key<TAB>value`.

    Blank lines are skipped. A line without any tab is emitted with an empty
    value rather than raising ValueError.
    """
    for line in sys.stdin:
        line = line.strip()
        if not line:
            continue
        key, _sep, value = line.partition('\t')
        print(key + '\t' + value)
if __name__ == '__main__':
map_function()
\ No newline at end of file
#!/usr/bin/env python3
import sys
def reduce_function():
    """Pass tab-keyed records from stdin through to stdout unchanged.

    Each non-blank line is stripped, split at its first tab into key and
    value, and printed as `key<TAB>value`.

    Blank lines are skipped. A line without any tab is emitted with an empty
    value rather than raising ValueError.
    """
    for line in sys.stdin:
        line = line.strip()
        if not line:
            continue
        key, _sep, value = line.partition('\t')
        print(key + '\t' + value)
if __name__ == '__main__':
reduce_function()
#!/usr/bin/env python3
import sys
def print_function(current_map, current_key):
    """Print the Cartesian-product join of two datasets for one key.

    Args:
        current_map: dict mapping a dataset tag to the list of record strings
            seen for `current_key`.
        current_key: the join key, printed as the first column of every row.

    Nothing is printed when fewer than two datasets are present. Otherwise
    every record of the first dataset is paired with every record of the
    second, and each row is printed as
    `key<TAB>left<TAB>right<TAB>`.

    Datasets are taken in sorted tag order, so the column order of the output
    is the same for every key regardless of which dataset's record arrived
    first. Only the first two tags are joined; any further ones are ignored.
    """
    datasets = sorted(current_map)
    if len(datasets) < 2:
        return
    left, right = datasets[0], datasets[1]
    # Rows are printed as they are produced; no intermediate list is needed.
    for line_d1 in current_map[left]:
        for line_d2 in current_map[right]:
            print(f"{current_key}\t{line_d1}\t{line_d2}\t")
def reduce_function():
    """Group stdin records by key and join each group via `print_function`.

    Each line is expected as `key<TAB>fields... dataset`: the text after the
    first tab is split on whitespace, its last token is taken as the dataset
    tag, and the remaining tokens are re-joined with tabs to form the record.
    Consecutive lines with the same key are collected into a dict of
    `dataset -> [records]`; when the key changes (and once more at end of
    input) the collected group is handed to `print_function`.

    Grouping relies on equal keys being adjacent in the input (presumably
    guaranteed by the shuffle/sort stage — confirm).

    Empty input produces no output instead of calling `print_function` with
    `None`. Blank lines and lines lacking a tab or a dataset tag are skipped
    instead of raising ValueError/IndexError.
    """
    current_key = None
    current_map = {}
    for line in sys.stdin:
        line = line.strip()
        key, sep, value = line.partition('\t')
        fields = value.split()
        if not sep or not fields:
            continue
        ds = fields.pop()
        record = '\t'.join(fields)
        if key != current_key:
            # Key changed: flush the finished group before starting a new one.
            if current_map:
                print_function(current_map, current_key)
            current_map = {}
            current_key = key
        current_map.setdefault(ds, []).append(record)
    # Flush the final group, if any lines were read.
    if current_map:
        print_function(current_map, current_key)
if __name__ == '__main__':
reduce_function()
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment