diff --git a/core/src/main/scala/spark/CoGroupedRDD.scala b/core/src/main/scala/spark/CoGroupedRDD.scala index dbd5d451577786d127658656e5b1923cf6b2af3b..ea9e2d38a9a6b083f18c8084be52217842870d0a 100644 --- a/core/src/main/scala/spark/CoGroupedRDD.scala +++ b/core/src/main/scala/spark/CoGroupedRDD.scala @@ -26,7 +26,7 @@ class CoGroupAggregator extends Aggregator[Any, Any, ArrayBuffer[Any]] ( ) class CoGroupedRDD[K](rdds: Seq[RDD[(_, _)]], part: Partitioner) -extends RDD[(K, Seq[Seq[_]])](rdds.first.context) with Logging { +extends RDD[(K, Seq[Seq[_]])](rdds.head.context) with Logging { val aggr = new CoGroupAggregator override val dependencies = { @@ -45,7 +45,7 @@ extends RDD[(K, Seq[Seq[_]])](rdds.first.context) with Logging { } @transient val splits_ : Array[Split] = { - val firstRdd = rdds.first + val firstRdd = rdds.head val array = new Array[Split](part.numPartitions) for (i <- 0 until array.size) { array(i) = new CoGroupSplit(i, rdds.zipWithIndex.map { case (r, j) => diff --git a/core/src/main/scala/spark/DAGScheduler.scala b/core/src/main/scala/spark/DAGScheduler.scala index d49047d74a19005bcd500be90878a91e253f31d0..a970fb65262a2a0b4461ab9f000df0fead651690 100644 --- a/core/src/main/scala/spark/DAGScheduler.scala +++ b/core/src/main/scala/spark/DAGScheduler.scala @@ -237,7 +237,7 @@ private trait DAGScheduler extends Scheduler with Logging { if (stage.shuffleDep != None) { mapOutputTracker.registerMapOutputs( stage.shuffleDep.get.shuffleId, - stage.outputLocs.map(_.first).toArray) + stage.outputLocs.map(_.head).toArray) } updateCacheLocs() val newlyRunnable = new ArrayBuffer[Stage]