diff --git a/src/scala/spark/HdfsFile.scala b/src/scala/spark/HdfsFile.scala
index 886272a8edd14131f096b3ef2a233a901876db1c..8637c6e30aa47bf08b441bc5fe6fad2214e1cfc1 100644
--- a/src/scala/spark/HdfsFile.scala
+++ b/src/scala/spark/HdfsFile.scala
@@ -14,7 +14,9 @@ import org.apache.hadoop.mapred.Reporter
 @serializable class HdfsSplit(@transient s: InputSplit)
 extends Split {
   val inputSplit = new SerializableWritable[InputSplit](s)
-  override def toString = inputSplit.toString
+
+  override def getId() = inputSplit.toString // Hadoop makes this unique
+                                             // for each split of each file
 }
 
 class HdfsTextFile(sc: SparkContext, path: String)
diff --git a/src/scala/spark/ParallelArray.scala b/src/scala/spark/ParallelArray.scala
index 12fbfaf4c220091159be3f184fb51bfb5b1b8c93..a01904d61c5d6ebb927f4e530a7a958e348c0f7f 100644
--- a/src/scala/spark/ParallelArray.scala
+++ b/src/scala/spark/ParallelArray.scala
@@ -17,7 +17,7 @@ extends Split {
     case _ => false
   }
 
-  override def toString() =
+  override def getId() =
     "ParallelArraySplit(arrayId %d, slice %d)".format(arrayId, slice)
 }
 
diff --git a/src/scala/spark/RDD.scala b/src/scala/spark/RDD.scala
index 5236eb958ffb4ae9023deebef784a3ab7275b37c..803c0638653c0aba4677e2f68e28883a6f72b053 100644
--- a/src/scala/spark/RDD.scala
+++ b/src/scala/spark/RDD.scala
@@ -166,8 +166,8 @@ extends RDD[Array[T]](prev.sparkContext) {
 
 
 @serializable class SeededSplit(val prev: Split, val seed: Int) extends Split {
-  override def toString() =
-    "SeededSplit(" + prev.toString + ", seed " + seed + ")"
+  override def getId() =
+    "SeededSplit(" + prev.getId() + ", seed " + seed + ")"
 }
 
 class SampledRDD[T: ClassManifest](
@@ -216,7 +216,7 @@ extends RDD[T](prev.sparkContext) with Logging {
   }
 
   override def iterator(split: Split): Iterator[T] = {
-    val key = id + "::" + split.toString
+    val key = id + "::" + split.getId()
     logInfo("CachedRDD split key is " + key)
     val cache = CachedRDD.cache
     val loading = CachedRDD.loading
@@ -271,7 +271,7 @@ private object CachedRDD {
 abstract class UnionSplit[T: ClassManifest] extends Split {
   def iterator(): Iterator[T]
   def preferredLocations(): Seq[String]
-  def toString(): String
+  def getId(): String
 }
 
 @serializable
@@ -280,8 +280,8 @@ class UnionSplitImpl[T: ClassManifest](
 extends UnionSplit[T] {
   override def iterator() = rdd.iterator(split)
   override def preferredLocations() = rdd.preferredLocations(split)
-  override def toString() =
-    "UnionSplitImpl(" + split.toString + ")"
+  override def getId() =
+    "UnionSplitImpl(" + split.getId() + ")"
 }
 
 @serializable
@@ -304,8 +304,8 @@ extends RDD[T](sc) {
 }
 
 @serializable class CartesianSplit(val s1: Split, val s2: Split) extends Split {
-  override def toString() =
-    "CartesianSplit(" + s1.toString + ", " + s2.toString + ")"
+  override def getId() =
+    "CartesianSplit(" + s1.getId() + ", " + s2.getId() + ")"
 }
 
 @serializable
diff --git a/src/scala/spark/Split.scala b/src/scala/spark/Split.scala
index 4251191814cfa903e7d71a984b486cf46d547ae4..0f7a21354ddd152ed618269d77619ce5bcdf081a 100644
--- a/src/scala/spark/Split.scala
+++ b/src/scala/spark/Split.scala
@@ -1,3 +1,13 @@
 package spark
 
-abstract class Split {}
+/**
+ * A partition of an RDD.
+ */
+trait Split {
+  /**
+   * Get a unique ID for this split which can be used, for example, to
+   * set up caches based on it. The ID should stay the same if we serialize
+   * and then deserialize the split.
+   */
+  def getId(): String
+}
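Illustration (not part of the patch above): under the new Split trait, a concrete split only has to supply a stable, unique getId() string, which callers such as CachedRDD prefix with the RDD's id to build cache keys. The sketch below uses hypothetical names (ExampleSplit, rddId, index) purely to show the contract.

    package spark

    // Hypothetical split type, shown only to illustrate the getId() contract.
    @serializable
    class ExampleSplit(val rddId: Long, val index: Int) extends Split {
      // Must be unique across splits and identical before and after
      // serialization, since callers such as CachedRDD build keys of the
      // form id + "::" + split.getId().
      override def getId(): String =
        "ExampleSplit(rdd %d, slice %d)".format(rddId, index)
    }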