Commit 98f008b7 authored by Matei Zaharia

Formatting fixes

parent 7660a8b1
@@ -3,5 +3,5 @@ package spark
 class Aggregator[K, V, C] (
   val createCombiner: V => C,
   val mergeValue: (C, V) => C,
-  val mergeCombiners: (C, C) => C
-) extends Serializable
\ No newline at end of file
+  val mergeCombiners: (C, C) => C)
+  extends Serializable
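
For context, Aggregator is the generic combine-by-key interface: createCombiner builds a combiner of type C from the first value seen for a key, mergeValue folds further values into it, and mergeCombiners merges partial combiners from different partitions. A minimal sketch of how it can be instantiated; the word-count types and the `counter` name are illustrative, not from this commit:

// Hypothetical per-key counting: the combiner is a running Int total.
val counter = new Aggregator[String, Int, Int](
  createCombiner = (v: Int) => v,                  // first value for a key starts the count
  mergeValue = (c: Int, v: Int) => c + v,          // fold in another value
  mergeCombiners = (c1: Int, c2: Int) => c1 + c2)  // merge counts across partitions
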
@@ -15,11 +15,12 @@ class CoGroupSplit(idx: Int, val deps: Seq[CoGroupSplitDep]) extends Split with
   override def hashCode(): Int = idx
 }
 
-class CoGroupAggregator extends Aggregator[Any, Any, ArrayBuffer[Any]] (
+class CoGroupAggregator
+  extends Aggregator[Any, Any, ArrayBuffer[Any]](
   { x => ArrayBuffer(x) },
   { (b, x) => b += x },
-  { (b1, b2) => b1 ++ b2 }
-) with Serializable
+  { (b1, b2) => b1 ++ b2 })
+  with Serializable
 
 class CoGroupedRDD[K](rdds: Seq[RDD[(_, _)]], part: Partitioner)
   extends RDD[(K, Seq[Seq[_]])](rdds.head.context) with Logging {
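
The three closures in CoGroupAggregator implement grouping: each value starts a one-element ArrayBuffer, later values are appended in place, and buffers from different partitions are concatenated. A self-contained sketch of that behavior, outside Spark:

import scala.collection.mutable.ArrayBuffer

val createCombiner = (x: Any) => ArrayBuffer(x)
val mergeValue = (b: ArrayBuffer[Any], x: Any) => b += x
val mergeCombiners = (b1: ArrayBuffer[Any], b2: ArrayBuffer[Any]) => b1 ++ b2

// Values 1 and 2 arrive on one partition, 3 on another; merging the
// two partial buffers yields the full group.
val merged = mergeCombiners(mergeValue(createCombiner(1), 2), createCombiner(3))
// merged == ArrayBuffer(1, 2, 3)
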
@@ -119,8 +119,10 @@ private trait DAGScheduler extends Scheduler with Logging {
       cacheTracker.registerRDD(r.id, r.splits.size)
       for (dep <- r.dependencies) {
         dep match {
-          case shufDep: ShuffleDependency[_,_,_] => parents += getShuffleMapStage(shufDep)
-          case _ => visit(dep.rdd)
+          case shufDep: ShuffleDependency[_,_,_] =>
+            parents += getShuffleMapStage(shufDep)
+          case _ =>
+            visit(dep.rdd)
         }
       }
     }
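
The match in this hunk encodes the stage-boundary rule: a ShuffleDependency marks the edge of a new shuffle map stage, while any other dependency is followed recursively within the current stage. A simplified sketch of that traversal over stand-in types (none of these names are Spark's, and it assumes an acyclic lineage graph):

import scala.collection.mutable.ArrayBuffer

case class FakeRDD(deps: Seq[FakeDep])
sealed trait FakeDep { def rdd: FakeRDD }
case class ShuffleDep(rdd: FakeRDD) extends FakeDep
case class NarrowDep(rdd: FakeRDD) extends FakeDep

def shuffleParents(root: FakeRDD): Seq[ShuffleDep] = {
  val parents = ArrayBuffer[ShuffleDep]()
  def visit(r: FakeRDD): Unit =
    for (dep <- r.deps) dep match {
      case shufDep: ShuffleDep => parents += shufDep  // stage boundary: stop here
      case other => visit(other.rdd)                  // narrow dep: same stage, recurse
    }
  visit(root)
  parents.toSeq
}
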
@@ -13,5 +13,6 @@ class FetchFailedException(
   override def getCause(): Throwable = cause
 
-  def toTaskEndReason: TaskEndReason = FetchFailed(serverUri, shuffleId, mapId, reduceId)
+  def toTaskEndReason: TaskEndReason =
+    FetchFailed(serverUri, shuffleId, mapId, reduceId)
 }
@@ -1,5 +1,8 @@
 package spark
 
+import java.io.EOFException
+import java.util.NoSuchElementException
+
 import org.apache.hadoop.io.LongWritable
 import org.apache.hadoop.io.NullWritable
 import org.apache.hadoop.io.Text
@@ -15,11 +18,9 @@ import org.apache.hadoop.util.ReflectionUtils
 
 /**
  * A Spark split class that wraps around a Hadoop InputSplit.
  */
-class HadoopSplit(
-    rddId: Int,
-    idx: Int,
-    @transient s: InputSplit)
-  extends Split with Serializable {
+class HadoopSplit(rddId: Int, idx: Int, @transient s: InputSplit)
+  extends Split
+  with Serializable {
 
   val inputSplit = new SerializableWritable[InputSplit](s)
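
HadoopSplit wraps its InputSplit in a SerializableWritable because Hadoop splits implement Writable but not java.io.Serializable, so they cannot be shipped to workers with Java serialization directly. A hedged sketch of how such a wrapper can work, routing Java serialization through Hadoop's ObjectWritable; the class body here is illustrative, not this commit's code:

import java.io.{ObjectInputStream, ObjectOutputStream}
import org.apache.hadoop.io.{ObjectWritable, Writable}
import org.apache.hadoop.mapred.JobConf

class SerializableWritableSketch[T <: Writable](@transient var t: T) extends Serializable {
  def value: T = t

  private def writeObject(out: ObjectOutputStream): Unit = {
    out.defaultWriteObject()
    new ObjectWritable(t).write(out)  // serialize via the Writable protocol
  }

  private def readObject(in: ObjectInputStream): Unit = {
    in.defaultReadObject()
    val ow = new ObjectWritable()
    ow.setConf(new JobConf())         // ObjectWritable needs a conf to resolve classes
    ow.readFields(in)
    t = ow.get().asInstanceOf[T]
  }
}
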
@@ -91,7 +92,8 @@ class HadoopRDD[K, V](
       try {
         finished = !reader.next(key, value)
       } catch {
-        case eofe: java.io.EOFException => finished = true
+        case eof: EOFException =>
+          finished = true
       }
       gotNext = true
     }
@@ -106,7 +108,7 @@ class HadoopRDD[K, V](
         finished = !reader.next(key, value)
       }
       if (finished) {
-        throw new java.util.NoSuchElementException("End of stream")
+        throw new NoSuchElementException("End of stream")
       }
       gotNext = false
       (key, value)
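
The finished/gotNext flags in the two hunks above implement a read-ahead iterator: each record is fetched eagerly so hasNext can detect end of input, an EOFException from the reader counts as normal termination, and calling next() on an exhausted stream raises NoSuchElementException. A standalone sketch of the same pattern; the readNext parameter stands in for Hadoop's RecordReader and is not part of this commit:

import java.io.EOFException
import java.util.NoSuchElementException

class ReadAheadIterator[A](readNext: () => Option[A]) extends Iterator[A] {
  private var record: Option[A] = None
  private var gotNext = false   // does `record` hold an unconsumed element?
  private var finished = false

  override def hasNext: Boolean = {
    if (!gotNext && !finished) {
      record = try readNext() catch { case _: EOFException => None }  // EOF ends the stream
      finished = record.isEmpty
      gotNext = true
    }
    !finished
  }

  override def next(): A = {
    if (!hasNext) throw new NoSuchElementException("End of stream")
    gotNext = false               // consume the buffered element
    record.get
  }
}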