Commit 4d9c2aee authored by Matei Zaharia

Merge branch 'master' into matei-scheduling

parents 0195ee5e f9671b08
@@ -57,7 +57,7 @@ object Vector {
   implicit def doubleToMultiplier(num: Double) = new Multiplier(num)

   implicit object VectorAccumParam extends spark.AccumulatorParam[Vector] {
-    def add(t1: Vector, t2: Vector) = t1 + t2
+    def addInPlace(t1: Vector, t2: Vector) = t1 + t2
     def zero(initialValue: Vector) = Vector.zeros(initialValue.length)
   }
 }
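Only the method name changes in this hunk: add becomes addInPlace, matching the AccumulatorParam rename further down; the merge via + is unchanged. Since VectorAccumParam lives in Vector's companion object, implicit search finds it whenever an AccumulatorParam[Vector] is required. A minimal usage sketch, assuming this era's sc.accumulator and sc.parallelize methods and a varargs Vector(...) factory (all hedged; the val names are illustrative):

import spark.SparkContext

val sc = new SparkContext("local", "VectorAccumExample")
// VectorAccumParam is resolved implicitly from Vector's companion object.
val sum = sc.accumulator(Vector.zeros(2))
sc.parallelize(1 to 4).foreach(i => sum += Vector(i, 2.0 * i))
println(sum.value)  // elementwise total, merged on the master via addInPlace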
@@ -4,15 +4,17 @@ import java.io._
 import scala.collection.mutable.Map

-@serializable class Accumulator[T](initialValue: T, param: AccumulatorParam[T])
+@serializable class Accumulator[T](
+    @transient initialValue: T, param: AccumulatorParam[T])
 {
   val id = Accumulators.newId
-  @transient var value_ = initialValue
+  @transient var value_ = initialValue // Current value on master
+  val zero = param.zero(initialValue)  // Zero value to be passed to workers
   var deserialized = false

   Accumulators.register(this)

-  def += (term: T) { value_ = param.add(value_, term) }
+  def += (term: T) { value_ = param.addInPlace(value_, term) }
   def value = this.value_
   def value_= (t: T) {
     if (!deserialized) value_ = t
@@ -22,7 +24,7 @@ import scala.collection.mutable.Map
   // Called by Java when deserializing an object
   private def readObject(in: ObjectInputStream) {
     in.defaultReadObject
-    value_ = param.zero(initialValue)
+    value_ = zero
     deserialized = true
     Accumulators.register(this)
   }
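This hunk and the previous one work together: initialValue is now @transient, so it never travels to workers; the master instead precomputes zero, which is serialized in its place, and readObject resets value_ from that field after deserialization. A standalone sketch of the same @transient-plus-readObject pattern, using an illustrative class that is not part of the diff:

import java.io._

@serializable class Example(@transient initial: Int) {
  @transient var value = initial  // live only on the originating JVM
  val zero = 0                    // serialized stand-in, computed up front

  // Called by Java when deserializing an object
  private def readObject(in: ObjectInputStream) {
    in.defaultReadObject
    value = zero  // initial is unavailable here, so fall back to zero
  }
}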
@@ -31,7 +33,7 @@ import scala.collection.mutable.Map
 }

 @serializable trait AccumulatorParam[T] {
-  def add(t1: T, t2: T): T
+  def addInPlace(t1: T, t2: T): T
   def zero(initialValue: T): T
 }
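The rename from add to addInPlace signals that implementations are allowed to mutate and return t1 instead of allocating a fresh value. Any type with an associative merge and an identity can implement the trait; a hedged sketch for a running maximum over Ints, deliberately not implicit so it cannot clash with the IntAccumulatorParam defined in object SparkContext:

object MaxAccumParam extends spark.AccumulatorParam[Int] {
  def addInPlace(t1: Int, t2: Int): Int = math.max(t1, t2)
  // The identity for max is the smallest Int, whatever the initial value.
  def zero(initialValue: Int): Int = Int.MinValue
}

// Passed explicitly, assuming an implicit-parameter factory of the form
// sc.accumulator(0)(MaxAccumParam).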
...
@@ -165,7 +165,10 @@ extends RDD[Array[T]](prev.sparkContext) {
 }

-@serializable class SeededSplit(val prev: Split, val seed: Int) extends Split {}
+@serializable class SeededSplit(val prev: Split, val seed: Int) extends Split {
+  override def toString() =
+    "SeededSplit(" + prev.toString + ", seed " + seed + ")"
+}

 class SampledRDD[T: ClassManifest](
     prev: RDD[T], withReplacement: Boolean, frac: Double, seed: Int)
@@ -268,6 +271,7 @@ private object CachedRDD {
 abstract class UnionSplit[T: ClassManifest] extends Split {
   def iterator(): Iterator[T]
   def preferredLocations(): Seq[String]
+  def toString(): String
 }

 @serializable
@@ -276,6 +280,8 @@ class UnionSplitImpl[T: ClassManifest](
 extends UnionSplit[T] {
   override def iterator() = rdd.iterator(split)
   override def preferredLocations() = rdd.preferredLocations(split)
+  override def toString() =
+    "UnionSplitImpl(" + split.toString + ")"
 }

 @serializable
@@ -297,7 +303,10 @@ extends RDD[T](sc) {
     s.asInstanceOf[UnionSplit[T]].preferredLocations()
 }

-@serializable class CartesianSplit(val s1: Split, val s2: Split) extends Split
+@serializable class CartesianSplit(val s1: Split, val s2: Split) extends Split {
+  override def toString() =
+    "CartesianSplit(" + s1.toString + ", " + s2.toString + ")"
+}

 @serializable
 class CartesianRDD[T: ClassManifest, U:ClassManifest](
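The toString overrides added in the three hunks above exist purely to make scheduler and debug logs legible, since splits previously printed as default object hashes. A sketch of how the nested output composes; DummySplit is illustrative and not part of the diff (Split itself declares no members, as the original one-line SeededSplit shows):

@serializable class DummySplit(val idx: Int) extends Split {
  override def toString() = "DummySplit(" + idx + ")"
}

val pair = new CartesianSplit(new DummySplit(0), new DummySplit(1))
println(pair)  // prints: CartesianSplit(DummySplit(0), DummySplit(1))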
...
@@ -82,12 +82,12 @@ class SparkContext(master: String, frameworkName: String) extends Logging {
 object SparkContext {
   implicit object DoubleAccumulatorParam extends AccumulatorParam[Double] {
-    def add(t1: Double, t2: Double): Double = t1 + t2
+    def addInPlace(t1: Double, t2: Double): Double = t1 + t2
     def zero(initialValue: Double) = 0.0
   }

   implicit object IntAccumulatorParam extends AccumulatorParam[Int] {
-    def add(t1: Int, t2: Int): Int = t1 + t2
+    def addInPlace(t1: Int, t2: Int): Int = t1 + t2
     def zero(initialValue: Int) = 0
   }
...
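With these two implicit objects in scope, for example via import spark.SparkContext._, Int and Double accumulators need no explicit param argument. A minimal usage sketch under the same assumptions as above:

import spark.SparkContext
import spark.SparkContext._  // brings the implicit AccumulatorParams into scope

val sc = new SparkContext("local", "AccumExample")
val total = sc.accumulator(0)  // resolved against IntAccumulatorParam
sc.parallelize(1 to 10).foreach(x => total += x)
println(total.value)  // 55, read back on the master once the job finishes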