Skip to content
Snippets Groups Projects
Commit aca8150c authored by Matei Zaharia's avatar Matei Zaharia
Browse files

Ensure that AddedToCache messages make it home before tasks finish

parent 16c886a5
No related branches found
No related tags found
No related merge requests found
......@@ -28,11 +28,12 @@ class RDDCacheTracker extends DaemonActor with Logging {
case RegisterRDD(rddId: Int, numPartitions: Int) =>
logInfo("Registering RDD " + rddId + " with " + numPartitions + " partitions")
locs(rddId) = Array.fill[List[String]](numPartitions)(Nil)
reply("")
reply('OK)
case AddedToCache(rddId, partition, host) =>
logInfo("Cache entry added: (%s, %s) on %s".format(rddId, partition, host))
locs(rddId)(partition) = host :: locs(rddId)(partition)
reply('OK)
case DroppedFromCache(rddId, partition, host) =>
logInfo("Cache entry removed: (%s, %s) on %s".format(rddId, partition, host))
......@@ -123,7 +124,7 @@ private object RDDCache extends Logging {
// If we got here, we have to load the split
// Tell the master that we're doing so
val host = System.getProperty("spark.hostname", Utils.localHostName)
trackerActor ! AddedToCache(rdd.id, split.index, host)
val future = trackerActor !! AddedToCache(rdd.id, split.index, host)
// TODO: fetch any remote copy of the split that may be available
// TODO: also register a listener for when it unloads
logInfo("Computing partition " + split)
......@@ -133,7 +134,8 @@ private object RDDCache extends Logging {
loading.remove(key)
loading.notifyAll()
}
future.apply() // Wait for the reply from the cache tracker
return Iterator.fromArray(array)
}
}
}
\ No newline at end of file
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment