From 0058503a308f91fecd6b09a9e69ceee546405065 Mon Sep 17 00:00:00 2001
From: ak85 <ak85@illinois.edu>
Date: Sun, 4 Dec 2022 23:54:21 -0600
Subject: [PATCH] implementation completed

---
 src/conf/conf.go                       |  4 +-
 src/mp4/cmd/coordinator/coordinator.go | 61 +++++++++++++++++++++++---
 src/mp4/cmd/worker/worker.go           | 12 ++---
 3 files changed, 60 insertions(+), 17 deletions(-)

diff --git a/src/conf/conf.go b/src/conf/conf.go
index 0de7a6c..91be28a 100644
--- a/src/conf/conf.go
+++ b/src/conf/conf.go
@@ -148,6 +148,6 @@ type TargetInfo struct {
 }
 
 var InferenceInput struct {
-	ModelName  string
-	InputBatch string
+	ModelName string
+	FileBatch []string
 }
diff --git a/src/mp4/cmd/coordinator/coordinator.go b/src/mp4/cmd/coordinator/coordinator.go
index 1902883..9088e2b 100644
--- a/src/mp4/cmd/coordinator/coordinator.go
+++ b/src/mp4/cmd/coordinator/coordinator.go
@@ -14,9 +14,6 @@ import (
 var Model1Rate int64
 var Model2Rate int64
 
-var TotalModel1Processed int64
-var TotalModel2Processed int64
-
 var Model1Targets []string
 var Model2Targets []string
 
@@ -36,6 +33,61 @@ func main() {
 		addFailedBatches()
 	}()
 
+	files := getQueryFiles()
+
+	for i:=0;i<len(files);i++ {
+		j:=0
+		batchQueue:=[]string{}
+		for ; j < batchSize; j++ { // 'while' is not a Go keyword; must also advance j or this loop never terminates
+			batchQueue = append(batchQueue,files[i] )
+		}
+		Model1BatchQueue = append(Model1BatchQueue, batchQueue)
+		Model2BatchQueue = append(Model2BatchQueue, batchQueue)
+		batchQueue = []string{}
+		j=0
+	}
+
+	for {
+		request := strconv.Itoa(batchSize) + "_" + model // int + string does not compile in Go; strconv must be added to the import block
+		servAddr := Model1Targets[NextModel1Target] + ":8000"
+		tcpAddr, err := net.ResolveTCPAddr("tcp", servAddr)
+		if err != nil {
+			println("ResolveTCPAddr failed:", err.Error())
+			continue
+		}
+	
+		conn, err := net.DialTCP("tcp", nil, tcpAddr)
+		if err != nil {
+			println("Dial failed:", err.Error())
+			continue
+		}
+	
+		_, err = conn.Write([]byte(request))
+		if err != nil {
+			println("Write to server failed:", err.Error())
+			conn.Close(); continue // close before retrying; otherwise each failed write leaks a connection
+		}
+	
+		reply := make([]byte, 1024)
+	
+		n, err := conn.Read(reply)
+		if err != nil {
+			println("Read from server failed:", err.Error())
+			conn.Close(); continue // close before retrying; avoids leaking the connection on read failure
+		}
+	
+		var status string
+		err = json.Unmarshal(reply[:n], &status)
+		if err != nil {
+			log.Println(err)
+		}
+	
+		conn.Close()
+		flag = false // NOTE(review): 'flag' is not declared anywhere in this patch — confirm it exists in the surrounding function
+		NextModel1Target = (NextModel1Target + 1) % len(Model1Targets) // % binds tighter than +; without parens the index never wraps and eventually goes out of range
+	}
+	
+
 	go func() {
 		for {
 			initialModel1Number := getModelOutputNumber(1)
@@ -130,9 +182,6 @@ func updateModelTargets(option string, modelNumber int) {
 	if option == "decrease" {
 
 		if modelNumber == 1 {
-			//for j := 0; j < len(Model1Targets)-1; j++ {
-			//	(Model1Targets)[j] = (Model1Targets)[j+1]
-			//}
 			Model1Targets = (Model1Targets)[:len(Model1Targets)-1]
 		}
 		if modelNumber == 2 {
diff --git a/src/mp4/cmd/worker/worker.go b/src/mp4/cmd/worker/worker.go
index 3362461..d2e45e2 100644
--- a/src/mp4/cmd/worker/worker.go
+++ b/src/mp4/cmd/worker/worker.go
@@ -11,12 +11,6 @@ import (
 
 func main() {
 
-	hostName, err := os.Hostname()
-
-	if err != nil {
-		log.Println(err)
-	}
-
 	go Server()
 
 }
@@ -29,9 +23,9 @@ func handleTCPConnection(conn net.Conn) {
 
 	var inferenceInput conf.InferenceInput
 	err = json.Unmarshal(buffer[:n], &inferenceInput)
-	if inferenceInput.ModelName == "ResNet" {
+	if inferenceInput.ModelName == "1" {
 		// model name and batch size
-		resultByte, err := json.Marshal(bat)
+		resultByte, err := json.Marshal(inferenceInput.FileBatch)
 		if err != nil {
 			fmt.Println(err)
 		}
@@ -40,7 +34,7 @@ func handleTCPConnection(conn net.Conn) {
 		_, err = conn.Write(message)
 	} else {
 		// model name and batch size
-		resultByte, err := json.Marshal(bat)
+		resultByte, err := json.Marshal(inferenceInput.FileBatch)
 		if err != nil {
 			fmt.Println(err)
 		}
-- 
GitLab