From 2a18cd826c42d7c6b35eaedde1e4c423b6a1b1e5 Mon Sep 17 00:00:00 2001
From: Stephen Haberman <stephen@exigencecorp.com>
Date: Sat, 9 Feb 2013 10:12:04 -0600
Subject: [PATCH] Add back return types.

---
 core/src/main/scala/spark/rdd/BlockRDD.scala            |  4 ++--
 core/src/main/scala/spark/rdd/CartesianRDD.scala        |  4 ++--
 core/src/main/scala/spark/rdd/CheckpointRDD.scala       |  2 +-
 core/src/main/scala/spark/rdd/CoGroupedRDD.scala        |  4 ++--
 core/src/main/scala/spark/rdd/CoalescedRDD.scala        | 12 +++++++-----
 core/src/main/scala/spark/rdd/FilteredRDD.scala         |  2 +-
 core/src/main/scala/spark/rdd/FlatMappedRDD.scala       |  2 +-
 core/src/main/scala/spark/rdd/GlommedRDD.scala          |  2 +-
 core/src/main/scala/spark/rdd/HadoopRDD.scala           |  4 ++--
 core/src/main/scala/spark/rdd/MapPartitionsRDD.scala    |  2 +-
 .../scala/spark/rdd/MapPartitionsWithSplitRDD.scala     |  2 +-
 core/src/main/scala/spark/rdd/MappedRDD.scala           |  2 +-
 core/src/main/scala/spark/rdd/NewHadoopRDD.scala        |  4 ++--
 .../main/scala/spark/rdd/PartitionPruningRDD.scala      |  2 +-
 core/src/main/scala/spark/rdd/PipedRDD.scala            |  2 +-
 core/src/main/scala/spark/rdd/SampledRDD.scala          |  6 +++---
 core/src/main/scala/spark/rdd/ShuffledRDD.scala         |  4 +++-
 core/src/main/scala/spark/rdd/UnionRDD.scala            |  4 ++--
 core/src/main/scala/spark/rdd/ZippedRDD.scala           |  2 +-
 19 files changed, 35 insertions(+), 31 deletions(-)

diff --git a/core/src/main/scala/spark/rdd/BlockRDD.scala b/core/src/main/scala/spark/rdd/BlockRDD.scala
index 4214817c65..17989c5ce5 100644
--- a/core/src/main/scala/spark/rdd/BlockRDD.scala
+++ b/core/src/main/scala/spark/rdd/BlockRDD.scala
@@ -18,7 +18,7 @@ class BlockRDD[T: ClassManifest](sc: SparkContext, @transient blockIds: Array[St
     HashMap(blockIds.zip(locations):_*)
   }
 
-  override def getSplits = (0 until blockIds.size).map(i => {
+  override def getSplits: Array[Split] = (0 until blockIds.size).map(i => {
     new BlockRDDSplit(blockIds(i), i).asInstanceOf[Split]
   }).toArray
 
@@ -33,7 +33,7 @@ class BlockRDD[T: ClassManifest](sc: SparkContext, @transient blockIds: Array[St
     }
   }
 
-  override def getPreferredLocations(split: Split) =
+  override def getPreferredLocations(split: Split): Seq[String] =
     locations_(split.asInstanceOf[BlockRDDSplit].blockId)
 
 }
diff --git a/core/src/main/scala/spark/rdd/CartesianRDD.scala b/core/src/main/scala/spark/rdd/CartesianRDD.scala
index 2f572a1941..41cbbd0093 100644
--- a/core/src/main/scala/spark/rdd/CartesianRDD.scala
+++ b/core/src/main/scala/spark/rdd/CartesianRDD.scala
@@ -35,7 +35,7 @@ class CartesianRDD[T: ClassManifest, U:ClassManifest](
 
   val numSplitsInRdd2 = rdd2.splits.size
 
-  override def getSplits = {
+  override def getSplits: Array[Split] = {
     // create the cross product split
     val array = new Array[Split](rdd1.splits.size * rdd2.splits.size)
     for (s1 <- rdd1.splits; s2 <- rdd2.splits) {
@@ -45,7 +45,7 @@ class CartesianRDD[T: ClassManifest, U:ClassManifest](
     array
   }
 
-  override def getPreferredLocations(split: Split) = {
+  override def getPreferredLocations(split: Split): Seq[String] = {
     val currSplit = split.asInstanceOf[CartesianSplit]
     rdd1.preferredLocations(currSplit.s1) ++ rdd2.preferredLocations(currSplit.s2)
   }
diff --git a/core/src/main/scala/spark/rdd/CheckpointRDD.scala b/core/src/main/scala/spark/rdd/CheckpointRDD.scala
index 7cde523f11..3558d4673f 100644
--- a/core/src/main/scala/spark/rdd/CheckpointRDD.scala
+++ b/core/src/main/scala/spark/rdd/CheckpointRDD.scala
@@ -20,7 +20,7 @@ class CheckpointRDD[T: ClassManifest](sc: SparkContext, val checkpointPath: Stri
 
   @transient val fs = new Path(checkpointPath).getFileSystem(sc.hadoopConfiguration)
 
-  override def getSplits = {
+  override def getSplits: Array[Split] = {
     val dirContents = fs.listStatus(new Path(checkpointPath))
     val splitFiles = dirContents.map(_.getPath.toString).filter(_.contains("part-")).sorted
     val numSplits = splitFiles.size
diff --git a/core/src/main/scala/spark/rdd/CoGroupedRDD.scala b/core/src/main/scala/spark/rdd/CoGroupedRDD.scala
index d31ce13706..0a1e2cbee0 100644
--- a/core/src/main/scala/spark/rdd/CoGroupedRDD.scala
+++ b/core/src/main/scala/spark/rdd/CoGroupedRDD.scala
@@ -45,7 +45,7 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[(_, _)]], part: Partitioner)
 
   private val aggr = new CoGroupAggregator
 
-  override def getDependencies = {
+  override def getDependencies: Seq[Dependency[_]] = {
     rdds.map { rdd =>
       if (rdd.partitioner == Some(part)) {
         logInfo("Adding one-to-one dependency with " + rdd)
@@ -58,7 +58,7 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[(_, _)]], part: Partitioner)
     }
   }
 
-  override def getSplits = {
+  override def getSplits: Array[Split] = {
     val array = new Array[Split](part.numPartitions)
     for (i <- 0 until array.size) {
       // Each CoGroupSplit will have a dependency per contributing RDD
diff --git a/core/src/main/scala/spark/rdd/CoalescedRDD.scala b/core/src/main/scala/spark/rdd/CoalescedRDD.scala
index a1aa7a30b0..fcd26da43a 100644
--- a/core/src/main/scala/spark/rdd/CoalescedRDD.scala
+++ b/core/src/main/scala/spark/rdd/CoalescedRDD.scala
@@ -31,7 +31,7 @@ class CoalescedRDD[T: ClassManifest](
     maxPartitions: Int)
   extends RDD[T](prev.context, Nil) {  // Nil since we implement getDependencies
 
-  override def getSplits = {
+  override def getSplits: Array[Split] = {
     val prevSplits = prev.splits
     if (prevSplits.length < maxPartitions) {
       prevSplits.map(_.index).map{idx => new CoalescedRDDSplit(idx, prev, Array(idx)) }
@@ -50,10 +50,12 @@ class CoalescedRDD[T: ClassManifest](
     }
   }
 
-  override def getDependencies = Seq(new NarrowDependency(prev) {
-    def getParents(id: Int): Seq[Int] =
-      splits(id).asInstanceOf[CoalescedRDDSplit].parentsIndices
-  })
+  override def getDependencies: Seq[Dependency[_]] = {
+    Seq(new NarrowDependency(prev) {
+      def getParents(id: Int): Seq[Int] =
+        splits(id).asInstanceOf[CoalescedRDDSplit].parentsIndices
+    })
+  }
 
   override def clearDependencies() {
     super.clearDependencies()
diff --git a/core/src/main/scala/spark/rdd/FilteredRDD.scala b/core/src/main/scala/spark/rdd/FilteredRDD.scala
index 6dbe235bd9..93e398ea2b 100644
--- a/core/src/main/scala/spark/rdd/FilteredRDD.scala
+++ b/core/src/main/scala/spark/rdd/FilteredRDD.scala
@@ -7,7 +7,7 @@ private[spark] class FilteredRDD[T: ClassManifest](
     f: T => Boolean)
   extends RDD[T](prev) {
 
-  override def getSplits = firstParent[T].splits
+  override def getSplits: Array[Split] = firstParent[T].splits
 
   override val partitioner = prev.partitioner    // Since filter cannot change a partition's keys
 
diff --git a/core/src/main/scala/spark/rdd/FlatMappedRDD.scala b/core/src/main/scala/spark/rdd/FlatMappedRDD.scala
index 1b604c66e2..8c2a610593 100644
--- a/core/src/main/scala/spark/rdd/FlatMappedRDD.scala
+++ b/core/src/main/scala/spark/rdd/FlatMappedRDD.scala
@@ -9,7 +9,7 @@ class FlatMappedRDD[U: ClassManifest, T: ClassManifest](
     f: T => TraversableOnce[U])
   extends RDD[U](prev) {
 
-  override def getSplits = firstParent[T].splits
+  override def getSplits: Array[Split] = firstParent[T].splits
 
   override def compute(split: Split, context: TaskContext) =
     firstParent[T].iterator(split, context).flatMap(f)
diff --git a/core/src/main/scala/spark/rdd/GlommedRDD.scala b/core/src/main/scala/spark/rdd/GlommedRDD.scala
index 051bffed19..70b9b4e34e 100644
--- a/core/src/main/scala/spark/rdd/GlommedRDD.scala
+++ b/core/src/main/scala/spark/rdd/GlommedRDD.scala
@@ -5,7 +5,7 @@ import spark.{RDD, Split, TaskContext}
 
 private[spark]
 class GlommedRDD[T: ClassManifest](prev: RDD[T]) extends RDD[Array[T]](prev) {
-  override def getSplits = firstParent[T].splits
+  override def getSplits: Array[Split] = firstParent[T].splits
 
   override def compute(split: Split, context: TaskContext) =
     Array(firstParent[T].iterator(split, context).toArray).iterator
diff --git a/core/src/main/scala/spark/rdd/HadoopRDD.scala b/core/src/main/scala/spark/rdd/HadoopRDD.scala
index cd948de967..854993737b 100644
--- a/core/src/main/scala/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/spark/rdd/HadoopRDD.scala
@@ -47,7 +47,7 @@ class HadoopRDD[K, V](
   // A Hadoop JobConf can be about 10 KB, which is pretty big, so broadcast it
   private val confBroadcast = sc.broadcast(new SerializableWritable(conf))
 
-  override def getSplits = {
+  override def getSplits: Array[Split] = {
     val inputFormat = createInputFormat(conf)
     val inputSplits = inputFormat.getSplits(conf, minSplits)
     val array = new Array[Split](inputSplits.size)
@@ -106,7 +106,7 @@ class HadoopRDD[K, V](
     }
   }
 
-  override def getPreferredLocations(split: Split) = {
+  override def getPreferredLocations(split: Split): Seq[String] = {
     // TODO: Filtering out "localhost" in case of file:// URLs
     val hadoopSplit = split.asInstanceOf[HadoopSplit]
     hadoopSplit.inputSplit.value.getLocations.filter(_ != "localhost")
diff --git a/core/src/main/scala/spark/rdd/MapPartitionsRDD.scala b/core/src/main/scala/spark/rdd/MapPartitionsRDD.scala
index 073f7d7d2a..7b0b4525c7 100644
--- a/core/src/main/scala/spark/rdd/MapPartitionsRDD.scala
+++ b/core/src/main/scala/spark/rdd/MapPartitionsRDD.scala
@@ -13,7 +13,7 @@ class MapPartitionsRDD[U: ClassManifest, T: ClassManifest](
   override val partitioner =
     if (preservesPartitioning) firstParent[T].partitioner else None
 
-  override def getSplits = firstParent[T].splits
+  override def getSplits: Array[Split] = firstParent[T].splits
 
   override def compute(split: Split, context: TaskContext) =
     f(firstParent[T].iterator(split, context))
diff --git a/core/src/main/scala/spark/rdd/MapPartitionsWithSplitRDD.scala b/core/src/main/scala/spark/rdd/MapPartitionsWithSplitRDD.scala
index 2ddc3d01b6..c6dc1080a9 100644
--- a/core/src/main/scala/spark/rdd/MapPartitionsWithSplitRDD.scala
+++ b/core/src/main/scala/spark/rdd/MapPartitionsWithSplitRDD.scala
@@ -15,7 +15,7 @@ class MapPartitionsWithSplitRDD[U: ClassManifest, T: ClassManifest](
     preservesPartitioning: Boolean
   ) extends RDD[U](prev) {
 
-  override def getSplits = firstParent[T].splits
+  override def getSplits: Array[Split] = firstParent[T].splits
 
   override val partitioner = if (preservesPartitioning) prev.partitioner else None
 
diff --git a/core/src/main/scala/spark/rdd/MappedRDD.scala b/core/src/main/scala/spark/rdd/MappedRDD.scala
index 5466c9c657..6074f411e3 100644
--- a/core/src/main/scala/spark/rdd/MappedRDD.scala
+++ b/core/src/main/scala/spark/rdd/MappedRDD.scala
@@ -6,7 +6,7 @@ private[spark]
 class MappedRDD[U: ClassManifest, T: ClassManifest](prev: RDD[T], f: T => U)
   extends RDD[U](prev) {
 
-  override def getSplits = firstParent[T].splits
+  override def getSplits: Array[Split] = firstParent[T].splits
 
   override def compute(split: Split, context: TaskContext) =
     firstParent[T].iterator(split, context).map(f)
diff --git a/core/src/main/scala/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/spark/rdd/NewHadoopRDD.scala
index 2d000f5c68..345ae79d74 100644
--- a/core/src/main/scala/spark/rdd/NewHadoopRDD.scala
+++ b/core/src/main/scala/spark/rdd/NewHadoopRDD.scala
@@ -39,7 +39,7 @@ class NewHadoopRDD[K, V](
 
   @transient private val jobId = new JobID(jobtrackerId, id)
 
-  override def getSplits = {
+  override def getSplits: Array[Split] = {
     val inputFormat = inputFormatClass.newInstance
     val jobContext = newJobContext(conf, jobId)
     val rawSplits = inputFormat.getSplits(jobContext).toArray
@@ -83,7 +83,7 @@ class NewHadoopRDD[K, V](
     }
   }
 
-  override def getPreferredLocations(split: Split) = {
+  override def getPreferredLocations(split: Split): Seq[String] = {
     val theSplit = split.asInstanceOf[NewHadoopSplit]
     theSplit.serializableHadoopSplit.value.getLocations.filter(_ != "localhost")
   }
diff --git a/core/src/main/scala/spark/rdd/PartitionPruningRDD.scala b/core/src/main/scala/spark/rdd/PartitionPruningRDD.scala
index a50ce75171..d1553181c1 100644
--- a/core/src/main/scala/spark/rdd/PartitionPruningRDD.scala
+++ b/core/src/main/scala/spark/rdd/PartitionPruningRDD.scala
@@ -37,6 +37,6 @@ class PartitionPruningRDD[T: ClassManifest](
   override def compute(split: Split, context: TaskContext) = firstParent[T].iterator(
     split.asInstanceOf[PartitionPruningRDDSplit].parentSplit, context)
 
-  override protected def getSplits =
+  override protected def getSplits: Array[Split] =
     getDependencies.head.asInstanceOf[PruneDependency[T]].partitions
 }
diff --git a/core/src/main/scala/spark/rdd/PipedRDD.scala b/core/src/main/scala/spark/rdd/PipedRDD.scala
index 6631f83510..56032a8659 100644
--- a/core/src/main/scala/spark/rdd/PipedRDD.scala
+++ b/core/src/main/scala/spark/rdd/PipedRDD.scala
@@ -27,7 +27,7 @@ class PipedRDD[T: ClassManifest](
   // using a standard StringTokenizer (i.e. by spaces)
   def this(prev: RDD[T], command: String) = this(prev, PipedRDD.tokenize(command))
 
-  override def getSplits = firstParent[T].splits
+  override def getSplits: Array[Split] = firstParent[T].splits
 
   override def compute(split: Split, context: TaskContext): Iterator[String] = {
     val pb = new ProcessBuilder(command)
diff --git a/core/src/main/scala/spark/rdd/SampledRDD.scala b/core/src/main/scala/spark/rdd/SampledRDD.scala
index 81626d5009..f2a144e2e0 100644
--- a/core/src/main/scala/spark/rdd/SampledRDD.scala
+++ b/core/src/main/scala/spark/rdd/SampledRDD.scala
@@ -19,15 +19,15 @@ class SampledRDD[T: ClassManifest](
     seed: Int)
   extends RDD[T](prev) {
 
-  override def getSplits = {
+  override def getSplits: Array[Split] = {
     val rg = new Random(seed)
     firstParent[T].splits.map(x => new SampledRDDSplit(x, rg.nextInt))
   }
 
-  override def getPreferredLocations(split: Split) =
+  override def getPreferredLocations(split: Split): Seq[String] =
     firstParent[T].preferredLocations(split.asInstanceOf[SampledRDDSplit].prev)
 
-  override def compute(splitIn: Split, context: TaskContext) = {
+  override def compute(splitIn: Split, context: TaskContext): Iterator[T] = {
     val split = splitIn.asInstanceOf[SampledRDDSplit]
     if (withReplacement) {
       // For large datasets, the expected number of occurrences of each element in a sample with
diff --git a/core/src/main/scala/spark/rdd/ShuffledRDD.scala b/core/src/main/scala/spark/rdd/ShuffledRDD.scala
index d396478673..bf69b5150b 100644
--- a/core/src/main/scala/spark/rdd/ShuffledRDD.scala
+++ b/core/src/main/scala/spark/rdd/ShuffledRDD.scala
@@ -22,7 +22,9 @@ class ShuffledRDD[K, V](
 
   override val partitioner = Some(part)
 
-  override def getSplits = Array.tabulate[Split](part.numPartitions)(i => new ShuffledRDDSplit(i))
+  override def getSplits: Array[Split] = {
+    Array.tabulate[Split](part.numPartitions)(i => new ShuffledRDDSplit(i))
+  }
 
   override def compute(split: Split, context: TaskContext): Iterator[(K, V)] = {
     val shuffledId = dependencies.head.asInstanceOf[ShuffleDependency[K, V]].shuffleId
diff --git a/core/src/main/scala/spark/rdd/UnionRDD.scala b/core/src/main/scala/spark/rdd/UnionRDD.scala
index 5ac24d2ffc..ebc0068228 100644
--- a/core/src/main/scala/spark/rdd/UnionRDD.scala
+++ b/core/src/main/scala/spark/rdd/UnionRDD.scala
@@ -28,7 +28,7 @@ class UnionRDD[T: ClassManifest](
     @transient var rdds: Seq[RDD[T]])
   extends RDD[T](sc, Nil) {  // Nil since we implement getDependencies
 
-  override def getSplits = {
+  override def getSplits: Array[Split] = {
     val array = new Array[Split](rdds.map(_.splits.size).sum)
     var pos = 0
     for (rdd <- rdds; split <- rdd.splits) {
@@ -38,7 +38,7 @@ class UnionRDD[T: ClassManifest](
     array
   }
 
-  override def getDependencies = {
+  override def getDependencies: Seq[Dependency[_]] = {
     val deps = new ArrayBuffer[Dependency[_]]
     var pos = 0
     for (rdd <- rdds) {
diff --git a/core/src/main/scala/spark/rdd/ZippedRDD.scala b/core/src/main/scala/spark/rdd/ZippedRDD.scala
index a079720a93..1ce70268bb 100644
--- a/core/src/main/scala/spark/rdd/ZippedRDD.scala
+++ b/core/src/main/scala/spark/rdd/ZippedRDD.scala
@@ -31,7 +31,7 @@ class ZippedRDD[T: ClassManifest, U: ClassManifest](
     var rdd2: RDD[U])
   extends RDD[(T, U)](sc, List(new OneToOneDependency(rdd1), new OneToOneDependency(rdd2))) {
 
-  override def getSplits = {
+  override def getSplits: Array[Split] = {
    if (rdd1.splits.size != rdd2.splits.size) {
      throw new IllegalArgumentException("Can't zip RDDs with unequal numbers of partitions")
    }
-- 
GitLab
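
For illustration only, here is a minimal, self-contained Scala sketch of the pattern the patch applies throughout: explicit return types on overridden methods such as getSplits: Array[Split] and getPreferredLocations: Seq[String], rather than inferred ones. The names Split, SimpleSplit, DemoRDD, and SingleSplitRDD below are hypothetical stand-ins and are not part of Spark or of this patch.

// Illustrative sketch only; hypothetical names, not Spark code.
trait Split { def index: Int }

case class SimpleSplit(index: Int) extends Split

abstract class DemoRDD[T] {
  protected def getSplits: Array[Split]
  protected def getPreferredLocations(split: Split): Seq[String] = Nil
}

class SingleSplitRDD[T] extends DemoRDD[T] {
  // The explicit annotation states and checks the intended signature,
  // Array[Split], right at the definition site.
  override protected def getSplits: Array[Split] = Array(SimpleSplit(0))

  // Without ": Seq[String]" Scala would infer List[String] from the body,
  // and that narrower type would become this class's signature for the method.
  override protected def getPreferredLocations(split: Split): Seq[String] =
    List("localhost")
}

The design point of the annotations is readability and signature stability: the declared type documents the contract, and the implementation's collection choice does not leak into the method's type.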