From 630a982b88a3bac3bd362d52d64331b1495f31ff Mon Sep 17 00:00:00 2001
From: Matei Zaharia <matei@eecs.berkeley.edu>
Date: Thu, 7 Oct 2010 17:17:07 -0700
Subject: [PATCH] Added a getId method to split to force classes to specify a
 unique ID for each split. This replaces the previous method of calling
 split.toString, which would produce different results for the same split
 each time it is deserialized (because the default implementation returns
 the Java object's address).

---
 src/scala/spark/HdfsFile.scala      |  4 +++-
 src/scala/spark/ParallelArray.scala |  2 +-
 src/scala/spark/RDD.scala           | 16 ++++++++--------
 src/scala/spark/Split.scala         | 12 +++++++++++-
 4 files changed, 23 insertions(+), 11 deletions(-)

diff --git a/src/scala/spark/HdfsFile.scala b/src/scala/spark/HdfsFile.scala
index 886272a8ed..8637c6e30a 100644
--- a/src/scala/spark/HdfsFile.scala
+++ b/src/scala/spark/HdfsFile.scala
@@ -14,7 +14,9 @@ import org.apache.hadoop.mapred.Reporter
 @serializable class HdfsSplit(@transient s: InputSplit)
 extends Split {
   val inputSplit = new SerializableWritable[InputSplit](s)
-  override def toString = inputSplit.toString
+
+  override def getId() = inputSplit.toString // Hadoop makes this unique
+                                             // for each split of each file
 }
 
 class HdfsTextFile(sc: SparkContext, path: String)
diff --git a/src/scala/spark/ParallelArray.scala b/src/scala/spark/ParallelArray.scala
index 12fbfaf4c2..a01904d61c 100644
--- a/src/scala/spark/ParallelArray.scala
+++ b/src/scala/spark/ParallelArray.scala
@@ -17,7 +17,7 @@ extends Split {
     case _ => false
   }
 
-  override def toString() =
+  override def getId() =
     "ParallelArraySplit(arrayId %d, slice %d)".format(arrayId, slice)
 }
 
diff --git a/src/scala/spark/RDD.scala b/src/scala/spark/RDD.scala
index 5236eb958f..803c063865 100644
--- a/src/scala/spark/RDD.scala
+++ b/src/scala/spark/RDD.scala
@@ -166,8 +166,8 @@ extends RDD[Array[T]](prev.sparkContext) {
 
 @serializable
 class SeededSplit(val prev: Split, val seed: Int) extends Split {
-  override def toString() =
-    "SeededSplit(" + prev.toString + ", seed " + seed + ")"
+  override def getId() =
+    "SeededSplit(" + prev.getId() + ", seed " + seed + ")"
 }
 
 class SampledRDD[T: ClassManifest](
@@ -216,7 +216,7 @@ extends RDD[T](prev.sparkContext) with Logging {
   }
 
   override def iterator(split: Split): Iterator[T] = {
-    val key = id + "::" + split.toString
+    val key = id + "::" + split.getId()
     logInfo("CachedRDD split key is " + key)
     val cache = CachedRDD.cache
     val loading = CachedRDD.loading
@@ -271,7 +271,7 @@ private object CachedRDD {
 abstract class UnionSplit[T: ClassManifest] extends Split {
   def iterator(): Iterator[T]
   def preferredLocations(): Seq[String]
-  def toString(): String
+  def getId(): String
 }
 
 @serializable
@@ -280,8 +280,8 @@ class UnionSplitImpl[T: ClassManifest](
 extends UnionSplit[T] {
   override def iterator() = rdd.iterator(split)
   override def preferredLocations() = rdd.preferredLocations(split)
-  override def toString() =
-    "UnionSplitImpl(" + split.toString + ")"
+  override def getId() =
+    "UnionSplitImpl(" + split.getId() + ")"
 }
 
 @serializable
@@ -304,8 +304,8 @@ extends RDD[T](sc) {
 }
 
 @serializable class CartesianSplit(val s1: Split, val s2: Split) extends Split {
-  override def toString() =
-    "CartesianSplit(" + s1.toString + ", " + s2.toString + ")"
+  override def getId() =
+    "CartesianSplit(" + s1.getId() + ", " + s2.getId() + ")"
 }
 
 @serializable
diff --git a/src/scala/spark/Split.scala b/src/scala/spark/Split.scala
index 4251191814..0f7a21354d 100644
--- a/src/scala/spark/Split.scala
+++ b/src/scala/spark/Split.scala
@@ -1,3 +1,13 @@
 package spark
 
-abstract class Split {}
+/**
+ * A partition of an RDD.
+ */
+trait Split {
+  /**
+   * Get a unique ID for this split which can be used, for example, to
+   * set up caches based on it. The ID should stay the same if we serialize
+   * and then deserialize the split.
+   */
+  def getId(): String
+}
-- 
GitLab
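
For context on why this change was needed: java.lang.Object.toString embeds the object's identity hash code, so a Split that never overrides it produces a different string each time it is deserialized into a fresh object, which breaks anything (such as CachedRDD's cache keys above) that uses the string as a stable identifier. The standalone sketch below is not part of the patch, and the class and object names are made up for illustration; it round-trips a split-like object through Java serialization, the same mechanism used to ship splits to workers, and contrasts the unstable default toString with a stable getId:

import java.io._

// Relies on the default Object.toString, which embeds the identity
// hash code of the in-memory object.
class AddressNamedSplit(val slice: Int) extends java.io.Serializable

// Follows the patch's convention: an explicit, content-based ID that
// is independent of object identity.
class IdNamedSplit(val slice: Int) extends java.io.Serializable {
  def getId(): String = "IdNamedSplit(slice %d)".format(slice)
}

object SplitIdDemo {
  // Round-trip an object through Java serialization, as happens when a
  // split is sent to a worker and read back.
  def roundTrip[T](obj: T): T = {
    val buf = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buf)
    out.writeObject(obj)
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(buf.toByteArray))
    in.readObject().asInstanceOf[T]
  }

  def main(args: Array[String]): Unit = {
    val original = new AddressNamedSplit(0)
    val copy = roundTrip(original)
    // Almost always prints two different strings, e.g.
    // "AddressNamedSplit@6f79caec vs AddressNamedSplit@421faab1",
    // so the default toString cannot serve as a cache key.
    println(original.toString + " vs " + copy.toString)

    val stable = new IdNamedSplit(0)
    // Prints "true": the explicit ID survives the round trip unchanged.
    println(stable.getId() == roundTrip(stable).getId())
  }
}

This is also why the CachedRDD key changes from id + "::" + split.toString to id + "::" + split.getId(): once getId is required by the Split trait, every concrete split must supply an identifier that stays the same across serialization instead of silently inheriting the identity-based default.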