From 53f90d0f0ea8ae605e95ece9850df6004123c787 Mon Sep 17 00:00:00 2001
From: Matei Zaharia <matei@eecs.berkeley.edu>
Date: Mon, 1 Oct 2012 10:48:53 -0700
Subject: [PATCH] Use underscores instead of colons in RDD IDs

---
 core/src/main/scala/spark/BlockStoreShuffleFetcher.scala | 4 ++--
 core/src/main/scala/spark/CacheTracker.scala             | 2 +-
 core/src/main/scala/spark/RDD.scala                      | 2 +-
 core/src/main/scala/spark/scheduler/ShuffleMapTask.scala | 2 +-
 core/src/main/scala/spark/storage/BlockManager.scala     | 2 +-
 5 files changed, 6 insertions(+), 6 deletions(-)

diff --git a/core/src/main/scala/spark/BlockStoreShuffleFetcher.scala b/core/src/main/scala/spark/BlockStoreShuffleFetcher.scala
index 0bbdb4e432..9c42e88b68 100644
--- a/core/src/main/scala/spark/BlockStoreShuffleFetcher.scala
+++ b/core/src/main/scala/spark/BlockStoreShuffleFetcher.scala
@@ -29,7 +29,7 @@ class BlockStoreShuffleFetcher extends ShuffleFetcher with Logging {
 
     val blocksByAddress: Seq[(BlockManagerId, Seq[String])] = splitsByAddress.toSeq.map {
       case (address, splits) =>
-        (address, splits.map(i => "shuffleid_%d_%d_%d".format(shuffleId, i, reduceId)))
+        (address, splits.map(i => "shuffle_%d_%d_%d".format(shuffleId, i, reduceId)))
     }
 
     for ((blockId, blockOption) <- blockManager.getMultiple(blocksByAddress)) {
@@ -42,7 +42,7 @@ class BlockStoreShuffleFetcher extends ShuffleFetcher with Logging {
           }
         }
         case None => {
-          val regex = "shuffleid_([0-9]*)_([0-9]*)_([0-9]*)".r
+          val regex = "shuffle_([0-9]*)_([0-9]*)_([0-9]*)".r
           blockId match {
             case regex(shufId, mapId, reduceId) =>
               val addr = addresses(mapId.toInt)
diff --git a/core/src/main/scala/spark/CacheTracker.scala b/core/src/main/scala/spark/CacheTracker.scala
index 225a5ad403..d9e0ef90b8 100644
--- a/core/src/main/scala/spark/CacheTracker.scala
+++ b/core/src/main/scala/spark/CacheTracker.scala
@@ -167,7 +167,7 @@ class CacheTracker(actorSystem: ActorSystem, isMaster: Boolean, blockManager: Bl
 
   // Gets or computes an RDD split
   def getOrCompute[T](rdd: RDD[T], split: Split, storageLevel: StorageLevel): Iterator[T] = {
-    val key = "rdd:%d:%d".format(rdd.id, split.index)
+    val key = "rdd_%d_%d".format(rdd.id, split.index)
     logInfo("Cache key is " + key)
     blockManager.get(key) match {
       case Some(cachedValues) =>
diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala
index 784f25086e..ab8014c056 100644
--- a/core/src/main/scala/spark/RDD.scala
+++ b/core/src/main/scala/spark/RDD.scala
@@ -106,7 +106,7 @@ abstract class RDD[T: ClassManifest](@transient sc: SparkContext) extends Serial
 
     // This is a hack. Ideally this should re-use the code used by the CacheTracker
     // to generate the key.
-    def getSplitKey(split: Split) = "rdd:%d:%d".format(this.id, split.index)
+    def getSplitKey(split: Split) = "rdd_%d_%d".format(this.id, split.index)
 
     persist(level)
     sc.runJob(this, (iter: Iterator[T]) => {} )
diff --git a/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala b/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala
index d70a061366..27d97ffee5 100644
--- a/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala
+++ b/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala
@@ -144,7 +144,7 @@ class ShuffleMapTask(
     val ser = SparkEnv.get.serializer.newInstance()
     val blockManager = SparkEnv.get.blockManager
     for (i <- 0 until numOutputSplits) {
-      val blockId = "shuffleid_" + dep.shuffleId + "_" + partition + "_" + i
+      val blockId = "shuffle_" + dep.shuffleId + "_" + partition + "_" + i
       // Get a scala iterator from java map
       val iter: Iterator[(Any, Any)] = bucketIterators(i)
       // TODO: This should probably be DISK_ONLY
diff --git a/core/src/main/scala/spark/storage/BlockManager.scala b/core/src/main/scala/spark/storage/BlockManager.scala
index a66f812662..081981c838 100644
--- a/core/src/main/scala/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/spark/storage/BlockManager.scala
@@ -606,7 +606,7 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
 
   // TODO: This code will be removed when CacheTracker is gone.
   private def notifyTheCacheTracker(key: String) {
-    val rddInfo = key.split(":")
+    val rddInfo = key.split("_")
     val rddId: Int = rddInfo(1).toInt
     val splitIndex: Int = rddInfo(2).toInt
     val host = System.getProperty("spark.hostname", Utils.localHostName())
-- 
GitLab
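
Note (not part of the patch): a minimal, runnable Scala sketch of the two underscore-delimited ID formats this change standardizes on. The object name and sample values below are invented for illustration; the format strings, the regex, and the split("_") parsing are the ones that appear in the diff above.

object BlockIdExample {
  def main(args: Array[String]): Unit = {
    // RDD cache keys take the form "rdd_<rddId>_<splitIndex>" ...
    val key = "rdd_%d_%d".format(12, 3)   // "rdd_12_3"
    // ... so notifyTheCacheTracker can recover both fields with split("_"):
    // index 0 is the "rdd" prefix, 1 the RDD ID, 2 the split index.
    val rddInfo = key.split("_")
    val rddId = rddInfo(1).toInt          // 12
    val splitIndex = rddInfo(2).toInt     // 3
    println((rddId, splitIndex))          // prints (12,3)

    // Shuffle block IDs take the form "shuffle_<shuffleId>_<mapId>_<reduceId>",
    // matched by the same regex the shuffle fetcher uses on failed fetches.
    val regex = "shuffle_([0-9]*)_([0-9]*)_([0-9]*)".r
    "shuffle_5_0_7" match {
      case regex(shufId, mapId, reduceId) =>
        println((shufId.toInt, mapId.toInt, reduceId.toInt)) // prints (5,0,7)
    }
  }
}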