diff --git a/core/src/main/scala/spark/BlockStoreShuffleFetcher.scala b/core/src/main/scala/spark/BlockStoreShuffleFetcher.scala
index 0bbdb4e4327d6152fc3051fd61cef6ba9447b4a8..9c42e88b684bb2cecd7fffe85a3290330edd50dc 100644
--- a/core/src/main/scala/spark/BlockStoreShuffleFetcher.scala
+++ b/core/src/main/scala/spark/BlockStoreShuffleFetcher.scala
@@ -29,7 +29,7 @@ class BlockStoreShuffleFetcher extends ShuffleFetcher with Logging {
 
     val blocksByAddress: Seq[(BlockManagerId, Seq[String])] = splitsByAddress.toSeq.map {
       case (address, splits) =>
-        (address, splits.map(i => "shuffleid_%d_%d_%d".format(shuffleId, i, reduceId)))
+        (address, splits.map(i => "shuffle_%d_%d_%d".format(shuffleId, i, reduceId)))
     }
 
     for ((blockId, blockOption) <- blockManager.getMultiple(blocksByAddress)) {
@@ -42,7 +42,7 @@ class BlockStoreShuffleFetcher extends ShuffleFetcher with Logging {
           }
         }
         case None => {
-          val regex = "shuffleid_([0-9]*)_([0-9]*)_([0-9]*)".r
+          val regex = "shuffle_([0-9]*)_([0-9]*)_([0-9]*)".r
           blockId match {
             case regex(shufId, mapId, reduceId) =>
               val addr = addresses(mapId.toInt)
diff --git a/core/src/main/scala/spark/CacheTracker.scala b/core/src/main/scala/spark/CacheTracker.scala
index 225a5ad403abb67ae5b7a105cc9df518537a7fa5..d9e0ef90b88c16481062a0bfbc47338f6ebd147e 100644
--- a/core/src/main/scala/spark/CacheTracker.scala
+++ b/core/src/main/scala/spark/CacheTracker.scala
@@ -167,7 +167,7 @@ class CacheTracker(actorSystem: ActorSystem, isMaster: Boolean, blockManager: Bl
 
   // Gets or computes an RDD split
   def getOrCompute[T](rdd: RDD[T], split: Split, storageLevel: StorageLevel): Iterator[T] = {
-    val key = "rdd:%d:%d".format(rdd.id, split.index)
+    val key = "rdd_%d_%d".format(rdd.id, split.index)
     logInfo("Cache key is " + key)
     blockManager.get(key) match {
       case Some(cachedValues) =>
diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala
index 784f25086e1cb085bc4db466c46bacc9583d0ea8..ab8014c056657588c4da05028e4cd46719319276 100644
--- a/core/src/main/scala/spark/RDD.scala
+++ b/core/src/main/scala/spark/RDD.scala
@@ -106,7 +106,7 @@ abstract class RDD[T: ClassManifest](@transient sc: SparkContext) extends Serial
 
     // This is a hack. Ideally this should re-use the code used by the CacheTracker
    // to generate the key.
-    def getSplitKey(split: Split) = "rdd:%d:%d".format(this.id, split.index)
+    def getSplitKey(split: Split) = "rdd_%d_%d".format(this.id, split.index)
 
     persist(level)
     sc.runJob(this, (iter: Iterator[T]) => {} )
diff --git a/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala b/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala
index d70a061366bfca700d30105266ea0cb7db5021df..27d97ffee543a79d707fd5d88cfbc24ff7d85425 100644
--- a/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala
+++ b/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala
@@ -144,7 +144,7 @@ class ShuffleMapTask(
     val ser = SparkEnv.get.serializer.newInstance()
     val blockManager = SparkEnv.get.blockManager
     for (i <- 0 until numOutputSplits) {
-      val blockId = "shuffleid_" + dep.shuffleId + "_" + partition + "_" + i
+      val blockId = "shuffle_" + dep.shuffleId + "_" + partition + "_" + i
       // Get a scala iterator from java map
       val iter: Iterator[(Any, Any)] = bucketIterators(i)
       // TODO: This should probably be DISK_ONLY
diff --git a/core/src/main/scala/spark/storage/BlockManager.scala b/core/src/main/scala/spark/storage/BlockManager.scala
index a66f812662e92d537ba948d00afbeb5fdc9ea1be..081981c838ef42307d46d1556b3e708a75e5dcf4 100644
--- a/core/src/main/scala/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/spark/storage/BlockManager.scala
@@ -606,7 +606,7 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
 
   // TODO: This code will be removed when CacheTracker is gone.
   private def notifyTheCacheTracker(key: String) {
-    val rddInfo = key.split(":")
+    val rddInfo = key.split("_")
     val rddId: Int = rddInfo(1).toInt
     val splitIndex: Int = rddInfo(2).toInt
     val host = System.getProperty("spark.hostname", Utils.localHostName())
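
For context: before this patch, RDD cache blocks used colon-delimited keys ("rdd:%d:%d") while shuffle blocks used underscores ("shuffleid_%d_%d_%d"), and BlockManager.notifyTheCacheTracker split on ":". The patch puts every block ID on one underscore-delimited scheme and updates the split accordingly. Below is a minimal sketch of how the new keys compose and parse; it is not part of the patch, and BlockIdKeys and parseRddKey are hypothetical names used only for illustration.

    // Sketch only -- not part of the patch. BlockIdKeys and parseRddKey are
    // hypothetical names illustrating the unified key scheme above.
    object BlockIdKeys {
      // RDD cache block: "rdd_<rddId>_<splitIndex>" (was "rdd:<rddId>:<splitIndex>")
      def rddKey(rddId: Int, splitIndex: Int): String =
        "rdd_%d_%d".format(rddId, splitIndex)

      // Shuffle output block: "shuffle_<shuffleId>_<mapId>_<reduceId>" (was "shuffleid_...")
      def shuffleKey(shuffleId: Int, mapId: Int, reduceId: Int): String =
        "shuffle_%d_%d_%d".format(shuffleId, mapId, reduceId)

      // Mirrors what notifyTheCacheTracker now does: with one delimiter for
      // all block IDs, split("_") recovers the numeric fields.
      def parseRddKey(key: String): (Int, Int) = {
        val parts = key.split("_")        // e.g. Array("rdd", "3", "7")
        (parts(1).toInt, parts(2).toInt)  // (rddId, splitIndex)
      }
    }

With this scheme, BlockIdKeys.parseRddKey(BlockIdKeys.rddKey(3, 7)) yields (3, 7). Under the old colon-delimited format, "rdd:3:7".split("_") returns a single-element array, so indexing element 1 would throw; unifying on "_" is what lets notifyTheCacheTracker's split work for the renamed keys.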