Commit 630a982b authored by Matei Zaharia

Added a getId method to Split to force classes to specify a unique ID

for each split. This replaces the previous method of calling
split.toString, which would produce different results for the same split
each time it is deserialized (because the default implementation returns
the Java object's address).
parent f9671b08
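
For context, the instability described above is easy to reproduce: java.lang.Object.toString returns the class name plus the identity hash code, which is not preserved by serialization. The sketch below (with a made-up UnstableSplit class, not code from this commit) round-trips a split through Java serialization and prints two different strings for the same logical split.

    import java.io._

    // Hypothetical split relying on the default Object.toString
    // (illustration only, not part of this commit).
    class UnstableSplit(val index: Int) extends Serializable

    object ToStringDemo {
      // Round-trip an object through Java serialization, as happens when
      // a split is shipped to a worker.
      def roundTrip[T <: Serializable](obj: T): T = {
        val buf = new ByteArrayOutputStream()
        val out = new ObjectOutputStream(buf)
        out.writeObject(obj)
        out.close()
        new ObjectInputStream(new ByteArrayInputStream(buf.toByteArray))
          .readObject().asInstanceOf[T]
      }

      def main(args: Array[String]): Unit = {
        val split = new UnstableSplit(0)
        val copy = roundTrip(split)
        // Default toString is "ClassName@identityHashCode", so these two
        // lines print different values for the same logical split:
        println(split.toString) // e.g. UnstableSplit@1b6d3586
        println(copy.toString)  // e.g. UnstableSplit@4554617c
      }
    }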
@@ -14,7 +14,9 @@ import org.apache.hadoop.mapred.Reporter
 @serializable class HdfsSplit(@transient s: InputSplit)
 extends Split {
   val inputSplit = new SerializableWritable[InputSplit](s)
-  override def toString = inputSplit.toString
+  override def getId() = inputSplit.toString // Hadoop makes this unique
+                                             // for each split of each file
 }

 class HdfsTextFile(sc: SparkContext, path: String)
...
@@ -17,7 +17,7 @@ extends Split {
     case _ => false
   }

-  override def toString() =
+  override def getId() =
     "ParallelArraySplit(arrayId %d, slice %d)".format(arrayId, slice)
 }
...
@@ -166,8 +166,8 @@ extends RDD[Array[T]](prev.sparkContext) {
 @serializable class SeededSplit(val prev: Split, val seed: Int) extends Split {
-  override def toString() =
-    "SeededSplit(" + prev.toString + ", seed " + seed + ")"
+  override def getId() =
+    "SeededSplit(" + prev.getId() + ", seed " + seed + ")"
 }

 class SampledRDD[T: ClassManifest](
@@ -216,7 +216,7 @@ extends RDD[T](prev.sparkContext) with Logging {
   }

   override def iterator(split: Split): Iterator[T] = {
-    val key = id + "::" + split.toString
+    val key = id + "::" + split.getId()
     logInfo("CachedRDD split key is " + key)
     val cache = CachedRDD.cache
     val loading = CachedRDD.loading
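
The hunk above is the motivating use case: the cached RDD keys its cache on the split's identifier, so the key must come out identical on every machine that deserializes the split. A minimal sketch of that pattern follows; the cache map and the compute argument are simplified stand-ins rather than the real CachedRDD internals, and it assumes the Split trait introduced at the bottom of this commit.

    import scala.collection.mutable

    object CacheKeyDemo {
      private val cache = mutable.Map[String, AnyRef]()

      // Look up a split's cached result, computing it on a miss. Because
      // getId() is stable across (de)serialization, a task running on any
      // JVM derives the same key for the same logical split.
      def cachedCompute(rddId: Int, split: Split, compute: => AnyRef): AnyRef = {
        val key = rddId + "::" + split.getId()
        cache.synchronized {
          cache.getOrElseUpdate(key, compute)
        }
      }
    }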
@@ -271,7 +271,7 @@ private object CachedRDD {
 abstract class UnionSplit[T: ClassManifest] extends Split {
   def iterator(): Iterator[T]
   def preferredLocations(): Seq[String]
-  def toString(): String
+  def getId(): String
 }

 @serializable
@@ -280,8 +280,8 @@ class UnionSplitImpl[T: ClassManifest](
 extends UnionSplit[T] {
   override def iterator() = rdd.iterator(split)
   override def preferredLocations() = rdd.preferredLocations(split)
-  override def toString() =
-    "UnionSplitImpl(" + split.toString + ")"
+  override def getId() =
+    "UnionSplitImpl(" + split.getId() + ")"
 }

 @serializable
@@ -304,8 +304,8 @@ extends RDD[T](sc) {
 }

 @serializable class CartesianSplit(val s1: Split, val s2: Split) extends Split {
-  override def toString() =
-    "CartesianSplit(" + s1.toString + ", " + s2.toString + ")"
+  override def getId() =
+    "CartesianSplit(" + s1.getId() + ", " + s2.getId() + ")"
 }

 @serializable
...
 package spark

-abstract class Split {}
+/**
+ * A partition of an RDD.
+ */
+trait Split {
+  /**
+   * Get a unique ID for this split which can be used, for example, to
+   * set up caches based on it. The ID should stay the same if we serialize
+   * and then deserialize the split.
+   */
+  def getId(): String
+}
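
A hypothetical implementation of the new trait (not part of this commit) shows the intended contract: build the ID only from stable, serialized fields, never from object identity. The name RangeSplit and its fields are invented for illustration, following the "%d"-format style the real splits above use.

    // Illustrative only: a split over one slice of some RDD, identified by
    // fields that survive serialization.
    @serializable class RangeSplit(val rddId: Int, val slice: Int) extends Split {
      override def getId() =
        "RangeSplit(rdd %d, slice %d)".format(rddId, slice)
    }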