Unverified commit f7a574a6 authored by Aliaksandr.Bedrytski, committed by Sean Owen

[SPARK-18708][CORE] Improvement/improve docs in spark context file

## What changes were proposed in this pull request?

SparkContext.scala was created a long time ago and contains several styles of Scaladoc/Javadoc mixed together. Public methods/fields should have a Scaladoc that is formatted in the same way everywhere. This pull request also adds Scaladoc to methods/fields that did not have it before.

## How was this patch tested?

No actual code was modified, only comments.

Please review http://spark.apache.org/contributing.html before opening a pull request.

Author: Aliaksandr.Bedrytski <aliaksandr.bedrytski@valtech.co.uk>

Closes #16137 from Mironor/improvement/improve-docs-in-spark-context-file.
parent 836c95b1
@@ -673,10 +673,10 @@ class SparkContext(config: SparkConf) extends Logging {
* sc.cancelJobGroup("some_job_to_cancel")
* }}}
*
* If interruptOnCancel is set to true for the job group, then job cancellation will result
* in Thread.interrupt() being called on the job's executor threads. This is useful to help ensure
* that the tasks are actually stopped in a timely manner, but is off by default due to HDFS-1208,
* where HDFS may respond to Thread.interrupt() by marking nodes as dead.
* @param interruptOnCancel If true, then job cancellation will result in `Thread.interrupt()`
* being called on the job's executor threads. This is useful to help ensure that the tasks
* are actually stopped in a timely manner, but is off by default due to HDFS-1208, where HDFS
* may respond to Thread.interrupt() by marking nodes as dead.
*/
def setJobGroup(groupId: String, description: String, interruptOnCancel: Boolean = false) {
setLocalProperty(SparkContext.SPARK_JOB_DESCRIPTION, description)
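To make the `interruptOnCancel` behaviour concrete, here is a brief sketch (not part of the diff); it assumes a live `SparkContext` named `sc` and a hypothetical group id:

```
// Sketch, assuming a live SparkContext `sc`
sc.setJobGroup("nightly_etl", "nightly ETL run", interruptOnCancel = true)
try {
  sc.parallelize(1 to 1000000).count()  // jobs started here belong to "nightly_etl"
} finally {
  sc.clearJobGroup()
}
// From another thread, e.g. a job-management service:
// sc.cancelJobGroup("nightly_etl")  // executor threads get Thread.interrupt()
```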
@@ -712,6 +712,9 @@ class SparkContext(config: SparkConf) extends Logging {
* modified collection. Pass a copy of the argument to avoid this.
* @note avoid using `parallelize(Seq())` to create an empty `RDD`. Consider `emptyRDD` for an
* RDD with no partitions, or `parallelize(Seq[T]())` for an RDD of `T` with empty partitions.
* @param seq Scala collection to distribute
* @param numSlices number of partitions to divide the collection into
* @return RDD representing distributed collection
*/
def parallelize[T: ClassTag](
seq: Seq[T],
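A quick illustration of `parallelize` and the `@note` above, assuming a live `SparkContext` `sc`:

```
// Sketch, assuming a live SparkContext `sc`
val rdd = sc.parallelize(Seq(1, 2, 3, 4, 5), numSlices = 2)
println(rdd.partitions.length)  // 2
// Preferred ways to get an empty RDD, per the @note:
val noPartitions = sc.emptyRDD[Int]            // RDD with no partitions
val emptyParts   = sc.parallelize(Seq[Int]())  // RDD of Int with empty partitions
```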
@@ -729,8 +732,8 @@ class SparkContext(config: SparkConf) extends Logging {
* @param start the start value.
* @param end the end value.
* @param step the incremental step
* @param numSlices the partition number of the new RDD.
* @return
* @param numSlices number of partitions to divide the collection into
* @return RDD representing distributed range
*/
def range(
start: Long,
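For example (a sketch assuming a live `SparkContext` `sc`):

```
// Sketch, assuming a live SparkContext `sc`
val evens = sc.range(0L, 100L, step = 2, numSlices = 4)  // RDD[Long]: 0, 2, ..., 98
println(evens.count())  // 50
```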
@@ -795,6 +798,9 @@ class SparkContext(config: SparkConf) extends Logging {
/** Distribute a local Scala collection to form an RDD.
*
* This method is identical to `parallelize`.
* @param seq Scala collection to distribute
* @param numSlices number of partitions to divide the collection into
* @return RDD representing distributed collection
*/
def makeRDD[T: ClassTag](
seq: Seq[T],
@@ -806,6 +812,8 @@ class SparkContext(config: SparkConf) extends Logging {
* Distribute a local Scala collection to form an RDD, with one or more
* location preferences (hostnames of Spark nodes) for each object.
* Create a new partition for each collection item.
* @param seq list of tuples of data and location preferences (hostnames of Spark nodes)
* @return RDD representing data partitioned according to location preferences
*/
def makeRDD[T: ClassTag](seq: Seq[(T, Seq[String])]): RDD[T] = withScope {
assertNotStopped()
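A sketch of the location-preference variant, with hypothetical hostnames and a live `SparkContext` `sc` assumed:

```
// Sketch with hypothetical hostnames, assuming a live SparkContext `sc`
val located = sc.makeRDD(Seq(
  ("record-a", Seq("host1.example.com")),
  ("record-b", Seq("host2.example.com", "host3.example.com"))))
// one partition per element, scheduled preferentially on the given hosts
println(located.partitions.length)  // 2
```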
@@ -816,6 +824,9 @@ class SparkContext(config: SparkConf) extends Logging {
/**
* Read a text file from HDFS, a local file system (available on all nodes), or any
* Hadoop-supported file system URI, and return it as an RDD of Strings.
* @param path path to the text file on a supported file system
* @param minPartitions suggested minimum number of partitions for the resulting RDD
* @return RDD of lines of the text file
*/
def textFile(
path: String,
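For instance (hypothetical HDFS path, live `sc` assumed):

```
// Sketch with a hypothetical HDFS path, assuming a live SparkContext `sc`
val lines = sc.textFile("hdfs://namenode:8020/logs/app.log", minPartitions = 8)
val errors = lines.filter(_.contains("ERROR")).count()
```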
@@ -857,6 +868,7 @@ class SparkContext(config: SparkConf) extends Logging {
* @param path Directory to the input data files, the path can be comma separated paths as the
* list of inputs.
* @param minPartitions A suggestion value of the minimal splitting number for input data.
* @return RDD representing tuples of file path and the corresponding file content
*/
def wholeTextFiles(
path: String,
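A sketch of reading whole files as (path, content) pairs, with a hypothetical directory and a live `sc` assumed:

```
// Sketch with a hypothetical directory, assuming a live SparkContext `sc`
val files = sc.wholeTextFiles("hdfs://namenode:8020/data/small-files", minPartitions = 4)
files.collect().foreach { case (path, content) =>
  println(s"$path -> ${content.length} chars")
}
```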
@@ -908,6 +920,7 @@ class SparkContext(config: SparkConf) extends Logging {
* @param path Directory to the input data files, the path can be comma separated paths as the
* list of inputs.
* @param minPartitions A suggestion value of the minimal splitting number for input data.
* @return RDD representing tuples of file path and corresponding file content
*/
def binaryFiles(
path: String,
@@ -968,10 +981,11 @@ class SparkContext(config: SparkConf) extends Logging {
* Therefore if you plan to reuse this conf to create multiple RDDs, you need to make
* sure you won't modify the conf. A safe approach is always creating a new conf for
* a new RDD.
* @param inputFormatClass Class of the InputFormat
* @param keyClass Class of the keys
* @param valueClass Class of the values
* @param inputFormatClass storage format of the data to be read
* @param keyClass `Class` of the key associated with the `inputFormatClass` parameter
* @param valueClass `Class` of the value associated with the `inputFormatClass` parameter
* @param minPartitions Minimum number of Hadoop Splits to generate.
* @return RDD of tuples of key and corresponding value
*
* @note Because Hadoop's RecordReader class re-uses the same Writable object for each
* record, directly caching the returned RDD or directly passing it to an aggregation or shuffle
@@ -1003,6 +1017,13 @@ class SparkContext(config: SparkConf) extends Logging {
* operation will create many references to the same object.
* If you plan to directly cache, sort, or aggregate Hadoop writable objects, you should first
* copy them using a `map` function.
* @param path directory to the input data files, the path can be comma separated paths
* as a list of inputs
* @param inputFormatClass storage format of the data to be read
* @param keyClass `Class` of the key associated with the `inputFormatClass` parameter
* @param valueClass `Class` of the value associated with the `inputFormatClass` parameter
* @param minPartitions suggested minimum number of partitions for the resulting RDD
* @return RDD of tuples of key and corresponding value
*/
def hadoopFile[K, V](
path: String,
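The map-copy advice in the note above looks like this in practice; a sketch with a hypothetical path, assuming a live `sc`:

```
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.TextInputFormat

// Sketch with a hypothetical path, assuming a live SparkContext `sc`.
// Values are copied out of the reused Writable with `toString`, per the note above.
val records = sc
  .hadoopFile[LongWritable, Text, TextInputFormat]("hdfs://namenode:8020/data", 8)
  .map { case (offset, line) => (offset.get, line.toString) }
records.cache()  // safe to cache: no shared Writable references remain
```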
@@ -1042,6 +1063,10 @@ class SparkContext(config: SparkConf) extends Logging {
* operation will create many references to the same object.
* If you plan to directly cache, sort, or aggregate Hadoop writable objects, you should first
* copy them using a `map` function.
* @param path directory to the input data files, the path can be comma separated paths
* as a list of inputs
* @param minPartitions suggested minimum number of partitions for the resulting RDD
* @return RDD of tuples of key and corresponding value
*/
def hadoopFile[K, V, F <: InputFormat[K, V]]
(path: String, minPartitions: Int)
@@ -1066,13 +1091,32 @@ class SparkContext(config: SparkConf) extends Logging {
* operation will create many references to the same object.
* If you plan to directly cache, sort, or aggregate Hadoop writable objects, you should first
* copy them using a `map` function.
* @param path directory to the input data files, the path can be comma separated paths as
* a list of inputs
* @return RDD of tuples of key and corresponding value
*/
def hadoopFile[K, V, F <: InputFormat[K, V]](path: String)
(implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F]): RDD[(K, V)] = withScope {
hadoopFile[K, V, F](path, defaultMinPartitions)
}
/** Get an RDD for a Hadoop file with an arbitrary new API InputFormat. */
/**
* Smarter version of `newAPIHadoopFile` that uses class tags to figure out the classes of keys,
* values and the `org.apache.hadoop.mapreduce.InputFormat` (new MapReduce API) so that users
* don't need to pass them directly. Instead, callers can just write, for example:
* ```
* val file = sparkContext.newAPIHadoopFile[LongWritable, Text, TextInputFormat](path)
* ```
*
* @note Because Hadoop's RecordReader class re-uses the same Writable object for each
* record, directly caching the returned RDD or directly passing it to an aggregation or shuffle
* operation will create many references to the same object.
* If you plan to directly cache, sort, or aggregate Hadoop writable objects, you should first
* copy them using a `map` function.
* @param path directory to the input data files, the path can be comma separated paths
* as a list of inputs
* @return RDD of tuples of key and corresponding value
*/
def newAPIHadoopFile[K, V, F <: NewInputFormat[K, V]]
(path: String)
(implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F]): RDD[(K, V)] = withScope {
@@ -1092,6 +1136,13 @@ class SparkContext(config: SparkConf) extends Logging {
* operation will create many references to the same object.
* If you plan to directly cache, sort, or aggregate Hadoop writable objects, you should first
* copy them using a `map` function.
* @param path directory to the input data files, the path can be comma separated paths
* as a list of inputs
* @param fClass storage format of the data to be read
* @param kClass `Class` of the key associated with the `fClass` parameter
* @param vClass `Class` of the value associated with the `fClass` parameter
* @param conf Hadoop configuration
* @return RDD of tuples of key and corresponding value
*/
def newAPIHadoopFile[K, V, F <: NewInputFormat[K, V]](
path: String,
@@ -1123,9 +1174,9 @@ class SparkContext(config: SparkConf) extends Logging {
* Therefore if you plan to reuse this conf to create multiple RDDs, you need to make
* sure you won't modify the conf. A safe approach is always creating a new conf for
* a new RDD.
* @param fClass Class of the InputFormat
* @param kClass Class of the keys
* @param vClass Class of the values
* @param fClass storage format of the data to be read
* @param kClass `Class` of the key associated with the `fClass` parameter
* @param vClass `Class` of the value associated with the `fClass` parameter
*
* @note Because Hadoop's RecordReader class re-uses the same Writable object for each
* record, directly caching the returned RDD or directly passing it to an aggregation or shuffle
@@ -1158,6 +1209,12 @@ class SparkContext(config: SparkConf) extends Logging {
* operation will create many references to the same object.
* If you plan to directly cache, sort, or aggregate Hadoop writable objects, you should first
* copy them using a `map` function.
* @param path directory to the input data files, the path can be comma separated paths
* as a list of inputs
* @param keyClass `Class` of the key associated with `SequenceFileInputFormat`
* @param valueClass `Class` of the value associated with `SequenceFileInputFormat`
* @param minPartitions suggested minimum number of partitions for the resulting RDD
* @return RDD of tuples of key and corresponding value
*/
def sequenceFile[K, V](path: String,
keyClass: Class[K],
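A sketch of the explicit key/value class variant, again copying out of the reused Writables; the path is hypothetical, `sc` assumed live:

```
import org.apache.hadoop.io.{IntWritable, Text}

// Sketch with a hypothetical path, assuming a live SparkContext `sc`
val pairs = sc.sequenceFile("hdfs://namenode:8020/data/seq", classOf[IntWritable], classOf[Text])
// copy out of the reused Writables before caching, per the note above
val safe = pairs.map { case (k, v) => (k.get, v.toString) }.cache()
```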
@@ -1177,6 +1234,11 @@ class SparkContext(config: SparkConf) extends Logging {
* operation will create many references to the same object.
* If you plan to directly cache, sort, or aggregate Hadoop writable objects, you should first
* copy them using a `map` function.
* @param path directory to the input data files, the path can be comma separated paths
* as a list of inputs
* @param keyClass `Class` of the key associated with `SequenceFileInputFormat`
* @param valueClass `Class` of the value associated with `SequenceFileInputFormat`
* @return RDD of tuples of key and corresponding value
*/
def sequenceFile[K, V](
path: String,
@@ -1207,6 +1269,10 @@ class SparkContext(config: SparkConf) extends Logging {
* operation will create many references to the same object.
* If you plan to directly cache, sort, or aggregate Hadoop writable objects, you should first
* copy them using a `map` function.
* @param path directory to the input data files, the path can be comma separated paths
* as a list of inputs
* @param minPartitions suggested minimum number of partitions for the resulting RDD
* @return RDD of tuples of key and corresponding value
*/
def sequenceFile[K, V]
(path: String, minPartitions: Int = defaultMinPartitions)
@@ -1231,6 +1297,11 @@ class SparkContext(config: SparkConf) extends Logging {
* be pretty slow if you use the default serializer (Java serialization),
* though the nice thing about it is that there's very little effort required to save arbitrary
* objects.
*
* @param path directory to the input data files, the path can be comma separated paths
* as a list of inputs
* @param minPartitions suggested minimum number of partitions for the resulting RDD
* @return RDD representing deserialized data from the file(s)
*/
def objectFile[T: ClassTag](
path: String,
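A round-trip sketch (hypothetical path, live `sc` assumed):

```
// Sketch with a hypothetical path, assuming a live SparkContext `sc`
sc.parallelize(1 to 100).saveAsObjectFile("hdfs://namenode:8020/tmp/ints")
val restored = sc.objectFile[Int]("hdfs://namenode:8020/tmp/ints", minPartitions = 4)
println(restored.count())  // 100
```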
@@ -1410,6 +1481,9 @@ class SparkContext(config: SparkConf) extends Logging {
* Broadcast a read-only variable to the cluster, returning a
* [[org.apache.spark.broadcast.Broadcast]] object for reading it in distributed functions.
* The variable will be sent to each cluster only once.
*
* @param value value to broadcast to the Spark nodes
* @return `Broadcast` object, a read-only variable cached on each machine
*/
def broadcast[T: ClassTag](value: T): Broadcast[T] = {
assertNotStopped()
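A minimal broadcast sketch, assuming a live `sc`:

```
// Sketch, assuming a live SparkContext `sc`
val countryNames = sc.broadcast(Map("FR" -> "France", "DE" -> "Germany"))
val resolved = sc.parallelize(Seq("FR", "DE", "FR"))
  .map(code => countryNames.value.getOrElse(code, "unknown"))  // read-only access on executors
println(resolved.collect().mkString(", "))  // France, Germany, France
```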
@@ -1424,8 +1498,9 @@ class SparkContext(config: SparkConf) extends Logging {
/**
* Add a file to be downloaded with this Spark job on every node.
* The `path` passed can be either a local file, a file in HDFS (or other Hadoop-supported
* filesystems), or an HTTP, HTTPS or FTP URI. To access the file in Spark jobs,
*
* @param path can be either a local file, a file in HDFS (or other Hadoop-supported
* filesystems), or an HTTP, HTTPS or FTP URI. To access the file in Spark jobs,
* use `SparkFiles.get(fileName)` to find its download location.
*/
def addFile(path: String): Unit = {
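A sketch of `addFile` plus `SparkFiles.get`, with a hypothetical file and a live `sc` assumed:

```
import org.apache.spark.SparkFiles

// Sketch with a hypothetical file, assuming a live SparkContext `sc`
sc.addFile("hdfs://namenode:8020/config/stopwords.txt")
val counts = sc.parallelize(1 to 4, 2).map { _ =>
  // SparkFiles.get resolves the local download location on each node
  scala.io.Source.fromFile(SparkFiles.get("stopwords.txt")).getLines().length
}.collect()
```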
@@ -1439,12 +1514,12 @@ class SparkContext(config: SparkConf) extends Logging {
/**
* Add a file to be downloaded with this Spark job on every node.
* The `path` passed can be either a local file, a file in HDFS (or other Hadoop-supported
* filesystems), or an HTTP, HTTPS or FTP URI. To access the file in Spark jobs,
* use `SparkFiles.get(fileName)` to find its download location.
*
* A directory can be given if the recursive option is set to true. Currently directories are only
* supported for Hadoop-supported filesystems.
* @param path can be either a local file, a file in HDFS (or other Hadoop-supported
* filesystems), or an HTTP, HTTPS or FTP URI. To access the file in Spark jobs,
* use `SparkFiles.get(fileName)` to find its download location.
* @param recursive if true, a directory can be given in `path`. Currently directories are
* only supported for Hadoop-supported filesystems.
*/
def addFile(path: String, recursive: Boolean): Unit = {
val uri = new Path(path).toUri
@@ -1715,9 +1790,9 @@ class SparkContext(config: SparkConf) extends Logging {
}
/**
* Adds a JAR dependency for all tasks to be executed on this SparkContext in the future.
* The `path` passed can be either a local file, a file in HDFS (or other Hadoop-supported
* filesystems), an HTTP, HTTPS or FTP URI, or local:/path for a file on every worker node.
* Adds a JAR dependency for all tasks to be executed on this `SparkContext` in the future.
* @param path can be either a local file, a file in HDFS (or other Hadoop-supported filesystems),
* an HTTP, HTTPS or FTP URI, or local:/path for a file on every worker node.
*/
def addJar(path: String) {
if (path == null) {
@@ -1907,6 +1982,12 @@ class SparkContext(config: SparkConf) extends Logging {
/**
* Run a function on a given set of partitions in an RDD and pass the results to the given
* handler function. This is the main entry point for all actions in Spark.
*
* @param rdd target RDD to run tasks on
* @param func a function to run on each partition of the RDD
* @param partitions set of partitions to run on; some jobs may not want to compute on all
* partitions of the target RDD, e.g. for operations like `first()`
* @param resultHandler callback to pass each result to
*/
def runJob[T, U: ClassTag](
rdd: RDD[T],
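A sketch of this main entry point, computing only two partitions and consuming each result through the handler (live `sc` assumed):

```
// Sketch, assuming a live SparkContext `sc`
val rdd = sc.parallelize(1 to 100, 4)
// Compute only the first two partitions, like `first()`-style actions do:
sc.runJob(
  rdd,
  (ctx: org.apache.spark.TaskContext, it: Iterator[Int]) => it.sum,
  Seq(0, 1),
  (partitionId: Int, sum: Int) => println(s"partition $partitionId -> $sum"))
```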
@@ -1929,6 +2010,14 @@ class SparkContext(config: SparkConf) extends Logging {
/**
* Run a function on a given set of partitions in an RDD and return the results as an array.
* The function that is run against each partition additionally takes a `TaskContext` argument.
*
* @param rdd target RDD to run tasks on
* @param func a function to run on each partition of the RDD
* @param partitions set of partitions to run on; some jobs may not want to compute on all
* partitions of the target RDD, e.g. for operations like `first()`
* @return in-memory collection with a result of the job (each collection element will contain
* a result from one partition)
*/
def runJob[T, U: ClassTag](
rdd: RDD[T],
@@ -1940,8 +2029,14 @@ class SparkContext(config: SparkConf) extends Logging {
}
/**
* Run a job on a given set of partitions of an RDD, but take a function of type
* `Iterator[T] => U` instead of `(TaskContext, Iterator[T]) => U`.
* Run a function on a given set of partitions in an RDD and return the results as an array.
*
* @param rdd target RDD to run tasks on
* @param func a function to run on each partition of the RDD
* @param partitions set of partitions to run on; some jobs may not want to compute on all
* partitions of the target RDD, e.g. for operations like `first()`
* @return in-memory collection with a result of the job (each collection element will contain
* a result from one partition)
*/
def runJob[T, U: ClassTag](
rdd: RDD[T],
@@ -1952,7 +2047,13 @@ class SparkContext(config: SparkConf) extends Logging {
}
/**
* Run a job on all partitions in an RDD and return the results in an array.
* Run a job on all partitions in an RDD and return the results in an array. The function
* that is run against each partition additionally takes a `TaskContext` argument.
*
* @param rdd target RDD to run tasks on
* @param func a function to run on each partition of the RDD
* @return in-memory collection with a result of the job (each collection element will contain
* a result from one partition)
*/
def runJob[T, U: ClassTag](rdd: RDD[T], func: (TaskContext, Iterator[T]) => U): Array[U] = {
runJob(rdd, func, 0 until rdd.partitions.length)
@@ -1960,13 +2061,23 @@ class SparkContext(config: SparkConf) extends Logging {
/**
* Run a job on all partitions in an RDD and return the results in an array.
*
* @param rdd target RDD to run tasks on
* @param func a function to run on each partition of the RDD
* @return in-memory collection with a result of the job (each collection element will contain
* a result from one partition)
*/
def runJob[T, U: ClassTag](rdd: RDD[T], func: Iterator[T] => U): Array[U] = {
runJob(rdd, func, 0 until rdd.partitions.length)
}
/**
* Run a job on all partitions in an RDD and pass the results to a handler function.
* Run a job on all partitions in an RDD and pass the results to a handler function. The function
* that is run against each partition additionally takes a `TaskContext` argument.
*
* @param rdd target RDD to run tasks on
* @param processPartition a function to run on each partition of the RDD
* @param resultHandler callback to pass each result to
*/
def runJob[T, U: ClassTag](
rdd: RDD[T],
@@ -1978,6 +2089,10 @@ class SparkContext(config: SparkConf) extends Logging {
/**
* Run a job on all partitions in an RDD and pass the results to a handler function.
*
* @param rdd target RDD to run tasks on
* @param processPartition a function to run on each partition of the RDD
* @param resultHandler callback to pass each result to
*/
def runJob[T, U: ClassTag](
rdd: RDD[T],
@@ -1991,6 +2106,13 @@ class SparkContext(config: SparkConf) extends Logging {
/**
* :: DeveloperApi ::
* Run a job that can return approximate results.
*
* @param rdd target RDD to run tasks on
* @param func a function to run on each partition of the RDD
* @param evaluator `ApproximateEvaluator` to receive the partial results
* @param timeout maximum time to wait for the job, in milliseconds
* @return partial result (how partial depends on whether the job was finished before or
* after timeout)
*/
@DeveloperApi
def runApproximateJob[T, U, R](
@@ -2012,6 +2134,13 @@ class SparkContext(config: SparkConf) extends Logging {
/**
* Submit a job for execution and return a FutureJob holding the result.
*
* @param rdd target RDD to run tasks on
* @param processPartition a function to run on each partition of the RDD
* @param partitions set of partitions to run on; some jobs may not want to compute on all
* partitions of the target RDD, e.g. for operations like `first()`
* @param resultHandler callback to pass each result to
* @param resultFunc function to be executed when the result is ready
*/
def submitJob[T, U, R](
rdd: RDD[T],
@@ -2096,6 +2225,7 @@ class SparkContext(config: SparkConf) extends Logging {
* @param checkSerializable whether or not to immediately check <tt>f</tt> for serializability
* @throws SparkException if <tt>checkSerializable</tt> is set but <tt>f</tt> is not
* serializable
* @return the cleaned closure
*/
private[spark] def clean[F <: AnyRef](f: F, checkSerializable: Boolean = true): F = {
ClosureCleaner.clean(f, checkSerializable)
@@ -2103,8 +2233,9 @@ class SparkContext(config: SparkConf) extends Logging {
}
/**
* Set the directory under which RDDs are going to be checkpointed. The directory must
* be a HDFS path if running on a cluster.
* Set the directory under which RDDs are going to be checkpointed.
* @param directory path to the directory where checkpoint files will be stored
* (must be an HDFS path if running on a cluster)
*/
def setCheckpointDir(directory: String) {
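A checkpointing sketch (hypothetical HDFS path, live `sc` assumed):

```
// Sketch with a hypothetical HDFS path, assuming a live SparkContext `sc`
sc.setCheckpointDir("hdfs://namenode:8020/spark-checkpoints")
val rdd = sc.parallelize(1 to 10).map(_ * 2)
rdd.checkpoint()  // only marked here; materialized by the next action
rdd.count()       // triggers the checkpoint write
```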
@@ -2311,6 +2442,8 @@ object SparkContext extends Logging {
*
* @note This function cannot be used to create multiple SparkContext instances
* even if multiple contexts are allowed.
* @param config `SparkConf` that will be used for initialization of the `SparkContext`
* @return current `SparkContext` (or a new one if it wasn't created before the function call)
*/
def getOrCreate(config: SparkConf): SparkContext = {
// Synchronize to ensure that multiple create requests don't trigger an exception
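A sketch of the create-or-reuse behaviour:

```
import org.apache.spark.{SparkConf, SparkContext}

// Sketch: getOrCreate either builds a context from `conf` or returns the existing one
val conf = new SparkConf().setAppName("demo").setMaster("local[2]")
val sc = SparkContext.getOrCreate(conf)
val again = SparkContext.getOrCreate()  // no conf needed once a context exists
assert(sc eq again)
```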
@@ -2336,6 +2469,7 @@ object SparkContext extends Logging {
*
* @note This function cannot be used to create multiple SparkContext instances
* even if multiple contexts are allowed.
* @return current `SparkContext` (or a new one if it wasn't created before the function call)
*/
def getOrCreate(): SparkContext = {
SPARK_CONTEXT_CONSTRUCTOR_LOCK.synchronized {
@@ -2416,6 +2550,9 @@ object SparkContext extends Logging {
/**
* Find the JAR from which a given class was loaded, to make it easy for users to pass
* their JARs to SparkContext.
*
* @param cls class that should be inside the jar
* @return jar that contains the class, or `None` if not found
*/
def jarOfClass(cls: Class[_]): Option[String] = {
val uri = cls.getResource("/" + cls.getName.replace('.', '/') + ".class")
@@ -2437,6 +2574,9 @@ object SparkContext extends Logging {
* Find the JAR that contains the class of a particular object, to make it easy for users
* to pass their JARs to SparkContext. In most cases you can call jarOfObject(this) in
* your driver program.
*
* @param obj reference to an instance whose class should be inside the jar
* @return jar that contains the class of the instance, or `None` if not found
*/
def jarOfObject(obj: AnyRef): Option[String] = jarOfClass(obj.getClass)
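A sketch of wiring either helper into a driver's configuration; `MyJob` is a hypothetical driver object:

```
import org.apache.spark.{SparkConf, SparkContext}

object MyJob {  // hypothetical driver object
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("my-job")
      .setJars(SparkContext.jarOfObject(this).toSeq)  // Option[String] -> Seq[String]; empty if not found
    val sc = new SparkContext(conf)
  }
}
```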