diff --git a/core/src/main/scala/org/apache/spark/Accumulators.scala b/core/src/main/scala/org/apache/spark/Accumulators.scala
index e89ac28b8eedfa111afc0f4219b8df48a0251e14..2ba871a6007d75444521420ab569589053e2a3e7 100644
--- a/core/src/main/scala/org/apache/spark/Accumulators.scala
+++ b/core/src/main/scala/org/apache/spark/Accumulators.scala
@@ -24,7 +24,7 @@ import scala.collection.generic.Growable
 import org.apache.spark.serializer.JavaSerializer
 
 /**
- * A datatype that can be accumulated, i.e. has an commutative and associative "add" operation,
+ * A datatype that can be accumulated, i.e. has a commutative and associative "add" operation,
  * but where the result type, `R`, may be different from the element type being added, `T`.
  *
  * You must define how to add data, and how to merge two of these together. For some datatypes,
@@ -185,7 +185,7 @@ class GrowableAccumulableParam[R <% Growable[T] with TraversableOnce[T] with Ser
 }
 
 /**
- * A simpler value of [[org.apache.spark.Accumulable]] where the result type being accumulated is the same
+ * A simpler value of [[Accumulable]] where the result type being accumulated is the same
  * as the types of elements being merged.
 *
 * @param initialValue initial value of accumulator
diff --git a/core/src/main/scala/org/apache/spark/FutureAction.scala b/core/src/main/scala/org/apache/spark/FutureAction.scala
index c6b4ac5192d144db22cee3c6cbba2bd38a6e9acc..d7d10285dadcb49971f6a6404eeea6ac423d35f6 100644
--- a/core/src/main/scala/org/apache/spark/FutureAction.scala
+++ b/core/src/main/scala/org/apache/spark/FutureAction.scala
@@ -27,8 +27,8 @@ import org.apache.spark.rdd.RDD
 
 
 /**
- * A future for the result of an action. This is an extension of the Scala Future interface to
- * support cancellation.
+ * A future for the result of an action that supports cancellation. This is an extension of the
+ * Scala Future interface.
  */
 trait FutureAction[T] extends Future[T] {
   // Note that we redefine methods of the Future trait here explicitly so we can specify a different
@@ -86,7 +86,7 @@ trait FutureAction[T] extends Future[T] {
 
 
 /**
- * The future holding the result of an action that triggers a single job. Examples include
+ * A [[FutureAction]] holding the result of an action that triggers a single job. Examples include
  * count, collect, reduce.
  */
 class SimpleFutureAction[T] private[spark](jobWaiter: JobWaiter[_], resultFunc: => T)
@@ -150,7 +150,7 @@ class SimpleFutureAction[T] private[spark](jobWaiter: JobWaiter[_], resultFunc:
 
 
 /**
- * A FutureAction for actions that could trigger multiple Spark jobs. Examples include take,
+ * A [[FutureAction]] for actions that could trigger multiple Spark jobs. Examples include take,
  * takeSample. Cancellation works by setting the cancelled flag to true and interrupting the
  * action thread if it is being blocked by a job.
  */
diff --git a/core/src/main/scala/org/apache/spark/InterruptibleIterator.scala b/core/src/main/scala/org/apache/spark/InterruptibleIterator.scala
index 56e0b8d2c0b9bb48766400e11446c0772b78c8f3..9b1601d5b95fa0930df091f15c9211379783ca11 100644
--- a/core/src/main/scala/org/apache/spark/InterruptibleIterator.scala
+++ b/core/src/main/scala/org/apache/spark/InterruptibleIterator.scala
@@ -19,7 +19,7 @@ package org.apache.spark
 
 /**
  * An iterator that wraps around an existing iterator to provide task killing functionality.
- * It works by checking the interrupted flag in TaskContext.
+ * It works by checking the interrupted flag in [[TaskContext]].
  */
 class InterruptibleIterator[+T](val context: TaskContext, val delegate: Iterator[T])
   extends Iterator[T] {
diff --git a/core/src/main/scala/org/apache/spark/Logging.scala b/core/src/main/scala/org/apache/spark/Logging.scala
index 9063cae87e14099e388b97bdffc6ee267c0f7832..b749e5414dab621172d2c81d6b66de0aee3cd135 100644
--- a/core/src/main/scala/org/apache/spark/Logging.scala
+++ b/core/src/main/scala/org/apache/spark/Logging.scala
@@ -122,7 +122,7 @@ trait Logging {
   }
 }
 
-object Logging {
+private object Logging {
   @volatile private var initialized = false
   val initLock = new Object()
 }
diff --git a/core/src/main/scala/org/apache/spark/broadcast/Broadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/Broadcast.scala
index 0fc478a41967c720f23da4f2d8a44d3ed26abd87..6bfe2cb4a29cf8783cab7fa4e9d789ceceb05b53 100644
--- a/core/src/main/scala/org/apache/spark/broadcast/Broadcast.scala
+++ b/core/src/main/scala/org/apache/spark/broadcast/Broadcast.scala
@@ -22,6 +22,7 @@ import java.util.concurrent.atomic.AtomicLong
 
 import org.apache.spark._
 
+private[spark]
 abstract class Broadcast[T](private[spark] val id: Long) extends Serializable {
   def value: T
diff --git a/core/src/main/scala/org/apache/spark/broadcast/BroadcastFactory.scala b/core/src/main/scala/org/apache/spark/broadcast/BroadcastFactory.scala
index fb161ce69d40b160a81d4ad1e389d029a16b923a..940e5ab8051006baf42dd73b08594719c488a4d2 100644
--- a/core/src/main/scala/org/apache/spark/broadcast/BroadcastFactory.scala
+++ b/core/src/main/scala/org/apache/spark/broadcast/BroadcastFactory.scala
@@ -25,7 +25,7 @@ import org.apache.spark.SparkConf
  * BroadcastFactory implementation to instantiate a particular broadcast for the
  * entire Spark job.
  */
-private[spark] trait BroadcastFactory {
+trait BroadcastFactory {
   def initialize(isDriver: Boolean, conf: SparkConf): Unit
   def newBroadcast[T](value: T, isLocal: Boolean, id: Long): Broadcast[T]
   def stop(): Unit
diff --git a/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala
index 0eacda3d7dc2b4422bd0a46f1b54078c95c2a431..39ee0dbb92841da7d1508d757e737a2404c18fa7 100644
--- a/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala
+++ b/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala
@@ -63,7 +63,10 @@ private[spark] class HttpBroadcast[T](@transient var value_ : T, isLocal: Boolea
   }
 }
 
-private[spark] class HttpBroadcastFactory extends BroadcastFactory {
+/**
+ * A [[BroadcastFactory]] implementation that uses an HTTP server as the broadcast medium.
+ */
+class HttpBroadcastFactory extends BroadcastFactory {
   def initialize(isDriver: Boolean, conf: SparkConf) { HttpBroadcast.initialize(isDriver, conf) }
 
   def newBroadcast[T](value_ : T, isLocal: Boolean, id: Long) =
diff --git a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
index 1d295c62bcb6c2bb82b1685098641f0b823d11fb..d351dfc1f56a2d447085f01f061d9a11fbf03e71 100644
--- a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
+++ b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
@@ -236,8 +236,10 @@ private[spark] case class TorrentInfo(
   @transient var hasBlocks = 0
 }
 
-private[spark] class TorrentBroadcastFactory
-  extends BroadcastFactory {
+/**
+ * A [[BroadcastFactory]] that creates a torrent-based implementation of broadcast.
+ */
+class TorrentBroadcastFactory extends BroadcastFactory {
   def initialize(isDriver: Boolean, conf: SparkConf) {
     TorrentBroadcast.initialize(isDriver, conf)
   }
diff --git a/core/src/main/scala/org/apache/spark/deploy/Client.scala b/core/src/main/scala/org/apache/spark/deploy/Client.scala
index e133893f6ca5bab05fea99d4319265fa4c43db74..9987e2300ceb75f8411653cdc7751ae88ee558ed 100644
--- a/core/src/main/scala/org/apache/spark/deploy/Client.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/Client.scala
@@ -29,13 +29,12 @@ import org.apache.spark.{Logging, SparkConf}
 import org.apache.spark.deploy.DeployMessages._
 import org.apache.spark.deploy.master.{DriverState, Master}
 import org.apache.spark.util.{AkkaUtils, Utils}
-import akka.actor.Actor.emptyBehavior
 import akka.remote.{AssociationErrorEvent, DisassociatedEvent, RemotingLifecycleEvent}
 
 /**
  * Proxy that relays messages to the driver.
  */
-class ClientActor(driverArgs: ClientArguments, conf: SparkConf) extends Actor with Logging {
+private class ClientActor(driverArgs: ClientArguments, conf: SparkConf) extends Actor with Logging {
   var masterActor: ActorSelection = _
   val timeout = AkkaUtils.askTimeout(conf)
 
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala b/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala
index 7507bf8ad0e6c56b8f09381066ed5168add24d24..cf6a23339d961ceb79be825949835311872689cb 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala
@@ -10,8 +10,9 @@ import org.apache.spark.util.Utils
 /**
  ** Utilities for running commands with the spark classpath.
  */
+private[spark]
 object CommandUtils extends Logging {
-  private[spark] def buildCommandSeq(command: Command, memory: Int, sparkHome: String): Seq[String] = {
+  def buildCommandSeq(command: Command, memory: Int, sparkHome: String): Seq[String] = {
     val runner = getEnv("JAVA_HOME", command).map(_ + "/bin/java").getOrElse("java")
 
     // SPARK-698: do not call the run.cmd script, as process.destroy()
diff --git a/core/src/main/scala/org/apache/spark/package.scala b/core/src/main/scala/org/apache/spark/package.scala
index 70a5a8caff839926704e3a2faf2c2a024963bf23..2625a7f6a575adc19ed23cddda2f6793627c3b53 100644
--- a/core/src/main/scala/org/apache/spark/package.scala
+++ b/core/src/main/scala/org/apache/spark/package.scala
@@ -29,6 +29,9 @@ package org.apache
  * be saved as SequenceFiles. These operations are automatically available on any RDD of the right
  * type (e.g. RDD[(Int, Int)] through implicit conversions when you
  * `import org.apache.spark.SparkContext._`.
+ *
+ * Java programmers should reference the [[spark.api.java]] package
+ * for Spark programming APIs in Java.
  */
 package object spark {
   // For package docs only
diff --git a/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala
index d4f396afb5d2bae34acd2c600c32f44118ed7056..8ef919c4b58cbed1e2704a56a5082578ab092c46 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala
@@ -27,7 +27,6 @@ import scala.io.Source
 import scala.reflect.ClassTag
 
 import org.apache.spark.{SparkEnv, Partition, TaskContext}
-import org.apache.spark.broadcast.Broadcast
 
 
 /**
@@ -113,7 +112,7 @@ class PipedRDD[T: ClassTag](
   }
 }
 
-object PipedRDD {
+private object PipedRDD {
   // Split a string into words using a standard StringTokenizer
   def tokenize(command: String): Seq[String] = {
     val buf = new ArrayBuffer[String]
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
index 55a40a92c96521ebdeece352ebf65324cbeee59f..d8e97c3b7c7b0284fa8afa225b1375236322cf9e 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
@@ -19,7 +19,7 @@ package org.apache.spark.scheduler
 import java.util.Properties
 
 import org.apache.spark.util.{Utils, Distribution}
-import org.apache.spark.{Logging, SparkContext, TaskEndReason}
+import org.apache.spark.{Logging, TaskEndReason}
 import org.apache.spark.executor.TaskMetrics
 
 sealed trait SparkListenerEvents
@@ -27,7 +27,7 @@ sealed trait SparkListenerEvents
 case class SparkListenerStageSubmitted(stage: StageInfo, properties: Properties)
      extends SparkListenerEvents
 
-case class SparkListenerStageCompleted(val stage: StageInfo) extends SparkListenerEvents
+case class SparkListenerStageCompleted(stage: StageInfo) extends SparkListenerEvents
 
 case class SparkListenerTaskStart(task: Task[_], taskInfo: TaskInfo) extends SparkListenerEvents
 
@@ -46,6 +46,9 @@ case class SparkListenerJobEnd(job: ActiveJob, jobResult: JobResult)
 /** An event used in the listener to shutdown the listener daemon thread. */
 private[scheduler] case object SparkListenerShutdown extends SparkListenerEvents
 
+/**
+ * Interface for listening to events from the Spark scheduler.
+ */
 trait SparkListener {
   /**
    * Called when a stage is completed, with information on the completed stage
    */
@@ -115,7 +118,7 @@ class StatsReportListener extends SparkListener with Logging {
 }
 
 
-object StatsReportListener extends Logging {
+private[spark] object StatsReportListener extends Logging {
   //for profiling, the extremes are more interesting
   val percentiles = Array[Int](0,5,10,25,50,75,90,95,100)
 
@@ -202,9 +205,9 @@ object StatsReportListener extends Logging {
   }
 }
 
+private case class RuntimePercentage(executorPct: Double, fetchPct: Option[Double], other: Double)
 
-case class RuntimePercentage(executorPct: Double, fetchPct: Option[Double], other: Double)
-object RuntimePercentage {
+private object RuntimePercentage {
   def apply(totalTime: Long, metrics: TaskMetrics): RuntimePercentage = {
     val denom = totalTime.toDouble
     val fetchTime = metrics.shuffleReadMetrics.map{_.fetchWaitTime}
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala b/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala
index 369a277232b19ca28ce7a4aee1f5779afddb79e6..48cec4be4111cbd519cd50d3ab020a0b9c4b7777 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala
@@ -32,7 +32,7 @@ import org.apache.spark.serializer.{SerializationStream, Serializer}
  *
  * This interface does not support concurrent writes.
  */
-abstract class BlockObjectWriter(val blockId: BlockId) {
+private[spark] abstract class BlockObjectWriter(val blockId: BlockId) {
 
   def open(): BlockObjectWriter
 
@@ -69,7 +69,7 @@ abstract class BlockObjectWriter(val blockId: BlockId) {
 }
 
 /** BlockObjectWriter which writes directly to a file on disk. Appends to the given file. */
-class DiskBlockObjectWriter(
+private[spark] class DiskBlockObjectWriter(
     blockId: BlockId,
     file: File,
     serializer: Serializer,
diff --git a/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala b/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala
index 0f84810d6be06cc106e44aa5219e902801d959f0..1b7934d59fa1da9a9943aa5dad13247a0648224b 100644
--- a/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala
+++ b/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala
@@ -108,6 +108,10 @@ class StorageLevel private(
 }
 
 
+/**
+ * Various [[org.apache.spark.storage.StorageLevel]] constants, and utility functions for creating
+ * new storage levels.
+ */
 object StorageLevel {
   val NONE = new StorageLevel(false, false, false)
   val DISK_ONLY = new StorageLevel(true, false, false)
diff --git a/core/src/main/scala/org/apache/spark/util/CompletionIterator.scala b/core/src/main/scala/org/apache/spark/util/CompletionIterator.scala
index dc15a38b29d702282986266b6674138b5c670c39..fcc1ca9502aa141217a45d9a0b217697f4c95714 100644
--- a/core/src/main/scala/org/apache/spark/util/CompletionIterator.scala
+++ b/core/src/main/scala/org/apache/spark/util/CompletionIterator.scala
@@ -18,14 +18,15 @@ package org.apache.spark.util
 
 /**
- * Wrapper around an iterator which calls a completion method after it successfully iterates through all the elements
+ * Wrapper around an iterator which calls a completion method after it successfully iterates
+ * through all the elements.
  */
-abstract class CompletionIterator[+A, +I <: Iterator[A]](sub: I) extends Iterator[A]{
-  def next = sub.next
+private[spark] abstract class CompletionIterator[+A, +I <: Iterator[A]](sub: I) extends Iterator[A]{
+  def next() = sub.next()
   def hasNext = {
     val r = sub.hasNext
     if (!r) {
-      completion
+      completion()
     }
     r
   }
@@ -33,7 +34,7 @@ abstract class CompletionIterator[+A, +I <: Iterator[A]](sub: I) extends Iterato
   def completion()
 }
 
-object CompletionIterator {
+private[spark] object CompletionIterator {
   def apply[A, I <: Iterator[A]](sub: I, completionFunction: => Unit) : CompletionIterator[A,I] = {
     new CompletionIterator[A,I](sub) {
       def completion() = completionFunction
diff --git a/core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala b/core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala
index ac07a55cb9101a1d867af68e97eac25451c7ca79..b0febe906ade32e5a9a7a805e47ee4a44816eb0a 100644
--- a/core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala
+++ b/core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala
@@ -18,13 +18,13 @@ package org.apache.spark.util
 
 import java.util.{TimerTask, Timer}
 
-import org.apache.spark.{SparkConf, SparkContext, Logging}
+import org.apache.spark.{SparkConf, Logging}
 
 
 /**
  * Runs a timer task to periodically clean up metadata (e.g. old files or hashtable entries)
  */
-class MetadataCleaner(
+private[spark] class MetadataCleaner(
     cleanerType: MetadataCleanerType.MetadataCleanerType,
     cleanupFunc: (Long) => Unit,
     conf: SparkConf)
@@ -60,7 +60,7 @@ class MetadataCleaner(
   }
 }
 
-object MetadataCleanerType extends Enumeration {
+private[spark] object MetadataCleanerType extends Enumeration {
   val MAP_OUTPUT_TRACKER, SPARK_CONTEXT, HTTP_BROADCAST, DAG_SCHEDULER, RESULT_TASK,
     SHUFFLE_MAP_TASK, BLOCK_MANAGER, SHUFFLE_BLOCK_MANAGER, BROADCAST_VARS = Value
 
@@ -72,7 +72,7 @@ object MetadataCleanerType extends Enumeration {
 
 // TODO: This mutates a Conf to set properties right now, which is kind of ugly when used in the
 // initialization of StreamingContext. It's okay for users trying to configure stuff themselves.
-object MetadataCleaner {
+private[spark] object MetadataCleaner {
   def getDelaySeconds(conf: SparkConf) = {
     conf.getInt("spark.cleaner.ttl", -1)
   }
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index c8b5f09ab5f75f2cbef015cb3fdf9cd8e9852c01..d4e06dd2a10a1c00e41851ea48e4ebe2d04e2b25 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -136,6 +136,13 @@ object SparkBuild extends Build {
     javaOptions += "-Xmx3g",
     // Show full stack trace and duration in test cases.
     testOptions in Test += Tests.Argument("-oDF"),
+    // Remove certain packages from Scaladoc
+    scalacOptions in (Compile,doc) := Seq("-skip-packages", Seq(
+      "akka",
+      "org.apache.spark.network",
+      "org.apache.spark.deploy",
+      "org.apache.spark.util.collection"
+    ).mkString(":")),
 
     // Only allow one test at a time, even across projects, since they run in the same JVM
     concurrentRestrictions in Global += Tags.limit(Tags.Test, 1),
diff --git a/core/src/main/scala/org/apache/spark/util/RateLimitedOutputStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/RateLimitedOutputStream.scala
similarity index 98%
rename from core/src/main/scala/org/apache/spark/util/RateLimitedOutputStream.scala
rename to streaming/src/main/scala/org/apache/spark/streaming/util/RateLimitedOutputStream.scala
index 47e1b450043b32b8d9178dc6dce40a9768c318d7..b9c0596378b4fbc7d4de1377a37ff60547860f8c 100644
--- a/core/src/main/scala/org/apache/spark/util/RateLimitedOutputStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/RateLimitedOutputStream.scala
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.spark.util
+package org.apache.spark.streaming.util
 
 import scala.annotation.tailrec
 
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextSender.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextSender.scala
index 6585d494a6f86e04503a0c4c85283aa3031e6462..463617a713b22f03125905614bd153861c405c15 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextSender.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextSender.scala
@@ -17,14 +17,17 @@
 
 package org.apache.spark.streaming.util
 
-import java.nio.ByteBuffer
-import org.apache.spark.util.{RateLimitedOutputStream, IntParam}
+import java.io.IOException
 import java.net.ServerSocket
-import org.apache.spark.{SparkConf, Logging}
-import it.unimi.dsi.fastutil.io.FastByteArrayOutputStream
+import java.nio.ByteBuffer
+
 import scala.io.Source
-import java.io.IOException
+
+import it.unimi.dsi.fastutil.io.FastByteArrayOutputStream
+
+import org.apache.spark.{SparkConf, Logging}
 import org.apache.spark.serializer.KryoSerializer
+import org.apache.spark.util.IntParam
 
 /**
  * A helper program that sends blocks of Kryo-serialized text strings out on a socket at a
diff --git a/core/src/test/scala/org/apache/spark/util/RateLimitedOutputStreamSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/util/RateLimitedOutputStreamSuite.scala
similarity index 97%
rename from core/src/test/scala/org/apache/spark/util/RateLimitedOutputStreamSuite.scala
rename to streaming/src/test/scala/org/apache/spark/streaming/util/RateLimitedOutputStreamSuite.scala
index a9dd0b1a5b61554b8bf8eb0d5f22bfc9156707fa..15f13d5b199462bff64eb64bebf197da9f651add 100644
--- a/core/src/test/scala/org/apache/spark/util/RateLimitedOutputStreamSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/util/RateLimitedOutputStreamSuite.scala
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.spark.util
+package org.apache.spark.streaming.util
 
 import org.scalatest.FunSuite
 import java.io.ByteArrayOutputStream
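
Note on the `SparkListener` trait documented in this patch ("Interface for listening to events from the Spark scheduler"): listeners are registered on a `SparkContext` and receive scheduler events such as `SparkListenerStageCompleted`. The sketch below is illustrative only and assumes the 0.9-era API visible in the patch (`org.apache.spark.scheduler.SparkListener`, `SparkContext.addSparkListener`, and the event case classes); the `StageLoggingListener` class and the application name are hypothetical.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.scheduler.{SparkListener, SparkListenerStageCompleted}

// Hypothetical listener, for illustration only: logs each completed stage.
class StageLoggingListener extends SparkListener {
  override def onStageCompleted(stageCompleted: SparkListenerStageCompleted) {
    // Rely on StageInfo's toString rather than on any particular field.
    println("Finished stage: " + stageCompleted.stage)
  }
}

object SparkListenerExample {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("listener-example").setMaster("local")
    val sc = new SparkContext(conf)
    sc.addSparkListener(new StageLoggingListener)
    sc.parallelize(1 to 1000).count()  // runs a job, emitting stage-completed events
    sc.stop()
  }
}

The built-in `StatsReportListener` touched by this patch can be registered the same way to log per-stage runtime statistics.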