Skip to content
Snippets Groups Projects
Commit e1d4551c authored by zf4's avatar zf4
Browse files

worker data conn map

parent 92916998
No related branches found
No related tags found
No related merge requests found
......@@ -35,6 +35,7 @@ var jobFinished bool // true if a job is just finished
var boltID int // unique id for each bolt
var possibleServerIdx int // index of last chosen server
var workerConn map[int]net.Conn // map worker servNum to its corresponding connection
var workerDataConn map[int]net.Conn
var workerPort int
var needRestart bool
......@@ -72,6 +73,7 @@ func InitializeCraneMaster() {
boltID = 1 // 0 for spout, i.e., master node
possibleServerIdx = 0
workerConn = make(map[int]net.Conn)
workerDataConn = make(map[int]net.Conn)
childrenBolts = nil
// Check if I am backup
if shared.GetOwnServerNumber() == MASTER_ONE {
......@@ -388,14 +390,18 @@ func runSpout() {
// fmt.Println(childrenBolts)
// fmt.Println(child)
// Send data
conn, err := net.Dial("tcp", shared.GetServerAddressFromNumber(childServ)+":"+strconv.Itoa(shared.CraneWorkerPort))
if err != nil {
panic(err)
if _, ok := workerDataConn[childServ]; !ok {
conn, err := net.Dial("tcp", shared.GetServerAddressFromNumber(childServ)+":"+strconv.Itoa(shared.CraneWorkerPort))
if err != nil {
panic(err)
}
workerDataConn[childServ] = conn
}
buf := new(bytes.Buffer)
gobEncoder := gob.NewEncoder(buf)
gobEncoder.Encode(data)
conn.Write(buf.Bytes())
workerDataConn[childServ].Write(buf.Bytes())
// read the response
// readBuf := make([]byte, 1024)
// n, err := conn.Read(readBuf)
......@@ -403,7 +409,7 @@ func runSpout() {
// fmt.Println(err)
// }
// fmt.Println(string(readBuf[:n]))
conn.Close()
//conn.Close()
if needRestart {
needRestart = false
return
......
......@@ -111,7 +111,7 @@ func handleWorkerRequest(conn net.Conn) {
gobDecoder.Decode(data)
// Write ack response
//conn.Write([]byte("Ack"))
go processData(*data)
processData(*data)
}
}
conn.Close()
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment