diff --git a/src/conf/conf.go b/src/conf/conf.go index 0de7a6c0386b4962d8e4a6fcb3f47bf339733a4d..91be28af640d67adf01b44032f4c478810939e5c 100644 --- a/src/conf/conf.go +++ b/src/conf/conf.go @@ -148,6 +148,6 @@ type TargetInfo struct { } var InferenceInput struct { - ModelName string - InputBatch string + ModelName string + FileBatch []string } diff --git a/src/mp4/cmd/coordinator/coordinator.go b/src/mp4/cmd/coordinator/coordinator.go index 19028837cb2d72c624b75b8b52b8b464e3b6a4a2..9088e2b738ad0e49dddcdf0d32f4a91ff73d2009 100644 --- a/src/mp4/cmd/coordinator/coordinator.go +++ b/src/mp4/cmd/coordinator/coordinator.go @@ -14,9 +14,6 @@ import ( var Model1Rate int64 var Model2Rate int64 -var TotalModel1Processed int64 -var TotalModel2Processed int64 - var Model1Targets []string var Model2Targets []string @@ -36,6 +33,61 @@ func main() { addFailedBatches() }() + files := getQueryFiles() + + for i:=0;i<len(files);i++ { + j:=0 + batchQueue:=[]string{} + while j<batchSize { + batchQueue = append(batchQueue,files[i] ) + } + Model1BatchQueue = append(Model1BatchQueue, batchQueue) + Model2BatchQueue = append(Model2BatchQueue, batchQueue) + batchQueue = []string{} + j=0 + } + + for { + request := batchSize + "_" + model + servAddr := Model1Targets[NextModel1Target] + ":8000" + tcpAddr, err := net.ResolveTCPAddr("tcp", servAddr) + if err != nil { + println("ResolveTCPAddr failed:", err.Error()) + continue + } + + conn, err := net.DialTCP("tcp", nil, tcpAddr) + if err != nil { + println("Dial failed:", err.Error()) + continue + } + + _, err = conn.Write([]byte(request)) + if err != nil { + println("Write to server failed:", err.Error()) + continue + } + + reply := make([]byte, 1024) + + n, err := conn.Read(reply) + if err != nil { + println("Write to server failed:", err.Error()) + continue + } + + var status string + err = json.Unmarshal(reply[:n], &status) + if err != nil { + log.Println(err) + } + + conn.Close() + flag = false + NextModel1Target=NextModel1Target+1%len(Model1Targets) + } + + go func() { for { initialModel1Number := getModelOutputNumber(1) @@ -130,9 +182,6 @@ func updateModelTargets(option string, modelNumber int) { if option == "decrease" { if modelNumber == 1 { - //for j := 0; j < len(Model1Targets)-1; j++ { - // (Model1Targets)[j] = (Model1Targets)[j+1] - //} Model1Targets = (Model1Targets)[:len(Model1Targets)-1] } if modelNumber == 2 { diff --git a/src/mp4/cmd/worker/worker.go b/src/mp4/cmd/worker/worker.go index 33624611dec995945c6a659d40ccb643fa6de276..d2e45e2f62b4e56c0327bb2eb0e7e1c16781f510 100644 --- a/src/mp4/cmd/worker/worker.go +++ b/src/mp4/cmd/worker/worker.go @@ -11,12 +11,6 @@ import ( func main() { - hostName, err := os.Hostname() - - if err != nil { - log.Println(err) - } - go Server() } @@ -29,9 +23,9 @@ func handleTCPConnection(conn net.Conn) { var inferenceInput conf.InferenceInput err = json.Unmarshal(buffer[:n], &inferenceInput) - if inferenceInput.ModelName == "ResNet" { + if inferenceInput.ModelName == "1" { // model name and batch size - resultByte, err := json.Marshal(bat) + resultByte, err := json.Marshal(inferenceInput.FileBatch) if err != nil { fmt.Println(err) } @@ -40,7 +34,7 @@ func handleTCPConnection(conn net.Conn) { _, err = conn.Write(message) } else { // model name and batch size - resultByte, err := json.Marshal(bat) + resultByte, err := json.Marshal(inferenceInput.FileBatch) if err != nil { fmt.Println(err) }