Skip to content
Snippets Groups Projects
Commit 95ebd58d authored by Mosharaf Chowdhury's avatar Mosharaf Chowdhury
Browse files

- Made sure only one Leaving notification goes to the tracker per ShuffleClient.

 - Why ShuffleClient and ShuffleServerThread are crashing is still unknown.
parent 7ac3463a
No related branches found
No related tags found
No related merge requests found
......@@ -380,6 +380,9 @@ extends Shuffle[K, V, C] with Logging {
private var oisSource: ObjectInputStream = null
private var receptionSucceeded = false
// Make sure that multiple messages don't go to the tracker
private var alreadySentLeavingNotification = false
override def run: Unit = {
// Setup the timeout mechanism
......@@ -476,45 +479,50 @@ extends Shuffle[K, V, C] with Logging {
}
// Connect to the tracker and update its stats
private def sendLeavingNotification(): Unit = {
val clientSocketToTracker =
new Socket(TrackedCustomParallelLocalFileShuffle.MasterHostAddress,
TrackedCustomParallelLocalFileShuffle.MasterTrackerPort)
val oosTracker =
new ObjectOutputStream(clientSocketToTracker.getOutputStream)
oosTracker.flush()
val oisTracker =
new ObjectInputStream(clientSocketToTracker.getInputStream)
try {
// Send intention
oosTracker.writeObject(
TrackedCustomParallelLocalFileShuffle.ReducerLeaving)
oosTracker.flush()
// Send reducerSplitInfo
oosTracker.writeObject(getLocalSplitInfo)
private def sendLeavingNotification(): Unit = synchronized {
if (!alreadySentLeavingNotification) {
val clientSocketToTracker =
new Socket(TrackedCustomParallelLocalFileShuffle.MasterHostAddress,
TrackedCustomParallelLocalFileShuffle.MasterTrackerPort)
val oosTracker =
new ObjectOutputStream(clientSocketToTracker.getOutputStream)
oosTracker.flush()
// Send serverSplitInfo so that tracker can update its stats
oosTracker.writeObject(splitIndex)
oosTracker.flush()
// Receive ACK. No need to do anything with that
oisTracker.readObject.asInstanceOf[Int]
} catch {
case e: Exception => {
logInfo("sendLeavingNotification had a " + e)
val oisTracker =
new ObjectInputStream(clientSocketToTracker.getInputStream)
try {
// Send intention
oosTracker.writeObject(
TrackedCustomParallelLocalFileShuffle.ReducerLeaving)
oosTracker.flush()
// Send reducerSplitInfo
oosTracker.writeObject(getLocalSplitInfo)
oosTracker.flush()
// Send serverSplitInfo so that tracker can update its stats
oosTracker.writeObject(splitIndex)
oosTracker.flush()
// Receive ACK. No need to do anything with that
oisTracker.readObject.asInstanceOf[Int]
// Now update sentLeavingNotifacation
alreadySentLeavingNotification = true
} catch {
case e: Exception => {
logInfo("sendLeavingNotification had a " + e)
}
} finally {
oisTracker.close()
oosTracker.close()
clientSocketToTracker.close()
}
} finally {
oisTracker.close()
oosTracker.close()
clientSocketToTracker.close()
}
}
}
private def cleanUp(): Unit = {
// Update tracker stats first.
// Update tracker stats first
sendLeavingNotification()
// Clean up the connections to the mapper
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment