Skip to content
Snippets Groups Projects
Commit 023ed194 authored by Matei Zaharia's avatar Matei Zaharia
Browse files

Fixed some whitespace

parent 74bbfa91
No related branches found
No related tags found
No related merge requests found
...@@ -15,7 +15,7 @@ import org.apache.hadoop.util.ReflectionUtils ...@@ -15,7 +15,7 @@ import org.apache.hadoop.util.ReflectionUtils
/** A Spark split class that wraps around a Hadoop InputSplit */ /** A Spark split class that wraps around a Hadoop InputSplit */
@serializable class HadoopSplit(@transient s: InputSplit) @serializable class HadoopSplit(@transient s: InputSplit)
extends Split { extends Split {
val inputSplit = new SerializableWritable[InputSplit](s) val inputSplit = new SerializableWritable[InputSplit](s)
// Hadoop gives each split a unique toString value, so use this as our ID // Hadoop gives each split a unique toString value, so use this as our ID
...@@ -48,7 +48,7 @@ extends RDD[(K, V)](sc) { ...@@ -48,7 +48,7 @@ extends RDD[(K, V)](sc) {
} }
override def splits = splits_ override def splits = splits_
override def iterator(theSplit: Split) = new Iterator[(K, V)] { override def iterator(theSplit: Split) = new Iterator[(K, V)] {
val split = theSplit.asInstanceOf[HadoopSplit] val split = theSplit.asInstanceOf[HadoopSplit]
var reader: RecordReader[K, V] = null var reader: RecordReader[K, V] = null
......
...@@ -11,7 +11,7 @@ import org.eclipse.jetty.util.thread.QueuedThreadPool ...@@ -11,7 +11,7 @@ import org.eclipse.jetty.util.thread.QueuedThreadPool
/** /**
* Exception type thrown by HttpServer when it is in the wrong state * Exception type thrown by HttpServer when it is in the wrong state
* for an operation. * for an operation.
*/ */
class ServerStateException(message: String) extends Exception(message) class ServerStateException(message: String) extends Exception(message)
......
...@@ -78,7 +78,7 @@ abstract class RDD[T: ClassManifest]( ...@@ -78,7 +78,7 @@ abstract class RDD[T: ClassManifest](
case _ => throw new UnsupportedOperationException("empty collection") case _ => throw new UnsupportedOperationException("empty collection")
} }
def count(): Long = def count(): Long =
try { map(x => 1L).reduce(_+_) } try { map(x => 1L).reduce(_+_) }
catch { case e: UnsupportedOperationException => 0L } catch { case e: UnsupportedOperationException => 0L }
...@@ -128,7 +128,7 @@ extends RDDTask[Option[T], T](rdd, split) with Logging { ...@@ -128,7 +128,7 @@ extends RDDTask[Option[T], T](rdd, split) with Logging {
} }
class MappedRDD[U: ClassManifest, T: ClassManifest]( class MappedRDD[U: ClassManifest, T: ClassManifest](
prev: RDD[T], f: T => U) prev: RDD[T], f: T => U)
extends RDD[U](prev.sparkContext) { extends RDD[U](prev.sparkContext) {
override def splits = prev.splits override def splits = prev.splits
override def preferredLocations(split: Split) = prev.preferredLocations(split) override def preferredLocations(split: Split) = prev.preferredLocations(split)
...@@ -137,7 +137,7 @@ extends RDD[U](prev.sparkContext) { ...@@ -137,7 +137,7 @@ extends RDD[U](prev.sparkContext) {
} }
class FilteredRDD[T: ClassManifest]( class FilteredRDD[T: ClassManifest](
prev: RDD[T], f: T => Boolean) prev: RDD[T], f: T => Boolean)
extends RDD[T](prev.sparkContext) { extends RDD[T](prev.sparkContext) {
override def splits = prev.splits override def splits = prev.splits
override def preferredLocations(split: Split) = prev.preferredLocations(split) override def preferredLocations(split: Split) = prev.preferredLocations(split)
...@@ -146,7 +146,7 @@ extends RDD[T](prev.sparkContext) { ...@@ -146,7 +146,7 @@ extends RDD[T](prev.sparkContext) {
} }
class FlatMappedRDD[U: ClassManifest, T: ClassManifest]( class FlatMappedRDD[U: ClassManifest, T: ClassManifest](
prev: RDD[T], f: T => Traversable[U]) prev: RDD[T], f: T => Traversable[U])
extends RDD[U](prev.sparkContext) { extends RDD[U](prev.sparkContext) {
override def splits = prev.splits override def splits = prev.splits
override def preferredLocations(split: Split) = prev.preferredLocations(split) override def preferredLocations(split: Split) = prev.preferredLocations(split)
...@@ -155,7 +155,7 @@ extends RDD[U](prev.sparkContext) { ...@@ -155,7 +155,7 @@ extends RDD[U](prev.sparkContext) {
override def taskStarted(split: Split, slot: SlaveOffer) = prev.taskStarted(split, slot) override def taskStarted(split: Split, slot: SlaveOffer) = prev.taskStarted(split, slot)
} }
class SplitRDD[T: ClassManifest](prev: RDD[T]) class SplitRDD[T: ClassManifest](prev: RDD[T])
extends RDD[Array[T]](prev.sparkContext) { extends RDD[Array[T]](prev.sparkContext) {
override def splits = prev.splits override def splits = prev.splits
override def preferredLocations(split: Split) = prev.preferredLocations(split) override def preferredLocations(split: Split) = prev.preferredLocations(split)
...@@ -170,16 +170,16 @@ extends RDD[Array[T]](prev.sparkContext) { ...@@ -170,16 +170,16 @@ extends RDD[Array[T]](prev.sparkContext) {
} }
class SampledRDD[T: ClassManifest]( class SampledRDD[T: ClassManifest](
prev: RDD[T], withReplacement: Boolean, frac: Double, seed: Int) prev: RDD[T], withReplacement: Boolean, frac: Double, seed: Int)
extends RDD[T](prev.sparkContext) { extends RDD[T](prev.sparkContext) {
@transient val splits_ = { val rg = new Random(seed); prev.splits.map(x => new SeededSplit(x, rg.nextInt)) } @transient val splits_ = { val rg = new Random(seed); prev.splits.map(x => new SeededSplit(x, rg.nextInt)) }
override def splits = splits_.asInstanceOf[Array[Split]] override def splits = splits_.asInstanceOf[Array[Split]]
override def preferredLocations(split: Split) = prev.preferredLocations(split.asInstanceOf[SeededSplit].prev) override def preferredLocations(split: Split) = prev.preferredLocations(split.asInstanceOf[SeededSplit].prev)
override def iterator(splitIn: Split) = { override def iterator(splitIn: Split) = {
val split = splitIn.asInstanceOf[SeededSplit] val split = splitIn.asInstanceOf[SeededSplit]
val rg = new Random(split.seed); val rg = new Random(split.seed);
// Sampling with replacement (TODO: use reservoir sampling to make this more efficient?) // Sampling with replacement (TODO: use reservoir sampling to make this more efficient?)
...@@ -213,7 +213,7 @@ extends RDD[T](prev.sparkContext) with Logging { ...@@ -213,7 +213,7 @@ extends RDD[T](prev.sparkContext) with Logging {
else else
prev.preferredLocations(split) prev.preferredLocations(split)
} }
override def iterator(split: Split): Iterator[T] = { override def iterator(split: Split): Iterator[T] = {
val key = id + "::" + split.getId() val key = id + "::" + split.getId()
logInfo("CachedRDD split key is " + key) logInfo("CachedRDD split key is " + key)
...@@ -278,7 +278,7 @@ extends Split { ...@@ -278,7 +278,7 @@ extends Split {
class UnionRDD[T: ClassManifest](sc: SparkContext, rdds: Seq[RDD[T]]) class UnionRDD[T: ClassManifest](sc: SparkContext, rdds: Seq[RDD[T]])
extends RDD[T](sc) { extends RDD[T](sc) {
@transient val splits_ : Array[Split] = { @transient val splits_ : Array[Split] = {
val splits: Seq[Split] = val splits: Seq[Split] =
for (rdd <- rdds; split <- rdd.splits) for (rdd <- rdds; split <- rdd.splits)
yield new UnionSplit(rdd, split) yield new UnionSplit(rdd, split)
splits.toArray splits.toArray
...@@ -289,7 +289,7 @@ extends RDD[T](sc) { ...@@ -289,7 +289,7 @@ extends RDD[T](sc) {
override def iterator(s: Split): Iterator[T] = override def iterator(s: Split): Iterator[T] =
s.asInstanceOf[UnionSplit[T]].iterator() s.asInstanceOf[UnionSplit[T]].iterator()
override def preferredLocations(s: Split): Seq[String] = override def preferredLocations(s: Split): Seq[String] =
s.asInstanceOf[UnionSplit[T]].preferredLocations() s.asInstanceOf[UnionSplit[T]].preferredLocations()
} }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment