diff --git a/core/src/main/scala/org/apache/spark/Partitioner.scala b/core/src/main/scala/org/apache/spark/Partitioner.scala
index 31b0773bfe06c6ca4cebc75e4a13430857940dd9..9b043f06dd7349d159ce2ae7d15fae9cefea39c6 100644
--- a/core/src/main/scala/org/apache/spark/Partitioner.scala
+++ b/core/src/main/scala/org/apache/spark/Partitioner.scala
@@ -61,7 +61,8 @@ object Partitioner {
 }
 
 /**
- * A [[org.apache.spark.Partitioner]] that implements hash-based partitioning using Java's `Object.hashCode`.
+ * A [[org.apache.spark.Partitioner]] that implements hash-based partitioning using
+ * Java's `Object.hashCode`.
  *
  * Java arrays have hashCodes that are based on the arrays' identities rather than their contents,
  * so attempting to partition an RDD[Array[_]] or RDD[(Array[_], _)] using a HashPartitioner will
@@ -84,8 +85,8 @@ class HashPartitioner(partitions: Int) extends Partitioner {
 }
 
 /**
- * A [[org.apache.spark.Partitioner]] that partitions sortable records by range into roughly equal ranges.
- * Determines the ranges by sampling the RDD passed in.
+ * A [[org.apache.spark.Partitioner]] that partitions sortable records by range into roughly
+ * equal ranges. The ranges are determined by sampling the content of the RDD passed in.
  */
 class RangePartitioner[K <% Ordered[K]: ClassTag, V](
     partitions: Int,
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 0e47f4e442927418cefd5cbc316d1ede06f93933..47574ab34cdacbf5f77e0e3314f16d935643222f 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -31,9 +31,9 @@ import scala.reflect.{ClassTag, classTag}
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.io.{ArrayWritable, BooleanWritable, BytesWritable, DoubleWritable,
-FloatWritable, IntWritable, LongWritable, NullWritable, Text, Writable}
+  FloatWritable, IntWritable, LongWritable, NullWritable, Text, Writable}
 import org.apache.hadoop.mapred.{FileInputFormat, InputFormat, JobConf, SequenceFileInputFormat,
-TextInputFormat}
+  TextInputFormat}
 import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat, Job => NewHadoopJob}
 import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => NewFileInputFormat}
 import org.apache.mesos.MesosNativeLibrary
@@ -49,7 +49,7 @@ import org.apache.spark.scheduler.local.LocalBackend
 import org.apache.spark.storage.{BlockManagerSource, RDDInfo, StorageStatus, StorageUtils}
 import org.apache.spark.ui.SparkUI
 import org.apache.spark.util.{Utils, TimeStampedHashMap, MetadataCleaner, MetadataCleanerType,
-ClosureCleaner}
+  ClosureCleaner}
 
 /**
  * Main entry point for Spark functionality. A SparkContext represents the connection to a Spark
@@ -517,15 +517,15 @@ class SparkContext(
   // Methods for creating shared variables
 
   /**
-   * Create an [[org.apache.spark.Accumulator]] variable of a given type, which tasks can "add" values
-   * to using the `+=` method. Only the driver can access the accumulator's `value`.
+   * Create an [[org.apache.spark.Accumulator]] variable of a given type, which tasks can "add"
+   * values to using the `+=` method. Only the driver can access the accumulator's `value`.
    */
   def accumulator[T](initialValue: T)(implicit param: AccumulatorParam[T]) =
     new Accumulator(initialValue, param)
 
   /**
-   * Create an [[org.apache.spark.Accumulable]] shared variable, to which tasks can add values with `+=`.
-   * Only the driver can access the accumuable's `value`.
+   * Create an [[org.apache.spark.Accumulable]] shared variable, to which tasks can add values
+   * with `+=`. Only the driver can access the accumuable's `value`.
    * @tparam T accumulator type
    * @tparam R type that can be added to the accumulator
    */
@@ -538,13 +538,15 @@ class SparkContext(
    * Growable and TraversableOnce are the standard APIs that guarantee += and ++=, implemented by
    * standard mutable collections. So you can use this with mutable Map, Set, etc.
    */
-  def accumulableCollection[R <% Growable[T] with TraversableOnce[T] with Serializable, T](initialValue: R) = {
+  def accumulableCollection[R <% Growable[T] with TraversableOnce[T] with Serializable, T]
+      (initialValue: R) = {
     val param = new GrowableAccumulableParam[R,T]
     new Accumulable(initialValue, param)
   }
 
   /**
-   * Broadcast a read-only variable to the cluster, returning a [[org.apache.spark.broadcast.Broadcast]] object for
+   * Broadcast a read-only variable to the cluster, returning a
+   * [[org.apache.spark.broadcast.Broadcast]] object for
    * reading it in distributed functions. The variable will be sent to each cluster only once.
    */
   def broadcast[T](value: T) = env.broadcastManager.newBroadcast[T](value, isLocal)
@@ -1010,7 +1012,8 @@ object SparkContext {
 
   implicit def stringToText(s: String) = new Text(s)
 
-  private implicit def arrayToArrayWritable[T <% Writable: ClassTag](arr: Traversable[T]): ArrayWritable = {
+  private implicit def arrayToArrayWritable[T <% Writable: ClassTag](arr: Traversable[T])
+    : ArrayWritable = {
     def anyToWritable[U <% Writable](u: U): Writable = u
 
     new ArrayWritable(classTag[T].runtimeClass.asInstanceOf[Class[Writable]],
@@ -1033,7 +1036,9 @@ object SparkContext {
 
   implicit def booleanWritableConverter() = simpleWritableConverter[Boolean, BooleanWritable](_.get)
 
-  implicit def bytesWritableConverter() = simpleWritableConverter[Array[Byte], BytesWritable](_.getBytes)
+  implicit def bytesWritableConverter() = {
+    simpleWritableConverter[Array[Byte], BytesWritable](_.getBytes)
+  }
 
   implicit def stringWritableConverter() = simpleWritableConverter[String, Text](_.toString)
 
@@ -1049,7 +1054,8 @@ object SparkContext {
     if (uri != null) {
       val uriStr = uri.toString
       if (uriStr.startsWith("jar:file:")) {
-        // URI will be of the form "jar:file:/path/foo.jar!/package/cls.class", so pull out the /path/foo.jar
+        // URI will be of the form "jar:file:/path/foo.jar!/package/cls.class",
+        // so pull out the /path/foo.jar
         List(uriStr.substring("jar:file:".length, uriStr.indexOf('!')))
       } else {
         Nil
diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
index a0809cd43a6fe492bd99524abead4041ca2731e5..c118ddfc0138f86bd300344e4d239150d3d0c7f8 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -18,35 +18,34 @@
 package org.apache.spark.rdd
 
 import java.nio.ByteBuffer
-import java.util.Date
 import java.text.SimpleDateFormat
+import java.util.Date
 import java.util.{HashMap => JHashMap}
 
-import scala.collection.{mutable, Map}
+import scala.collection.Map
+import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
 import scala.collection.JavaConversions._
 import scala.reflect.{ClassTag, classTag}
 
-import org.apache.hadoop.mapred._
-import org.apache.hadoop.io.compress.CompressionCodec
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.io.SequenceFile.CompressionType
-import org.apache.hadoop.mapred.FileOutputFormat
-import org.apache.hadoop.mapred.OutputFormat
+import org.apache.hadoop.io.compress.CompressionCodec
+import org.apache.hadoop.mapred.{FileOutputCommitter, FileOutputFormat, JobConf, OutputFormat}
 import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat}
-import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat => NewFileOutputFormat}
-import org.apache.hadoop.mapreduce.SparkHadoopMapReduceUtil
 import org.apache.hadoop.mapreduce.{Job => NewAPIHadoopJob}
 import org.apache.hadoop.mapreduce.{RecordWriter => NewRecordWriter}
+import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat => NewFileOutputFormat}
 
 import com.clearspring.analytics.stream.cardinality.HyperLogLog
 
+// SparkHadoopWriter and SparkHadoopMapReduceUtil are actually source files defined in Spark.
+import org.apache.hadoop.mapred.SparkHadoopWriter
+import org.apache.hadoop.mapreduce.SparkHadoopMapReduceUtil
 import org.apache.spark._
 import org.apache.spark.SparkContext._
 import org.apache.spark.partial.{BoundedDouble, PartialResult}
-import org.apache.spark.Aggregator
-import org.apache.spark.Partitioner
 import org.apache.spark.Partitioner.defaultPartitioner
 import org.apache.spark.util.SerializableHyperLogLog
 
@@ -120,9 +119,9 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
   }
 
   /**
-   * Merge the values for each key using an associative function and a neutral "zero value" which may
-   * be added to the result an arbitrary number of times, and must not change the result (e.g., Nil for
-   * list concatenation, 0 for addition, or 1 for multiplication.).
+   * Merge the values for each key using an associative function and a neutral "zero value" which
+   * may be added to the result an arbitrary number of times, and must not change the result
+   * (e.g., Nil for list concatenation, 0 for addition, or 1 for multiplication.).
    */
   def foldByKey(zeroValue: V, partitioner: Partitioner)(func: (V, V) => V): RDD[(K, V)] = {
     // Serialize the zero value to a byte array so that we can get a new clone of it on each key
@@ -138,18 +137,18 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
   }
 
   /**
-   * Merge the values for each key using an associative function and a neutral "zero value" which may
-   * be added to the result an arbitrary number of times, and must not change the result (e.g., Nil for
-   * list concatenation, 0 for addition, or 1 for multiplication.).
+   * Merge the values for each key using an associative function and a neutral "zero value" which
+   * may be added to the result an arbitrary number of times, and must not change the result
+   * (e.g., Nil for list concatenation, 0 for addition, or 1 for multiplication.).
    */
   def foldByKey(zeroValue: V, numPartitions: Int)(func: (V, V) => V): RDD[(K, V)] = {
     foldByKey(zeroValue, new HashPartitioner(numPartitions))(func)
   }
 
   /**
-   * Merge the values for each key using an associative function and a neutral "zero value" which may
-   * be added to the result an arbitrary number of times, and must not change the result (e.g., Nil for
-   * list concatenation, 0 for addition, or 1 for multiplication.).
+   * Merge the values for each key using an associative function and a neutral "zero value" which
+   * may be added to the result an arbitrary number of times, and must not change the result
+   * (e.g., Nil for list concatenation, 0 for addition, or 1 for multiplication.).
    */
   def foldByKey(zeroValue: V)(func: (V, V) => V): RDD[(K, V)] = {
     foldByKey(zeroValue, defaultPartitioner(self))(func)
   }
@@ -226,7 +225,7 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
   }
 
   /**
-   * Return approximate number of distinct values for each key in this RDD. 
+   * Return approximate number of distinct values for each key in this RDD.
    * The accuracy of approximation can be controlled through the relative standard deviation
    * (relativeSD) parameter, which also controls the amount of memory used. Lower values result in
    * more accurate counts but increase the memory footprint and vise versa. HashPartitions the
@@ -579,7 +578,8 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
    */
   def saveAsHadoopFile[F <: OutputFormat[K, V]](
       path: String, codec: Class[_ <: CompressionCodec]) (implicit fm: ClassTag[F]) {
-    saveAsHadoopFile(path, getKeyClass, getValueClass, fm.runtimeClass.asInstanceOf[Class[F]], codec)
+    val runtimeClass = fm.runtimeClass
+    saveAsHadoopFile(path, getKeyClass, getValueClass, runtimeClass.asInstanceOf[Class[F]], codec)
   }
 
   /**
@@ -599,7 +599,8 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
       keyClass: Class[_],
       valueClass: Class[_],
       outputFormatClass: Class[_ <: NewOutputFormat[_, _]],
-      conf: Configuration = self.context.hadoopConfiguration) {
+      conf: Configuration = self.context.hadoopConfiguration)
+  {
     val job = new NewAPIHadoopJob(conf)
     job.setOutputKeyClass(keyClass)
     job.setOutputValueClass(valueClass)
@@ -668,7 +669,9 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
       codec: Option[Class[_ <: CompressionCodec]] = None) {
     conf.setOutputKeyClass(keyClass)
     conf.setOutputValueClass(valueClass)
-    // conf.setOutputFormat(outputFormatClass) // Doesn't work in Scala 2.9 due to what may be a generics bug
+    // Doesn't work in Scala 2.9 due to what may be a generics bug
+    // TODO: Should we uncomment this for Scala 2.10?
+    // conf.setOutputFormat(outputFormatClass)
     conf.set("mapred.output.format.class", outputFormatClass.getName)
     for (c <- codec) {
       conf.setCompressMapOutput(true)
@@ -702,7 +705,8 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
       throw new SparkException("Output value class not set")
     }
 
-    logInfo("Saving as hadoop file of type (" + keyClass.getSimpleName+ ", " + valueClass.getSimpleName+ ")")
+    logDebug("Saving as hadoop file of type (" + keyClass.getSimpleName + ", " +
+      valueClass.getSimpleName+ ")")
 
     val writer = new SparkHadoopWriter(conf)
     writer.preSetup()
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index 3f41b662799873694958b2b699563c0627cab499..f9dc12eee3291ffaaa5a9290176039c4d14a5242 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -23,7 +23,6 @@
 import scala.collection.Map
 import scala.collection.JavaConversions.mapAsScalaMap
 import scala.collection.mutable.ArrayBuffer
-import scala.collection.mutable.HashMap
 import scala.reflect.{classTag, ClassTag}
 
 import org.apache.hadoop.io.BytesWritable
@@ -52,11 +51,13 @@ import org.apache.spark._
  * partitioned collection of elements that can be operated on in parallel. This class contains the
  * basic operations available on all RDDs, such as `map`, `filter`, and `persist`. In addition,
  * [[org.apache.spark.rdd.PairRDDFunctions]] contains operations available only on RDDs of key-value
- * pairs, such as `groupByKey` and `join`; [[org.apache.spark.rdd.DoubleRDDFunctions]] contains
- * operations available only on RDDs of Doubles; and [[org.apache.spark.rdd.SequenceFileRDDFunctions]]
- * contains operations available on RDDs that can be saved as SequenceFiles. These operations are
- * automatically available on any RDD of the right type (e.g. RDD[(Int, Int)] through implicit
- * conversions when you `import org.apache.spark.SparkContext._`.
+ * pairs, such as `groupByKey` and `join`;
+ * [[org.apache.spark.rdd.DoubleRDDFunctions]] contains operations available only on RDDs of
+ * Doubles; and
+ * [[org.apache.spark.rdd.SequenceFileRDDFunctions]] contains operations available on RDDs that
+ * can be saved as SequenceFiles.
+ * These operations are automatically available on any RDD of the right type (e.g. RDD[(Int, Int)]
+ * through implicit conversions when you `import org.apache.spark.SparkContext._`.
  *
  * Internally, each RDD is characterized by five main properties:
  *
@@ -235,12 +236,9 @@ abstract class RDD[T: ClassTag](
   /**
    * Compute an RDD partition or read it from a checkpoint if the RDD is checkpointing.
    */
-  private[spark] def computeOrReadCheckpoint(split: Partition, context: TaskContext): Iterator[T] = {
-    if (isCheckpointed) {
-      firstParent[T].iterator(split, context)
-    } else {
-      compute(split, context)
-    }
+  private[spark] def computeOrReadCheckpoint(split: Partition, context: TaskContext): Iterator[T] =
+  {
+    if (isCheckpointed) firstParent[T].iterator(split, context) else compute(split, context)
   }
 
   // Transformations (return a new RDD)
@@ -268,6 +266,9 @@ abstract class RDD[T: ClassTag](
   def distinct(numPartitions: Int): RDD[T] =
     map(x => (x, null)).reduceByKey((x, y) => x, numPartitions).map(_._1)
 
+  /**
+   * Return a new RDD containing the distinct elements in this RDD.
+   */
   def distinct(): RDD[T] = distinct(partitions.size)
 
   /**
@@ -280,7 +281,7 @@ abstract class RDD[T: ClassTag](
    * which can avoid performing a shuffle.
    */
   def repartition(numPartitions: Int): RDD[T] = {
-    coalesce(numPartitions, true)
+    coalesce(numPartitions, shuffle = true)
   }
 
   /**
@@ -646,7 +647,8 @@ abstract class RDD[T: ClassTag](
   }
 
   /**
-   * Reduces the elements of this RDD using the specified commutative and associative binary operator.
+   * Reduces the elements of this RDD using the specified commutative and
+   * associative binary operator.
    */
   def reduce(f: (T, T) => T): T = {
     val cleanF = sc.clean(f)
@@ -953,7 +955,7 @@ abstract class RDD[T: ClassTag](
   private var storageLevel: StorageLevel = StorageLevel.NONE
 
   /** Record user function generating this RDD. */
-  @transient private[spark] val origin = sc.getCallSite
+  @transient private[spark] val origin = sc.getCallSite()
 
   private[spark] def elementClassTag: ClassTag[T] = classTag[T]