From 4a6e78abd9d5edc4a5092738dff0006bbe202a89 Mon Sep 17 00:00:00 2001 From: Dongjoon Hyun <dongjoon@apache.org> Date: Sat, 2 Apr 2016 17:50:40 -0700 Subject: [PATCH] [MINOR][DOCS] Use multi-line JavaDoc comments in Scala code. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## What changes were proposed in this pull request? This PR aims to convert all Scala-style multiline comments into Java-style multiline comments in the Scala code. (All comment-only changes across 77 files: +786 lines, −747 lines) ## How was this patch tested? Manual. Author: Dongjoon Hyun <dongjoon@apache.org> Closes #12130 from dongjoon-hyun/use_multiine_javadoc_comments. --- .../scala/org/apache/spark/FutureAction.scala | 14 +- .../scala/org/apache/spark/SSLOptions.scala | 57 +++--- .../scala/org/apache/spark/SparkContext.scala | 42 ++-- .../apache/spark/api/java/JavaPairRDD.scala | 8 +- .../spark/api/java/JavaSparkContext.scala | 60 +++--- .../spark/deploy/worker/CommandUtils.scala | 2 +- .../org/apache/spark/rdd/CoGroupedRDD.scala | 10 +- .../main/scala/org/apache/spark/rdd/RDD.scala | 4 +- .../mesos/CoarseMesosSchedulerBackend.scala | 24 +-- .../cluster/mesos/MesosClusterScheduler.scala | 12 +- .../cluster/mesos/MesosSchedulerUtils.scala | 4 +- .../apache/spark/shuffle/ShuffleManager.scala | 6 +- .../spark/storage/memory/MemoryStore.scala | 20 +- .../scala/org/apache/spark/util/Utils.scala | 17 +- .../test/scala/org/apache/spark/Smuggle.scala | 46 ++--- .../spark/memory/MemoryManagerSuite.scala | 24 +-- .../apache/spark/examples/BroadcastTest.scala | 4 +- .../spark/examples/DFSReadWriteTest.scala | 20 +- .../apache/spark/examples/GroupByTest.scala | 4 +- .../spark/examples/MultiBroadcastTest.scala | 4 +- .../examples/SimpleSkewedGroupByTest.scala | 4 +- .../spark/examples/SkewedGroupByTest.scala | 4 +- .../clickstream/PageViewGenerator.scala | 23 ++- .../clickstream/PageViewStream.scala | 21 +- .../streaming/flume/FlumeInputDStream.scala | 15 +- .../streaming/kafka/KafkaRDDPartition.scala | 15 +- .../org/apache/spark/graphx/GraphOps.scala | 10 +- .../graphx/lib/ConnectedComponents.scala | 18 +- .../spark/ml/feature/ElementwiseProduct.scala | 6 +- .../python/GaussianMixtureModelWrapper.scala | 8 +- .../api/python/Word2VecModelWrapper.scala | 4 +- .../apache/spark/mllib/linalg/Matrices.scala | 16 +- .../StreamingLinearRegressionWithSGD.scala | 4 +- .../org/apache/spark/repl/SparkILoop.scala | 21 +- .../org/apache/spark/repl/SparkImports.scala | 5 +- .../scala/org/apache/spark/sql/Encoder.scala | 24 +-- .../sql/catalyst/analysis/Analyzer.scala | 20 +- .../sql/catalyst/expressions/Projection.scala | 6 +- .../expressions/codegen/CodeGenerator.scala | 26 +-- .../sql/catalyst/expressions/grouping.scala | 18 +- .../spark/sql/catalyst/expressions/misc.scala | 4 +- .../sql/catalyst/optimizer/Optimizer.scala | 40 ++-- .../sql/catalyst/parser/AstBuilder.scala | 4 +- .../sql/catalyst/planning/patterns.scala | 28 +-- .../spark/sql/catalyst/plans/QueryPlan.scala | 4 +- .../plans/physical/partitioning.scala | 6 +- .../optimizer/OptimizerExtendableSuite.scala | 14 +- .../org/apache/spark/sql/SQLContext.scala | 14 +- .../spark/sql/execution/CacheManager.scala | 7 +- .../spark/sql/execution/SparkPlan.scala | 4 +- .../sql/execution/WholeStageCodegen.scala | 172 ++++++++-------- .../apache/spark/sql/execution/Window.scala | 36 ++-- .../aggregate/AggregationIterator.scala | 22 +- .../SortBasedAggregationIterator.scala | 6 +- .../aggregate/TungstenAggregate.scala | 16 +-
.../datasources/SqlNewHadoopRDD.scala | 8 +- .../datasources/csv/CSVInferSchema.scala | 22 +- .../datasources/csv/DefaultSource.scala | 4 +- .../spark/sql/execution/datasources/ddl.scala | 8 +- .../execution/joins/BroadcastHashJoin.scala | 22 +- .../execution/joins/CartesianProduct.scala | 8 +- .../spark/sql/execution/joins/HashJoin.scala | 8 +- .../sql/execution/joins/HashedRelation.scala | 76 +++---- .../sql/execution/joins/SortMergeJoin.scala | 36 ++-- .../state/HDFSBackedStateStoreProvider.scala | 8 +- .../sql/execution/ui/SparkPlanGraph.scala | 4 +- .../org/apache/spark/sql/functions.scala | 191 +++++++++--------- .../apache/spark/sql/sources/interfaces.scala | 26 +-- .../org/apache/spark/sql/QueryTest.scala | 4 +- .../BenchmarkWholeStageCodegen.scala | 8 +- .../datasources/csv/CSVParserSuite.scala | 4 +- .../spark/streaming/StreamingContext.scala | 7 +- .../api/java/JavaStreamingContext.scala | 7 +- .../streaming/receiver/RateLimiter.scala | 23 ++- .../spark/streaming/scheduler/JobSet.scala | 7 +- .../spark/tools/GenerateMIMAIgnore.scala | 9 +- .../deploy/yarn/YarnSparkHadoopUtil.scala | 16 +- 77 files changed, 786 insertions(+), 747 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/FutureAction.scala b/core/src/main/scala/org/apache/spark/FutureAction.scala index 2a8220ff40..ce11772a6d 100644 --- a/core/src/main/scala/org/apache/spark/FutureAction.scala +++ b/core/src/main/scala/org/apache/spark/FutureAction.scala @@ -146,16 +146,16 @@ class SimpleFutureAction[T] private[spark](jobWaiter: JobWaiter[_], resultFunc: /** - * Handle via which a "run" function passed to a [[ComplexFutureAction]] - * can submit jobs for execution. - */ + * Handle via which a "run" function passed to a [[ComplexFutureAction]] + * can submit jobs for execution. + */ @DeveloperApi trait JobSubmitter { /** - * Submit a job for execution and return a FutureAction holding the result. - * This is a wrapper around the same functionality provided by SparkContext - * to enable cancellation. - */ + * Submit a job for execution and return a FutureAction holding the result. + * This is a wrapper around the same functionality provided by SparkContext + * to enable cancellation. + */ def submitJob[T, U, R]( rdd: RDD[T], processPartition: Iterator[T] => U, diff --git a/core/src/main/scala/org/apache/spark/SSLOptions.scala b/core/src/main/scala/org/apache/spark/SSLOptions.scala index 30db6ccbf4..719905a2c9 100644 --- a/core/src/main/scala/org/apache/spark/SSLOptions.scala +++ b/core/src/main/scala/org/apache/spark/SSLOptions.scala @@ -132,34 +132,35 @@ private[spark] case class SSLOptions( private[spark] object SSLOptions extends Logging { - /** Resolves SSLOptions settings from a given Spark configuration object at a given namespace. 
- * - * The following settings are allowed: - * $ - `[ns].enabled` - `true` or `false`, to enable or disable SSL respectively - * $ - `[ns].keyStore` - a path to the key-store file; can be relative to the current directory - * $ - `[ns].keyStorePassword` - a password to the key-store file - * $ - `[ns].keyPassword` - a password to the private key - * $ - `[ns].keyStoreType` - the type of the key-store - * $ - `[ns].needClientAuth` - whether SSL needs client authentication - * $ - `[ns].trustStore` - a path to the trust-store file; can be relative to the current - * directory - * $ - `[ns].trustStorePassword` - a password to the trust-store file - * $ - `[ns].trustStoreType` - the type of trust-store - * $ - `[ns].protocol` - a protocol name supported by a particular Java version - * $ - `[ns].enabledAlgorithms` - a comma separated list of ciphers - * - * For a list of protocols and ciphers supported by particular Java versions, you may go to - * [[https://blogs.oracle.com/java-platform-group/entry/diagnosing_tls_ssl_and_https Oracle - * blog page]]. - * - * You can optionally specify the default configuration. If you do, for each setting which is - * missing in SparkConf, the corresponding setting is used from the default configuration. - * - * @param conf Spark configuration object where the settings are collected from - * @param ns the namespace name - * @param defaults the default configuration - * @return [[org.apache.spark.SSLOptions]] object - */ + /** + * Resolves SSLOptions settings from a given Spark configuration object at a given namespace. + * + * The following settings are allowed: + * $ - `[ns].enabled` - `true` or `false`, to enable or disable SSL respectively + * $ - `[ns].keyStore` - a path to the key-store file; can be relative to the current directory + * $ - `[ns].keyStorePassword` - a password to the key-store file + * $ - `[ns].keyPassword` - a password to the private key + * $ - `[ns].keyStoreType` - the type of the key-store + * $ - `[ns].needClientAuth` - whether SSL needs client authentication + * $ - `[ns].trustStore` - a path to the trust-store file; can be relative to the current + * directory + * $ - `[ns].trustStorePassword` - a password to the trust-store file + * $ - `[ns].trustStoreType` - the type of trust-store + * $ - `[ns].protocol` - a protocol name supported by a particular Java version + * $ - `[ns].enabledAlgorithms` - a comma separated list of ciphers + * + * For a list of protocols and ciphers supported by particular Java versions, you may go to + * [[https://blogs.oracle.com/java-platform-group/entry/diagnosing_tls_ssl_and_https Oracle + * blog page]]. + * + * You can optionally specify the default configuration. If you do, for each setting which is + * missing in SparkConf, the corresponding setting is used from the default configuration. 
+ * + * @param conf Spark configuration object where the settings are collected from + * @param ns the namespace name + * @param defaults the default configuration + * @return [[org.apache.spark.SSLOptions]] object + */ def parse(conf: SparkConf, ns: String, defaults: Option[SSLOptions] = None): SSLOptions = { val enabled = conf.getBoolean(s"$ns.enabled", defaultValue = defaults.exists(_.enabled)) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index d7cb253d69..4b3264cbf5 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -773,9 +773,11 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli parallelize(seq, numSlices) } - /** Distribute a local Scala collection to form an RDD, with one or more - * location preferences (hostnames of Spark nodes) for each object. - * Create a new partition for each collection item. */ + /** + * Distribute a local Scala collection to form an RDD, with one or more + * location preferences (hostnames of Spark nodes) for each object. + * Create a new partition for each collection item. + */ def makeRDD[T: ClassTag](seq: Seq[(T, Seq[String])]): RDD[T] = withScope { assertNotStopped() val indexToPrefs = seq.zipWithIndex.map(t => (t._2, t._1._2)).toMap @@ -1095,14 +1097,15 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli new NewHadoopRDD(this, fClass, kClass, vClass, jconf) } - /** Get an RDD for a Hadoop SequenceFile with given key and value types. - * - * '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each - * record, directly caching the returned RDD or directly passing it to an aggregation or shuffle - * operation will create many references to the same object. - * If you plan to directly cache, sort, or aggregate Hadoop writable objects, you should first - * copy them using a `map` function. - */ + /** + * Get an RDD for a Hadoop SequenceFile with given key and value types. + * + * '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each + * record, directly caching the returned RDD or directly passing it to an aggregation or shuffle + * operation will create many references to the same object. + * If you plan to directly cache, sort, or aggregate Hadoop writable objects, you should first + * copy them using a `map` function. + */ def sequenceFile[K, V](path: String, keyClass: Class[K], valueClass: Class[V], @@ -1113,14 +1116,15 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli hadoopFile(path, inputFormatClass, keyClass, valueClass, minPartitions) } - /** Get an RDD for a Hadoop SequenceFile with given key and value types. - * - * '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each - * record, directly caching the returned RDD or directly passing it to an aggregation or shuffle - * operation will create many references to the same object. - * If you plan to directly cache, sort, or aggregate Hadoop writable objects, you should first - * copy them using a `map` function. - * */ + /** + * Get an RDD for a Hadoop SequenceFile with given key and value types. 
+ * + * '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each + * record, directly caching the returned RDD or directly passing it to an aggregation or shuffle + * operation will create many references to the same object. + * If you plan to directly cache, sort, or aggregate Hadoop writable objects, you should first + * copy them using a `map` function. + */ def sequenceFile[K, V]( path: String, keyClass: Class[K], diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala index e080f91f50..2897272a8b 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala @@ -461,10 +461,10 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)]) fromRDD(rdd.partitionBy(partitioner)) /** - * Return an RDD containing all pairs of elements with matching keys in `this` and `other`. Each - * pair of elements will be returned as a (k, (v1, v2)) tuple, where (k, v1) is in `this` and - * (k, v2) is in `other`. Uses the given Partitioner to partition the output RDD. - */ + * Return an RDD containing all pairs of elements with matching keys in `this` and `other`. Each + * pair of elements will be returned as a (k, (v1, v2)) tuple, where (k, v1) is in `this` and + * (k, v2) is in `other`. Uses the given Partitioner to partition the output RDD. + */ def join[W](other: JavaPairRDD[K, W], partitioner: Partitioner): JavaPairRDD[K, (V, W)] = fromRDD(rdd.join(other, partitioner)) diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala index d362c40b7a..dfd91ae338 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala @@ -295,13 +295,14 @@ class JavaSparkContext(val sc: SparkContext) new JavaRDD(sc.binaryRecords(path, recordLength)) } - /** Get an RDD for a Hadoop SequenceFile with given key and value types. - * - * '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each - * record, directly caching the returned RDD will create many references to the same object. - * If you plan to directly cache Hadoop writable objects, you should first copy them using - * a `map` function. - * */ + /** + * Get an RDD for a Hadoop SequenceFile with given key and value types. + * + * '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each + * record, directly caching the returned RDD will create many references to the same object. + * If you plan to directly cache Hadoop writable objects, you should first copy them using + * a `map` function. + */ def sequenceFile[K, V](path: String, keyClass: Class[K], valueClass: Class[V], @@ -312,13 +313,14 @@ class JavaSparkContext(val sc: SparkContext) new JavaPairRDD(sc.sequenceFile(path, keyClass, valueClass, minPartitions)) } - /** Get an RDD for a Hadoop SequenceFile. - * - * '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each - * record, directly caching the returned RDD will create many references to the same object. - * If you plan to directly cache Hadoop writable objects, you should first copy them using - * a `map` function. - */ + /** + * Get an RDD for a Hadoop SequenceFile. 
+ * + * '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each + * record, directly caching the returned RDD will create many references to the same object. + * If you plan to directly cache Hadoop writable objects, you should first copy them using + * a `map` function. + */ def sequenceFile[K, V](path: String, keyClass: Class[K], valueClass: Class[V]): JavaPairRDD[K, V] = { implicit val ctagK: ClassTag[K] = ClassTag(keyClass) @@ -411,13 +413,14 @@ class JavaSparkContext(val sc: SparkContext) new JavaHadoopRDD(rdd.asInstanceOf[HadoopRDD[K, V]]) } - /** Get an RDD for a Hadoop file with an arbitrary InputFormat. - * - * '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each - * record, directly caching the returned RDD will create many references to the same object. - * If you plan to directly cache Hadoop writable objects, you should first copy them using - * a `map` function. - */ + /** + * Get an RDD for a Hadoop file with an arbitrary InputFormat. + * + * '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each + * record, directly caching the returned RDD will create many references to the same object. + * If you plan to directly cache Hadoop writable objects, you should first copy them using + * a `map` function. + */ def hadoopFile[K, V, F <: InputFormat[K, V]]( path: String, inputFormatClass: Class[F], @@ -431,13 +434,14 @@ class JavaSparkContext(val sc: SparkContext) new JavaHadoopRDD(rdd.asInstanceOf[HadoopRDD[K, V]]) } - /** Get an RDD for a Hadoop file with an arbitrary InputFormat - * - * '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each - * record, directly caching the returned RDD will create many references to the same object. - * If you plan to directly cache Hadoop writable objects, you should first copy them using - * a `map` function. - */ + /** + * Get an RDD for a Hadoop file with an arbitrary InputFormat + * + * '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each + * record, directly caching the returned RDD will create many references to the same object. + * If you plan to directly cache Hadoop writable objects, you should first copy them using + * a `map` function. + */ def hadoopFile[K, V, F <: InputFormat[K, V]]( path: String, inputFormatClass: Class[F], diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala b/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala index a4efafcb27..cba4aaffe2 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala @@ -29,7 +29,7 @@ import org.apache.spark.launcher.WorkerCommandBuilder import org.apache.spark.util.Utils /** - ** Utilities for running commands with the spark classpath. + * Utilities for running commands with the spark classpath. 
*/ private[deploy] object CommandUtils extends Logging { diff --git a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala index e5ebc63082..7bc1eb0436 100644 --- a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala @@ -29,10 +29,12 @@ import org.apache.spark.serializer.Serializer import org.apache.spark.util.collection.{CompactBuffer, ExternalAppendOnlyMap} import org.apache.spark.util.Utils -/** The references to rdd and splitIndex are transient because redundant information is stored - * in the CoGroupedRDD object. Because CoGroupedRDD is serialized separately from - * CoGroupPartition, if rdd and splitIndex aren't transient, they'll be included twice in the - * task closure. */ +/** + * The references to rdd and splitIndex are transient because redundant information is stored + * in the CoGroupedRDD object. Because CoGroupedRDD is serialized separately from + * CoGroupPartition, if rdd and splitIndex aren't transient, they'll be included twice in the + * task closure. + */ private[spark] case class NarrowCoGroupSplitDep( @transient rdd: RDD[_], @transient splitIndex: Int, diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index f96551c793..4a0a2199ef 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -255,8 +255,8 @@ abstract class RDD[T: ClassTag]( } /** - * Returns the number of partitions of this RDD. - */ + * Returns the number of partitions of this RDD. + */ @Since("1.6.0") final def getNumPartitions: Int = partitions.length diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala index 90b1813750..50b452c72f 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala @@ -295,12 +295,12 @@ private[spark] class CoarseMesosSchedulerBackend( } /** - * Launches executors on accepted offers, and declines unused offers. Executors are launched - * round-robin on offers. - * - * @param d SchedulerDriver - * @param offers Mesos offers that match attribute constraints - */ + * Launches executors on accepted offers, and declines unused offers. Executors are launched + * round-robin on offers. + * + * @param d SchedulerDriver + * @param offers Mesos offers that match attribute constraints + */ private def handleMatchedOffers(d: SchedulerDriver, offers: Buffer[Offer]): Unit = { val tasks = buildMesosTasks(offers) for (offer <- offers) { @@ -336,12 +336,12 @@ private[spark] class CoarseMesosSchedulerBackend( } /** - * Returns a map from OfferIDs to the tasks to launch on those offers. In order to maximize - * per-task memory and IO, tasks are round-robin assigned to offers. - * - * @param offers Mesos offers that match attribute constraints - * @return A map from OfferID to a list of Mesos tasks to launch on that offer - */ + * Returns a map from OfferIDs to the tasks to launch on those offers. In order to maximize + * per-task memory and IO, tasks are round-robin assigned to offers. 
+ * + * @param offers Mesos offers that match attribute constraints + * @return A map from OfferID to a list of Mesos tasks to launch on that offer + */ private def buildMesosTasks(offers: Buffer[Offer]): Map[OfferID, List[MesosTaskInfo]] = { // offerID -> tasks val tasks = new HashMap[OfferID, List[MesosTaskInfo]].withDefaultValue(Nil) diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala index c41fa58607..73bd4c58e1 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala @@ -453,12 +453,12 @@ private[spark] class MesosClusterScheduler( } /** - * Escape args for Unix-like shells, unless already quoted by the user. - * Based on: http://www.gnu.org/software/bash/manual/html_node/Double-Quotes.html - * and http://www.grymoire.com/Unix/Quote.html - * @param value argument - * @return escaped argument - */ + * Escape args for Unix-like shells, unless already quoted by the user. + * Based on: http://www.gnu.org/software/bash/manual/html_node/Double-Quotes.html + * and http://www.grymoire.com/Unix/Quote.html + * @param value argument + * @return escaped argument + */ private[scheduler] def shellEscape(value: String): String = { val WrappedInQuotes = """^(".+"|'.+')$""".r val ShellSpecialChars = (""".*([ '<>&|\?\*;!#\\(\)"$`]).*""").r diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala index 9a12a61f2f..35f914355d 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala @@ -148,8 +148,8 @@ private[mesos] trait MesosSchedulerUtils extends Logging { } /** - * Signal that the scheduler has registered with Mesos. - */ + * Signal that the scheduler has registered with Mesos. + */ protected def markRegistered(): Unit = { registerLatch.countDown() } diff --git a/core/src/main/scala/org/apache/spark/shuffle/ShuffleManager.scala b/core/src/main/scala/org/apache/spark/shuffle/ShuffleManager.scala index 76fd249fbd..364fad664e 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/ShuffleManager.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/ShuffleManager.scala @@ -54,9 +54,9 @@ private[spark] trait ShuffleManager { context: TaskContext): ShuffleReader[K, C] /** - * Remove a shuffle's metadata from the ShuffleManager. - * @return true if the metadata removed successfully, otherwise false. - */ + * Remove a shuffle's metadata from the ShuffleManager. + * @return true if the metadata removed successfully, otherwise false. + */ def unregisterShuffle(shuffleId: Int): Boolean /** diff --git a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala index df38d11e43..99be4de065 100644 --- a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala +++ b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala @@ -455,16 +455,16 @@ private[spark] class MemoryStore( } /** - * Try to evict blocks to free up a given amount of space to store a particular block. 
- * Can fail if either the block is bigger than our memory or it would require replacing - * another block from the same RDD (which leads to a wasteful cyclic replacement pattern for - * RDDs that don't fit into memory that we want to avoid). - * - * @param blockId the ID of the block we are freeing space for, if any - * @param space the size of this block - * @param memoryMode the type of memory to free (on- or off-heap) - * @return the amount of memory (in bytes) freed by eviction - */ + * Try to evict blocks to free up a given amount of space to store a particular block. + * Can fail if either the block is bigger than our memory or it would require replacing + * another block from the same RDD (which leads to a wasteful cyclic replacement pattern for + * RDDs that don't fit into memory that we want to avoid). + * + * @param blockId the ID of the block we are freeing space for, if any + * @param space the size of this block + * @param memoryMode the type of memory to free (on- or off-heap) + * @return the amount of memory (in bytes) freed by eviction + */ private[spark] def evictBlocksToFreeSpace( blockId: Option[BlockId], space: Long, diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index 73768ff4c8..50bcf85805 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -256,10 +256,11 @@ private[spark] object Utils extends Logging { dir } - /** Copy all data from an InputStream to an OutputStream. NIO way of file stream to file stream - * copying is disabled by default unless explicitly set transferToEnabled as true, - * the parameter transferToEnabled should be configured by spark.file.transferTo = [true|false]. - */ + /** + * Copy all data from an InputStream to an OutputStream. NIO way of file stream to file stream + * copying is disabled by default unless explicitly set transferToEnabled as true, + * the parameter transferToEnabled should be configured by spark.file.transferTo = [true|false]. + */ def copyStream(in: InputStream, out: OutputStream, closeStreams: Boolean = false, @@ -1564,9 +1565,11 @@ private[spark] object Utils extends Logging { else -1 } - /** Returns the system properties map that is thread-safe to iterator over. It gets the - * properties which have been set explicitly, as well as those for which only a default value - * has been defined. */ + /** + * Returns the system properties map that is thread-safe to iterator over. It gets the + * properties which have been set explicitly, as well as those for which only a default value + * has been defined. + */ def getSystemProperties: Map[String, String] = { System.getProperties.stringPropertyNames().asScala .map(key => (key, System.getProperty(key))).toMap diff --git a/core/src/test/scala/org/apache/spark/Smuggle.scala b/core/src/test/scala/org/apache/spark/Smuggle.scala index 9f0a1b4c25..9d9217ea1b 100644 --- a/core/src/test/scala/org/apache/spark/Smuggle.scala +++ b/core/src/test/scala/org/apache/spark/Smuggle.scala @@ -24,16 +24,16 @@ import scala.collection.mutable import scala.language.implicitConversions /** - * Utility wrapper to "smuggle" objects into tasks while bypassing serialization. - * This is intended for testing purposes, primarily to make locks, semaphores, and - * other constructs that would not survive serialization available from within tasks. 
- * A Smuggle reference is itself serializable, but after being serialized and - * deserialized, it still refers to the same underlying "smuggled" object, as long - * as it was deserialized within the same JVM. This can be useful for tests that - * depend on the timing of task completion to be deterministic, since one can "smuggle" - * a lock or semaphore into the task, and then the task can block until the test gives - * the go-ahead to proceed via the lock. - */ + * Utility wrapper to "smuggle" objects into tasks while bypassing serialization. + * This is intended for testing purposes, primarily to make locks, semaphores, and + * other constructs that would not survive serialization available from within tasks. + * A Smuggle reference is itself serializable, but after being serialized and + * deserialized, it still refers to the same underlying "smuggled" object, as long + * as it was deserialized within the same JVM. This can be useful for tests that + * depend on the timing of task completion to be deterministic, since one can "smuggle" + * a lock or semaphore into the task, and then the task can block until the test gives + * the go-ahead to proceed via the lock. + */ class Smuggle[T] private(val key: Symbol) extends Serializable { def smuggledObject: T = Smuggle.get(key) } @@ -41,13 +41,13 @@ class Smuggle[T] private(val key: Symbol) extends Serializable { object Smuggle { /** - * Wraps the specified object to be smuggled into a serialized task without - * being serialized itself. - * - * @param smuggledObject - * @tparam T - * @return Smuggle wrapper around smuggledObject. - */ + * Wraps the specified object to be smuggled into a serialized task without + * being serialized itself. + * + * @param smuggledObject + * @tparam T + * @return Smuggle wrapper around smuggledObject. + */ def apply[T](smuggledObject: T): Smuggle[T] = { val key = Symbol(UUID.randomUUID().toString) lock.writeLock().lock() @@ -72,12 +72,12 @@ object Smuggle { } /** - * Implicit conversion of a Smuggle wrapper to the object being smuggled. - * - * @param smuggle the wrapper to unpack. - * @tparam T - * @return the smuggled object represented by the wrapper. - */ + * Implicit conversion of a Smuggle wrapper to the object being smuggled. + * + * @param smuggle the wrapper to unpack. + * @tparam T + * @return the smuggled object represented by the wrapper. + */ implicit def unpackSmuggledObject[T](smuggle : Smuggle[T]): T = smuggle.smuggledObject } diff --git a/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala b/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala index 3d1a0e9795..99d5b496bc 100644 --- a/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala @@ -78,18 +78,18 @@ private[memory] trait MemoryManagerSuite extends SparkFunSuite with BeforeAndAft } /** - * Simulate the part of [[MemoryStore.evictBlocksToFreeSpace]] that releases storage memory. - * - * This is a significant simplification of the real method, which actually drops existing - * blocks based on the size of each block. Instead, here we simply release as many bytes - * as needed to ensure the requested amount of free space. This allows us to set up the - * test without relying on the [[org.apache.spark.storage.BlockManager]], which brings in - * many other dependencies. - * - * Every call to this method will set a global variable, [[evictBlocksToFreeSpaceCalled]], that - * records the number of bytes this is called with. 
This variable is expected to be cleared - * by the test code later through [[assertEvictBlocksToFreeSpaceCalled]]. - */ + * Simulate the part of [[MemoryStore.evictBlocksToFreeSpace]] that releases storage memory. + * + * This is a significant simplification of the real method, which actually drops existing + * blocks based on the size of each block. Instead, here we simply release as many bytes + * as needed to ensure the requested amount of free space. This allows us to set up the + * test without relying on the [[org.apache.spark.storage.BlockManager]], which brings in + * many other dependencies. + * + * Every call to this method will set a global variable, [[evictBlocksToFreeSpaceCalled]], that + * records the number of bytes this is called with. This variable is expected to be cleared + * by the test code later through [[assertEvictBlocksToFreeSpaceCalled]]. + */ private def evictBlocksToFreeSpaceAnswer(mm: MemoryManager): Answer[Long] = { new Answer[Long] { override def answer(invocation: InvocationOnMock): Long = { diff --git a/examples/src/main/scala/org/apache/spark/examples/BroadcastTest.scala b/examples/src/main/scala/org/apache/spark/examples/BroadcastTest.scala index 3da5236745..af5a815f6e 100644 --- a/examples/src/main/scala/org/apache/spark/examples/BroadcastTest.scala +++ b/examples/src/main/scala/org/apache/spark/examples/BroadcastTest.scala @@ -21,8 +21,8 @@ package org.apache.spark.examples import org.apache.spark.{SparkConf, SparkContext} /** - * Usage: BroadcastTest [slices] [numElem] [blockSize] - */ + * Usage: BroadcastTest [slices] [numElem] [blockSize] + */ object BroadcastTest { def main(args: Array[String]) { diff --git a/examples/src/main/scala/org/apache/spark/examples/DFSReadWriteTest.scala b/examples/src/main/scala/org/apache/spark/examples/DFSReadWriteTest.scala index 743fc13db7..7bf023667d 100644 --- a/examples/src/main/scala/org/apache/spark/examples/DFSReadWriteTest.scala +++ b/examples/src/main/scala/org/apache/spark/examples/DFSReadWriteTest.scala @@ -25,16 +25,16 @@ import scala.io.Source._ import org.apache.spark.{SparkConf, SparkContext} /** - * Simple test for reading and writing to a distributed - * file system. This example does the following: - * - * 1. Reads local file - * 2. Computes word count on local file - * 3. Writes local file to a DFS - * 4. Reads the file back from the DFS - * 5. Computes word count on the file using Spark - * 6. Compares the word count results - */ + * Simple test for reading and writing to a distributed + * file system. This example does the following: + * + * 1. Reads local file + * 2. Computes word count on local file + * 3. Writes local file to a DFS + * 4. Reads the file back from the DFS + * 5. Computes word count on the file using Spark + * 6. 
Compares the word count results + */ object DFSReadWriteTest { private var localFilePath: File = new File(".") diff --git a/examples/src/main/scala/org/apache/spark/examples/GroupByTest.scala b/examples/src/main/scala/org/apache/spark/examples/GroupByTest.scala index 08b6c717d4..4db229b5de 100644 --- a/examples/src/main/scala/org/apache/spark/examples/GroupByTest.scala +++ b/examples/src/main/scala/org/apache/spark/examples/GroupByTest.scala @@ -23,8 +23,8 @@ import java.util.Random import org.apache.spark.{SparkConf, SparkContext} /** - * Usage: GroupByTest [numMappers] [numKVPairs] [KeySize] [numReducers] - */ + * Usage: GroupByTest [numMappers] [numKVPairs] [KeySize] [numReducers] + */ object GroupByTest { def main(args: Array[String]) { val sparkConf = new SparkConf().setAppName("GroupBy Test") diff --git a/examples/src/main/scala/org/apache/spark/examples/MultiBroadcastTest.scala b/examples/src/main/scala/org/apache/spark/examples/MultiBroadcastTest.scala index 134c3d1d63..3eb0c27723 100644 --- a/examples/src/main/scala/org/apache/spark/examples/MultiBroadcastTest.scala +++ b/examples/src/main/scala/org/apache/spark/examples/MultiBroadcastTest.scala @@ -22,8 +22,8 @@ import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.rdd.RDD /** - * Usage: MultiBroadcastTest [slices] [numElem] - */ + * Usage: MultiBroadcastTest [slices] [numElem] + */ object MultiBroadcastTest { def main(args: Array[String]) { diff --git a/examples/src/main/scala/org/apache/spark/examples/SimpleSkewedGroupByTest.scala b/examples/src/main/scala/org/apache/spark/examples/SimpleSkewedGroupByTest.scala index 7c09664c2f..ec07e6323e 100644 --- a/examples/src/main/scala/org/apache/spark/examples/SimpleSkewedGroupByTest.scala +++ b/examples/src/main/scala/org/apache/spark/examples/SimpleSkewedGroupByTest.scala @@ -23,8 +23,8 @@ import java.util.Random import org.apache.spark.{SparkConf, SparkContext} /** - * Usage: SimpleSkewedGroupByTest [numMappers] [numKVPairs] [valSize] [numReducers] [ratio] - */ + * Usage: SimpleSkewedGroupByTest [numMappers] [numKVPairs] [valSize] [numReducers] [ratio] + */ object SimpleSkewedGroupByTest { def main(args: Array[String]) { diff --git a/examples/src/main/scala/org/apache/spark/examples/SkewedGroupByTest.scala b/examples/src/main/scala/org/apache/spark/examples/SkewedGroupByTest.scala index d498af9c39..8e4c2b6229 100644 --- a/examples/src/main/scala/org/apache/spark/examples/SkewedGroupByTest.scala +++ b/examples/src/main/scala/org/apache/spark/examples/SkewedGroupByTest.scala @@ -23,8 +23,8 @@ import java.util.Random import org.apache.spark.{SparkConf, SparkContext} /** - * Usage: GroupByTest [numMappers] [numKVPairs] [KeySize] [numReducers] - */ + * Usage: GroupByTest [numMappers] [numKVPairs] [KeySize] [numReducers] + */ object SkewedGroupByTest { def main(args: Array[String]) { val sparkConf = new SparkConf().setAppName("GroupBy Test") diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/clickstream/PageViewGenerator.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/clickstream/PageViewGenerator.scala index 50216b9bd4..0ddd065f0d 100644 --- a/examples/src/main/scala/org/apache/spark/examples/streaming/clickstream/PageViewGenerator.scala +++ b/examples/src/main/scala/org/apache/spark/examples/streaming/clickstream/PageViewGenerator.scala @@ -38,17 +38,18 @@ object PageView extends Serializable { } // scalastyle:off -/** Generates streaming events to simulate page views on a website. 
- * - * This should be used in tandem with PageViewStream.scala. Example: - * - * To run the generator - * `$ bin/run-example org.apache.spark.examples.streaming.clickstream.PageViewGenerator 44444 10` - * To process the generated stream - * `$ bin/run-example \ - * org.apache.spark.examples.streaming.clickstream.PageViewStream errorRatePerZipCode localhost 44444` - * - */ +/** + * Generates streaming events to simulate page views on a website. + * + * This should be used in tandem with PageViewStream.scala. Example: + * + * To run the generator + * `$ bin/run-example org.apache.spark.examples.streaming.clickstream.PageViewGenerator 44444 10` + * To process the generated stream + * `$ bin/run-example \ + * org.apache.spark.examples.streaming.clickstream.PageViewStream errorRatePerZipCode localhost 44444` + * + */ // scalastyle:on object PageViewGenerator { val pages = Map("http://foo.com/" -> .7, diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/clickstream/PageViewStream.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/clickstream/PageViewStream.scala index 773a2e5fc2..1ba093f57b 100644 --- a/examples/src/main/scala/org/apache/spark/examples/streaming/clickstream/PageViewStream.scala +++ b/examples/src/main/scala/org/apache/spark/examples/streaming/clickstream/PageViewStream.scala @@ -22,16 +22,17 @@ import org.apache.spark.examples.streaming.StreamingExamples import org.apache.spark.streaming.{Seconds, StreamingContext} // scalastyle:off -/** Analyses a streaming dataset of web page views. This class demonstrates several types of - * operators available in Spark streaming. - * - * This should be used in tandem with PageViewStream.scala. Example: - * To run the generator - * `$ bin/run-example org.apache.spark.examples.streaming.clickstream.PageViewGenerator 44444 10` - * To process the generated stream - * `$ bin/run-example \ - * org.apache.spark.examples.streaming.clickstream.PageViewStream errorRatePerZipCode localhost 44444` - */ +/** + * Analyses a streaming dataset of web page views. This class demonstrates several types of + * operators available in Spark streaming. + * + * This should be used in tandem with PageViewStream.scala. Example: + * To run the generator + * `$ bin/run-example org.apache.spark.examples.streaming.clickstream.PageViewGenerator 44444 10` + * To process the generated stream + * `$ bin/run-example \ + * org.apache.spark.examples.streaming.clickstream.PageViewStream errorRatePerZipCode localhost 44444` + */ // scalastyle:on object PageViewStream { def main(args: Array[String]) { diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala index 7dc9606913..6e7c3f358e 100644 --- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala +++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala @@ -185,13 +185,14 @@ class FlumeReceiver( override def preferredLocation: Option[String] = Option(host) - /** A Netty Pipeline factory that will decompress incoming data from - * and the Netty client and compress data going back to the client. 
- * - * The compression on the return is required because Flume requires - * a successful response to indicate it can remove the event/batch - * from the configured channel - */ + /** + * A Netty Pipeline factory that will decompress incoming data from + * and the Netty client and compress data going back to the client. + * + * The compression on the return is required because Flume requires + * a successful response to indicate it can remove the event/batch + * from the configured channel + */ private[streaming] class CompressionChannelPipelineFactory extends ChannelPipelineFactory { def getPipeline(): ChannelPipeline = { diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDDPartition.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDDPartition.scala index a660d2a00c..02917becf0 100644 --- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDDPartition.scala +++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDDPartition.scala @@ -19,13 +19,14 @@ package org.apache.spark.streaming.kafka import org.apache.spark.Partition -/** @param topic kafka topic name - * @param partition kafka partition id - * @param fromOffset inclusive starting offset - * @param untilOffset exclusive ending offset - * @param host preferred kafka host, i.e. the leader at the time the rdd was created - * @param port preferred kafka host's port - */ +/** + * @param topic kafka topic name + * @param partition kafka partition id + * @param fromOffset inclusive starting offset + * @param untilOffset exclusive ending offset + * @param host preferred kafka host, i.e. the leader at the time the rdd was created + * @param port preferred kafka host's port + */ private[kafka] class KafkaRDDPartition( val index: Int, diff --git a/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala b/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala index a783fe305f..868658dfe5 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala @@ -415,11 +415,11 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) extends Seriali } /** - * Compute the connected component membership of each vertex and return a graph with the vertex - * value containing the lowest vertex id in the connected component containing that vertex. - * - * @see [[org.apache.spark.graphx.lib.ConnectedComponents$#run]] - */ + * Compute the connected component membership of each vertex and return a graph with the vertex + * value containing the lowest vertex id in the connected component containing that vertex. + * + * @see [[org.apache.spark.graphx.lib.ConnectedComponents$#run]] + */ def connectedComponents(): Graph[VertexId, ED] = { ConnectedComponents.run(graph) } diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/ConnectedComponents.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/ConnectedComponents.scala index 137c512c99..4e9b13162e 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/lib/ConnectedComponents.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/ConnectedComponents.scala @@ -60,15 +60,15 @@ object ConnectedComponents { } // end of connectedComponents /** - * Compute the connected component membership of each vertex and return a graph with the vertex - * value containing the lowest vertex id in the connected component containing that vertex. 
- * - * @tparam VD the vertex attribute type (discarded in the computation) - * @tparam ED the edge attribute type (preserved in the computation) - * @param graph the graph for which to compute the connected components - * @return a graph with vertex attributes containing the smallest vertex in each - * connected component - */ + * Compute the connected component membership of each vertex and return a graph with the vertex + * value containing the lowest vertex id in the connected component containing that vertex. + * + * @tparam VD the vertex attribute type (discarded in the computation) + * @tparam ED the edge attribute type (preserved in the computation) + * @param graph the graph for which to compute the connected components + * @return a graph with vertex attributes containing the smallest vertex in each + * connected component + */ def run[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]): Graph[VertexId, ED] = { run(graph, Int.MaxValue) } diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/ElementwiseProduct.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/ElementwiseProduct.scala index 2c7ffdb7ba..1b0a9a12e8 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/ElementwiseProduct.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/ElementwiseProduct.scala @@ -38,9 +38,9 @@ class ElementwiseProduct(override val uid: String) def this() = this(Identifiable.randomUID("elemProd")) /** - * the vector to multiply with input vectors - * @group param - */ + * the vector to multiply with input vectors + * @group param + */ val scalingVec: Param[Vector] = new Param(this, "scalingVec", "vector for hadamard product") /** @group setParam */ diff --git a/mllib/src/main/scala/org/apache/spark/mllib/api/python/GaussianMixtureModelWrapper.scala b/mllib/src/main/scala/org/apache/spark/mllib/api/python/GaussianMixtureModelWrapper.scala index a689b09341..364d5eea08 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/api/python/GaussianMixtureModelWrapper.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/api/python/GaussianMixtureModelWrapper.scala @@ -24,15 +24,15 @@ import org.apache.spark.mllib.clustering.GaussianMixtureModel import org.apache.spark.mllib.linalg.{Vector, Vectors} /** - * Wrapper around GaussianMixtureModel to provide helper methods in Python - */ + * Wrapper around GaussianMixtureModel to provide helper methods in Python + */ private[python] class GaussianMixtureModelWrapper(model: GaussianMixtureModel) { val weights: Vector = Vectors.dense(model.weights) val k: Int = weights.size /** - * Returns gaussians as a List of Vectors and Matrices corresponding each MultivariateGaussian - */ + * Returns gaussians as a List of Vectors and Matrices corresponding each MultivariateGaussian + */ val gaussians: Array[Byte] = { val modelGaussians = model.gaussians.map { gaussian => Array[Any](gaussian.mu, gaussian.sigma) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/api/python/Word2VecModelWrapper.scala b/mllib/src/main/scala/org/apache/spark/mllib/api/python/Word2VecModelWrapper.scala index 073f03e16f..05273c3434 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/api/python/Word2VecModelWrapper.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/api/python/Word2VecModelWrapper.scala @@ -27,8 +27,8 @@ import org.apache.spark.mllib.feature.Word2VecModel import org.apache.spark.mllib.linalg.{Vector, Vectors} /** - * Wrapper around Word2VecModel to provide helper methods in Python - */ + * Wrapper around Word2VecModel to provide helper 
methods in Python + */ private[python] class Word2VecModelWrapper(model: Word2VecModel) { def transform(word: String): Vector = { model.transform(word) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/Matrices.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/Matrices.scala index 6e571fe35a..8c09b69b3c 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/Matrices.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/Matrices.scala @@ -123,14 +123,18 @@ sealed trait Matrix extends Serializable { @Since("1.4.0") def toString(maxLines: Int, maxLineWidth: Int): String = toBreeze.toString(maxLines, maxLineWidth) - /** Map the values of this matrix using a function. Generates a new matrix. Performs the - * function on only the backing array. For example, an operation such as addition or - * subtraction will only be performed on the non-zero values in a `SparseMatrix`. */ + /** + * Map the values of this matrix using a function. Generates a new matrix. Performs the + * function on only the backing array. For example, an operation such as addition or + * subtraction will only be performed on the non-zero values in a `SparseMatrix`. + */ private[spark] def map(f: Double => Double): Matrix - /** Update all the values of this matrix using the function f. Performed in-place on the - * backing array. For example, an operation such as addition or subtraction will only be - * performed on the non-zero values in a `SparseMatrix`. */ + /** + * Update all the values of this matrix using the function f. Performed in-place on the + * backing array. For example, an operation such as addition or subtraction will only be + * performed on the non-zero values in a `SparseMatrix`. + */ private[mllib] def update(f: Double => Double): Matrix /** diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearRegressionWithSGD.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearRegressionWithSGD.scala index e8f4422fd4..84764963b5 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearRegressionWithSGD.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearRegressionWithSGD.scala @@ -81,8 +81,8 @@ class StreamingLinearRegressionWithSGD private[mllib] ( } /** - * Set the number of iterations of gradient descent to run per update. Default: 50. - */ + * Set the number of iterations of gradient descent to run per update. Default: 50. + */ @Since("1.1.0") def setNumIterations(numIterations: Int): this.type = { this.algorithm.optimizer.setNumIterations(numIterations) diff --git a/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkILoop.scala b/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkILoop.scala index 67a616dc15..c5dc6ba221 100644 --- a/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkILoop.scala +++ b/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkILoop.scala @@ -797,9 +797,11 @@ class SparkILoop( // echo("Switched " + (if (old) "off" else "on") + " result printing.") } - /** Run one command submitted by the user. Two values are returned: - * (1) whether to keep running, (2) the line to record for replay, - * if any. */ + /** + * Run one command submitted by the user. Two values are returned: + * (1) whether to keep running, (2) the line to record for replay, + * if any. 
+ */ private[repl] def command(line: String): Result = { if (line startsWith ":") { val cmd = line.tail takeWhile (x => !x.isWhitespace) @@ -841,12 +843,13 @@ class SparkILoop( } import paste.{ ContinueString, PromptString } - /** Interpret expressions starting with the first line. - * Read lines until a complete compilation unit is available - * or until a syntax error has been seen. If a full unit is - * read, go ahead and interpret it. Return the full string - * to be recorded for replay, if any. - */ + /** + * Interpret expressions starting with the first line. + * Read lines until a complete compilation unit is available + * or until a syntax error has been seen. If a full unit is + * read, go ahead and interpret it. Return the full string + * to be recorded for replay, if any. + */ private def interpretStartingWith(code: String): Option[String] = { // signal completion non-completion input has been received in.completion.resetVerbosity() diff --git a/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkImports.scala b/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkImports.scala index 1d0fe10d3d..f22776592c 100644 --- a/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkImports.scala +++ b/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkImports.scala @@ -118,8 +118,9 @@ private[repl] trait SparkImports { case class ReqAndHandler(req: Request, handler: MemberHandler) { } def reqsToUse: List[ReqAndHandler] = { - /** Loop through a list of MemberHandlers and select which ones to keep. - * 'wanted' is the set of names that need to be imported. + /** + * Loop through a list of MemberHandlers and select which ones to keep. + * 'wanted' is the set of names that need to be imported. */ def select(reqs: List[ReqAndHandler], wanted: Set[Name]): List[ReqAndHandler] = { // Single symbol imports might be implicits! See bug #1752. Rather than diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/Encoder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/Encoder.scala index 1f20e26354..e0bfe3c32f 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/Encoder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/Encoder.scala @@ -140,27 +140,27 @@ object Encoders { def STRING: Encoder[java.lang.String] = ExpressionEncoder() /** - * An encoder for nullable decimal type. - * @since 1.6.0 - */ + * An encoder for nullable decimal type. + * @since 1.6.0 + */ def DECIMAL: Encoder[java.math.BigDecimal] = ExpressionEncoder() /** - * An encoder for nullable date type. - * @since 1.6.0 - */ + * An encoder for nullable date type. + * @since 1.6.0 + */ def DATE: Encoder[java.sql.Date] = ExpressionEncoder() /** - * An encoder for nullable timestamp type. - * @since 1.6.0 - */ + * An encoder for nullable timestamp type. + * @since 1.6.0 + */ def TIMESTAMP: Encoder[java.sql.Timestamp] = ExpressionEncoder() /** - * An encoder for arrays of bytes. - * @since 1.6.1 - */ + * An encoder for arrays of bytes. 
+ * @since 1.6.1 + */ def BINARY: Encoder[Array[Byte]] = ExpressionEncoder() /** diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 05e2b9a447..a6e317ebf0 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -733,9 +733,9 @@ class Analyzer( } /** - * Add the missing attributes into projectList of Project/Window or aggregateExpressions of - * Aggregate. - */ + * Add the missing attributes into projectList of Project/Window or aggregateExpressions of + * Aggregate. + */ private def addMissingAttr(plan: LogicalPlan, missingAttrs: AttributeSet): LogicalPlan = { if (missingAttrs.isEmpty) { return plan @@ -767,9 +767,9 @@ class Analyzer( } /** - * Resolve the expression on a specified logical plan and it's child (recursively), until - * the expression is resolved or meet a non-unary node or Subquery. - */ + * Resolve the expression on a specified logical plan and it's child (recursively), until + * the expression is resolved or meet a non-unary node or Subquery. + */ @tailrec private def resolveExpressionRecursively(expr: Expression, plan: LogicalPlan): Expression = { val resolved = resolveExpression(expr, plan) @@ -1398,8 +1398,8 @@ class Analyzer( } /** - * Check and add order to [[AggregateWindowFunction]]s. - */ + * Check and add order to [[AggregateWindowFunction]]s. + */ object ResolveWindowOrder extends Rule[LogicalPlan] { def apply(plan: LogicalPlan): LogicalPlan = plan transform { case logical: LogicalPlan => logical transformExpressions { @@ -1489,8 +1489,8 @@ object EliminateSubqueryAliases extends Rule[LogicalPlan] { } /** - * Removes [[Union]] operators from the plan if it just has one child. - */ + * Removes [[Union]] operators from the plan if it just has one child. + */ object EliminateUnions extends Rule[LogicalPlan] { def apply(plan: LogicalPlan): LogicalPlan = plan transform { case Union(children) if children.size == 1 => children.head diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala index 053e612f3e..354311c5e7 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala @@ -136,9 +136,9 @@ object UnsafeProjection { } /** - * Same as other create()'s but allowing enabling/disabling subexpression elimination. - * TODO: refactor the plumbing and clean this up. - */ + * Same as other create()'s but allowing enabling/disabling subexpression elimination. + * TODO: refactor the plumbing and clean this up. 
+ */ def create( exprs: Seq[Expression], inputSchema: Seq[Attribute], diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala index cd490dd676..b64d3eea49 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala @@ -58,10 +58,10 @@ class CodegenContext { val references: mutable.ArrayBuffer[Any] = new mutable.ArrayBuffer[Any]() /** - * Add an object to `references`, create a class member to access it. - * - * Returns the name of class member. - */ + * Add an object to `references`, create a class member to access it. + * + * Returns the name of class member. + */ def addReferenceObj(name: String, obj: Any, className: String = null): String = { val term = freshName(name) val idx = references.length @@ -72,9 +72,9 @@ class CodegenContext { } /** - * Holding a list of generated columns as input of current operator, will be used by - * BoundReference to generate code. - */ + * Holding a list of generated columns as input of current operator, will be used by + * BoundReference to generate code. + */ var currentVars: Seq[ExprCode] = null /** @@ -169,14 +169,14 @@ class CodegenContext { final var INPUT_ROW = "i" /** - * The map from a variable name to it's next ID. - */ + * The map from a variable name to it's next ID. + */ private val freshNameIds = new mutable.HashMap[String, Int] freshNameIds += INPUT_ROW -> 1 /** - * A prefix used to generate fresh name. - */ + * A prefix used to generate fresh name. + */ var freshNamePrefix = "" /** @@ -234,8 +234,8 @@ class CodegenContext { } /** - * Update a column in MutableRow from ExprCode. - */ + * Update a column in MutableRow from ExprCode. + */ def updateColumn( row: String, dataType: DataType, diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/grouping.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/grouping.scala index 437e417266..3be761c867 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/grouping.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/grouping.scala @@ -22,8 +22,8 @@ import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback import org.apache.spark.sql.types._ /** - * A placeholder expression for cube/rollup, which will be replaced by analyzer - */ + * A placeholder expression for cube/rollup, which will be replaced by analyzer + */ trait GroupingSet extends Expression with CodegenFallback { def groupByExprs: Seq[Expression] @@ -43,9 +43,9 @@ case class Cube(groupByExprs: Seq[Expression]) extends GroupingSet {} case class Rollup(groupByExprs: Seq[Expression]) extends GroupingSet {} /** - * Indicates whether a specified column expression in a GROUP BY list is aggregated or not. - * GROUPING returns 1 for aggregated or 0 for not aggregated in the result set. - */ + * Indicates whether a specified column expression in a GROUP BY list is aggregated or not. + * GROUPING returns 1 for aggregated or 0 for not aggregated in the result set. 
+ */ case class Grouping(child: Expression) extends Expression with Unevaluable { override def references: AttributeSet = AttributeSet(VirtualColumn.groupingIdAttribute :: Nil) override def children: Seq[Expression] = child :: Nil @@ -54,10 +54,10 @@ case class Grouping(child: Expression) extends Expression with Unevaluable { } /** - * GroupingID is a function that computes the level of grouping. - * - * If groupByExprs is empty, it means all grouping expressions in GroupingSets. - */ + * GroupingID is a function that computes the level of grouping. + * + * If groupByExprs is empty, it means all grouping expressions in GroupingSets. + */ case class GroupingID(groupByExprs: Seq[Expression]) extends Expression with Unevaluable { override def references: AttributeSet = AttributeSet(VirtualColumn.groupingIdAttribute :: Nil) override def children: Seq[Expression] = groupByExprs diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/misc.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/misc.scala index e8a3e129b4..eb8dc1423a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/misc.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/misc.scala @@ -467,8 +467,8 @@ object Murmur3HashFunction extends InterpretedHashFunction { } /** - * Print the result of an expression to stderr (used for debugging codegen). - */ + * Print the result of an expression to stderr (used for debugging codegen). + */ case class PrintToStderr(child: Expression) extends UnaryExpression { override def dataType: DataType = child.dataType diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index a5ab390c76..69b09bcb35 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -31,9 +31,9 @@ import org.apache.spark.sql.catalyst.rules._ import org.apache.spark.sql.types._ /** - * Abstract class all optimizers should inherit of, contains the standard batches (extending - * Optimizers can override this. - */ + * Abstract class all optimizers should inherit of, contains the standard batches (extending + * Optimizers can override this. + */ abstract class Optimizer extends RuleExecutor[LogicalPlan] { def batches: Seq[Batch] = { // Technically some of the rules in Finish Analysis are not optimizer rules and belong more @@ -111,11 +111,11 @@ abstract class Optimizer extends RuleExecutor[LogicalPlan] { } /** - * Non-abstract representation of the standard Spark optimizing strategies - * - * To ensure extendability, we leave the standard rules in the abstract optimizer rules, while - * specific rules go to the subclasses - */ + * Non-abstract representation of the standard Spark optimizing strategies + * + * To ensure extendability, we leave the standard rules in the abstract optimizer rules, while + * specific rules go to the subclasses + */ object DefaultOptimizer extends Optimizer /** @@ -962,21 +962,21 @@ object PushPredicateThroughAggregate extends Rule[LogicalPlan] with PredicateHel } /** - * Reorder the joins and push all the conditions into join, so that the bottom ones have at least - * one condition. - * - * The order of joins will not be changed if all of them already have at least one condition. 
- */ + * Reorder the joins and push all the conditions into join, so that the bottom ones have at least + * one condition. + * + * The order of joins will not be changed if all of them already have at least one condition. + */ object ReorderJoin extends Rule[LogicalPlan] with PredicateHelper { /** - * Join a list of plans together and push down the conditions into them. - * - * The joined plan are picked from left to right, prefer those has at least one join condition. - * - * @param input a list of LogicalPlans to join. - * @param conditions a list of condition for join. - */ + * Join a list of plans together and push down the conditions into them. + * + * The joined plan are picked from left to right, prefer those has at least one join condition. + * + * @param input a list of LogicalPlans to join. + * @param conditions a list of condition for join. + */ @tailrec def createOrderedJoin(input: Seq[LogicalPlan], conditions: Seq[Expression]): LogicalPlan = { assert(input.size >= 2) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala index c350f3049f..8541b1f7c6 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala @@ -1430,8 +1430,8 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with Logging { } /** - * Create a [[StructType]] from a sequence of [[StructField]]s. - */ + * Create a [[StructType]] from a sequence of [[StructField]]s. + */ protected def createStructType(ctx: ColTypeListContext): StructType = { StructType(Option(ctx).toSeq.flatMap(visitColTypeList)) } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala index 28d2c445b1..6f35d87ebb 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala @@ -140,20 +140,20 @@ object ExtractEquiJoinKeys extends Logging with PredicateHelper { } /** - * A pattern that collects the filter and inner joins. - * - * Filter - * | - * inner Join - * / \ ----> (Seq(plan0, plan1, plan2), conditions) - * Filter plan2 - * | - * inner join - * / \ - * plan0 plan1 - * - * Note: This pattern currently only works for left-deep trees. - */ + * A pattern that collects the filter and inner joins. + * + * Filter + * | + * inner Join + * / \ ----> (Seq(plan0, plan1, plan2), conditions) + * Filter plan2 + * | + * inner join + * / \ + * plan0 plan1 + * + * Note: This pattern currently only works for left-deep trees. + */ object ExtractFiltersAndInnerJoins extends PredicateHelper { // flatten all inner joins, which are next to each other diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala index 22a4461e66..609a33e2f1 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala @@ -122,8 +122,8 @@ abstract class QueryPlan[PlanType <: QueryPlan[PlanType]] extends TreeNode[PlanT AttributeSet(children.flatMap(_.asInstanceOf[QueryPlan[PlanType]].output)) /** - * The set of all attributes that are produced by this node. 
- */ + * The set of all attributes that are produced by this node. + */ def producedAttributes: AttributeSet = AttributeSet.empty /** diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala index be9f1ffa22..d449088498 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala @@ -76,9 +76,9 @@ case class OrderedDistribution(ordering: Seq[SortOrder]) extends Distribution { } /** - * Represents data where tuples are broadcasted to every node. It is quite common that the - * entire set of tuples is transformed into different data structure. - */ + * Represents data where tuples are broadcasted to every node. It is quite common that the + * entire set of tuples is transformed into different data structure. + */ case class BroadcastDistribution(mode: BroadcastMode) extends Distribution /** diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerExtendableSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerExtendableSuite.scala index 7e3da6bea7..6e5672ddc3 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerExtendableSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerExtendableSuite.scala @@ -23,21 +23,21 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.catalyst.rules.Rule /** - * This is a test for SPARK-7727 if the Optimizer is kept being extendable - */ + * This is a test for SPARK-7727 if the Optimizer is kept being extendable + */ class OptimizerExtendableSuite extends SparkFunSuite { /** - * Dummy rule for test batches - */ + * Dummy rule for test batches + */ object DummyRule extends Rule[LogicalPlan] { def apply(p: LogicalPlan): LogicalPlan = p } /** - * This class represents a dummy extended optimizer that takes the batches of the - * Optimizer and adds custom ones. - */ + * This class represents a dummy extended optimizer that takes the batches of the + * Optimizer and adds custom ones. + */ class ExtendedOptimizer extends Optimizer { // rules set to DummyRule, would not be executed anyways diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala index 221782ee8f..d4290fee0a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala @@ -712,13 +712,13 @@ class SQLContext private[sql]( } /** - * :: Experimental :: - * Creates a [[Dataset]] with a single [[LongType]] column named `id`, containing elements - * in an range from `start` to `end` (exclusive) with an step value. - * - * @since 2.0.0 - * @group dataset - */ + * :: Experimental :: + * Creates a [[Dataset]] with a single [[LongType]] column named `id`, containing elements + * in an range from `start` to `end` (exclusive) with an step value. 
+ * + * @since 2.0.0 + * @group dataset + */ @Experimental def range(start: Long, end: Long, step: Long): Dataset[java.lang.Long] = { range(start, end, step, numPartitions = sparkContext.defaultParallelism) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala index f3478a873a..124ec09efd 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala @@ -109,9 +109,10 @@ private[sql] class CacheManager extends Logging { cachedData.remove(dataIndex) } - /** Tries to remove the data for the given [[Dataset]] from the cache - * if it's cached - */ + /** + * Tries to remove the data for the given [[Dataset]] from the cache + * if it's cached + */ private[sql] def tryUncacheQuery( query: Dataset[_], blocking: Boolean = true): Boolean = writeLock { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala index b1b3d4ac81..ff19d1be1c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala @@ -84,8 +84,8 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ private[sql] def metrics: Map[String, SQLMetric[_, _]] = Map.empty /** - * Reset all the metrics. - */ + * Reset all the metrics. + */ private[sql] def resetMetrics(): Unit = { metrics.valuesIterator.foreach(_.reset()) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala index 9bdf611f6e..9f539c4929 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala @@ -31,8 +31,8 @@ import org.apache.spark.sql.execution.metric.{LongSQLMetricValue, SQLMetrics} import org.apache.spark.sql.internal.SQLConf /** - * An interface for those physical operators that support codegen. - */ + * An interface for those physical operators that support codegen. + */ trait CodegenSupport extends SparkPlan { /** Prefix used in the current operator's variable names. */ @@ -46,10 +46,10 @@ trait CodegenSupport extends SparkPlan { } /** - * Creates a metric using the specified name. - * - * @return name of the variable representing the metric - */ + * Creates a metric using the specified name. + * + * @return name of the variable representing the metric + */ def metricTerm(ctx: CodegenContext, name: String): String = { val metric = ctx.addReferenceObj(name, longMetric(name)) val value = ctx.freshName("metricValue") @@ -59,25 +59,25 @@ trait CodegenSupport extends SparkPlan { } /** - * Whether this SparkPlan support whole stage codegen or not. - */ + * Whether this SparkPlan support whole stage codegen or not. + */ def supportCodegen: Boolean = true /** - * Which SparkPlan is calling produce() of this one. It's itself for the first SparkPlan. - */ + * Which SparkPlan is calling produce() of this one. It's itself for the first SparkPlan. + */ protected var parent: CodegenSupport = null /** - * Returns all the RDDs of InternalRow which generates the input rows. - * - * Note: right now we support up to two RDDs. - */ + * Returns all the RDDs of InternalRow which generates the input rows. 
+ * + * Note: right now we support up to two RDDs. + */ def upstreams(): Seq[RDD[InternalRow]] /** - * Returns Java source code to process the rows from upstream. - */ + * Returns Java source code to process the rows from upstream. + */ final def produce(ctx: CodegenContext, parent: CodegenSupport): String = { this.parent = parent ctx.freshNamePrefix = variablePrefix @@ -89,28 +89,28 @@ trait CodegenSupport extends SparkPlan { } /** - * Generate the Java source code to process, should be overridden by subclass to support codegen. - * - * doProduce() usually generate the framework, for example, aggregation could generate this: - * - * if (!initialized) { - * # create a hash map, then build the aggregation hash map - * # call child.produce() - * initialized = true; - * } - * while (hashmap.hasNext()) { - * row = hashmap.next(); - * # build the aggregation results - * # create variables for results - * # call consume(), which will call parent.doConsume() + * Generate the Java source code to process, should be overridden by subclass to support codegen. + * + * doProduce() usually generate the framework, for example, aggregation could generate this: + * + * if (!initialized) { + * # create a hash map, then build the aggregation hash map + * # call child.produce() + * initialized = true; + * } + * while (hashmap.hasNext()) { + * row = hashmap.next(); + * # build the aggregation results + * # create variables for results + * # call consume(), which will call parent.doConsume() * if (shouldStop()) return; - * } - */ + * } + */ protected def doProduce(ctx: CodegenContext): String /** - * Consume the generated columns or row from current SparkPlan, call it's parent's doConsume(). - */ + * Consume the generated columns or row from current SparkPlan, call it's parent's doConsume(). + */ final def consume(ctx: CodegenContext, outputVars: Seq[ExprCode], row: String = null): String = { val inputVars = if (row != null) { @@ -158,9 +158,9 @@ trait CodegenSupport extends SparkPlan { } /** - * Returns source code to evaluate all the variables, and clear the code of them, to prevent - * them to be evaluated twice. - */ + * Returns source code to evaluate all the variables, and clear the code of them, to prevent + * them to be evaluated twice. + */ protected def evaluateVariables(variables: Seq[ExprCode]): String = { val evaluate = variables.filter(_.code != "").map(_.code.trim).mkString("\n") variables.foreach(_.code = "") @@ -168,9 +168,9 @@ trait CodegenSupport extends SparkPlan { } /** - * Returns source code to evaluate the variables for required attributes, and clear the code - * of evaluated variables, to prevent them to be evaluated twice.. - */ + * Returns source code to evaluate the variables for required attributes, and clear the code + * of evaluated variables, to prevent them to be evaluated twice.. + */ protected def evaluateRequiredVariables( attributes: Seq[Attribute], variables: Seq[ExprCode], @@ -194,18 +194,18 @@ trait CodegenSupport extends SparkPlan { def usedInputs: AttributeSet = references /** - * Generate the Java source code to process the rows from child SparkPlan. - * - * This should be override by subclass to support codegen. - * - * For example, Filter will generate the code like this: - * - * # code to evaluate the predicate expression, result is isNull1 and value2 - * if (isNull1 || !value2) continue; - * # call consume(), which will call parent.doConsume() - * - * Note: A plan can either consume the rows as UnsafeRow (row), or a list of variables (input). 
- */ + * Generate the Java source code to process the rows from child SparkPlan. + * + * This should be override by subclass to support codegen. + * + * For example, Filter will generate the code like this: + * + * # code to evaluate the predicate expression, result is isNull1 and value2 + * if (isNull1 || !value2) continue; + * # call consume(), which will call parent.doConsume() + * + * Note: A plan can either consume the rows as UnsafeRow (row), or a list of variables (input). + */ def doConsume(ctx: CodegenContext, input: Seq[ExprCode], row: ExprCode): String = { throw new UnsupportedOperationException } @@ -213,11 +213,11 @@ trait CodegenSupport extends SparkPlan { /** - * InputAdapter is used to hide a SparkPlan from a subtree that support codegen. - * - * This is the leaf node of a tree with WholeStageCodegen, is used to generate code that consumes - * an RDD iterator of InternalRow. - */ + * InputAdapter is used to hide a SparkPlan from a subtree that support codegen. + * + * This is the leaf node of a tree with WholeStageCodegen, is used to generate code that consumes + * an RDD iterator of InternalRow. + */ case class InputAdapter(child: SparkPlan) extends UnaryNode with CodegenSupport { override def output: Seq[Attribute] = child.output @@ -260,33 +260,33 @@ object WholeStageCodegen { } /** - * WholeStageCodegen compile a subtree of plans that support codegen together into single Java - * function. - * - * Here is the call graph of to generate Java source (plan A support codegen, but plan B does not): - * - * WholeStageCodegen Plan A FakeInput Plan B - * ========================================================================= - * - * -> execute() - * | - * doExecute() ---------> upstreams() -------> upstreams() ------> execute() - * | - * +-----------------> produce() - * | - * doProduce() -------> produce() - * | - * doProduce() - * | - * doConsume() <--------- consume() - * | - * doConsume() <-------- consume() - * - * SparkPlan A should override doProduce() and doConsume(). - * - * doCodeGen() will create a CodeGenContext, which will hold a list of variables for input, - * used to generated code for BoundReference. - */ + * WholeStageCodegen compile a subtree of plans that support codegen together into single Java + * function. + * + * Here is the call graph of to generate Java source (plan A support codegen, but plan B does not): + * + * WholeStageCodegen Plan A FakeInput Plan B + * ========================================================================= + * + * -> execute() + * | + * doExecute() ---------> upstreams() -------> upstreams() ------> execute() + * | + * +-----------------> produce() + * | + * doProduce() -------> produce() + * | + * doProduce() + * | + * doConsume() <--------- consume() + * | + * doConsume() <-------- consume() + * + * SparkPlan A should override doProduce() and doConsume(). + * + * doCodeGen() will create a CodeGenContext, which will hold a list of variables for input, + * used to generated code for BoundReference. + */ case class WholeStageCodegen(child: SparkPlan) extends UnaryNode with CodegenSupport { override def output: Seq[Attribute] = child.output @@ -422,8 +422,8 @@ case class WholeStageCodegen(child: SparkPlan) extends UnaryNode with CodegenSup /** - * Find the chained plans that support codegen, collapse them together as WholeStageCodegen. - */ + * Find the chained plans that support codegen, collapse them together as WholeStageCodegen. 
+ */ case class CollapseCodegenStages(conf: SQLConf) extends Rule[SparkPlan] { private def supportCodegen(e: Expression): Boolean = e match { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala index 7d0567842c..806089196c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala @@ -444,8 +444,8 @@ private[execution] final case class RangeBoundOrdering( } /** - * The interface of row buffer for a partition - */ + * The interface of row buffer for a partition + */ private[execution] abstract class RowBuffer { /** Number of rows. */ @@ -462,8 +462,8 @@ private[execution] abstract class RowBuffer { } /** - * A row buffer based on ArrayBuffer (the number of rows is limited) - */ + * A row buffer based on ArrayBuffer (the number of rows is limited) + */ private[execution] class ArrayRowBuffer(buffer: ArrayBuffer[UnsafeRow]) extends RowBuffer { private[this] var cursor: Int = -1 @@ -493,8 +493,8 @@ private[execution] class ArrayRowBuffer(buffer: ArrayBuffer[UnsafeRow]) extends } /** - * An external buffer of rows based on UnsafeExternalSorter - */ + * An external buffer of rows based on UnsafeExternalSorter + */ private[execution] class ExternalRowBuffer(sorter: UnsafeExternalSorter, numFields: Int) extends RowBuffer { @@ -654,12 +654,16 @@ private[execution] final class SlidingWindowFunctionFrame( /** The rows within current sliding window. */ private[this] val buffer = new util.ArrayDeque[InternalRow]() - /** Index of the first input row with a value greater than the upper bound of the current - * output row. */ + /** + * Index of the first input row with a value greater than the upper bound of the current + * output row. + */ private[this] var inputHighIndex = 0 - /** Index of the first input row with a value equal to or greater than the lower bound of the - * current output row. */ + /** + * Index of the first input row with a value equal to or greater than the lower bound of the + * current output row. + */ private[this] var inputLowIndex = 0 /** Prepare the frame for calculating a new partition. Reset all variables. */ @@ -763,8 +767,10 @@ private[execution] final class UnboundedPrecedingWindowFunctionFrame( /** The next row from `input`. */ private[this] var nextRow: InternalRow = null - /** Index of the first input row with a value greater than the upper bound of the current - * output row. */ + /** + * Index of the first input row with a value greater than the upper bound of the current + * output row. + */ private[this] var inputIndex = 0 /** Prepare the frame for calculating a new partition. */ @@ -819,8 +825,10 @@ private[execution] final class UnboundedFollowingWindowFunctionFrame( /** Rows of the partition currently being processed. */ private[this] var input: RowBuffer = null - /** Index of the first input row with a value equal to or greater than the lower bound of the - * current output row. */ + /** + * Index of the first input row with a value equal to or greater than the lower bound of the + * current output row. + */ private[this] var inputIndex = 0 /** Prepare the frame for calculating a new partition. 
*/ diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggregationIterator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggregationIterator.scala index 15627a7004..042c731901 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggregationIterator.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggregationIterator.scala @@ -47,17 +47,17 @@ abstract class AggregationIterator( /////////////////////////////////////////////////////////////////////////// /** - * The following combinations of AggregationMode are supported: - * - Partial - * - PartialMerge (for single distinct) - * - Partial and PartialMerge (for single distinct) - * - Final - * - Complete (for SortBasedAggregate with functions that does not support Partial) - * - Final and Complete (currently not used) - * - * TODO: AggregateMode should have only two modes: Update and Merge, AggregateExpression - * could have a flag to tell it's final or not. - */ + * The following combinations of AggregationMode are supported: + * - Partial + * - PartialMerge (for single distinct) + * - Partial and PartialMerge (for single distinct) + * - Final + * - Complete (for SortBasedAggregate with functions that does not support Partial) + * - Final and Complete (currently not used) + * + * TODO: AggregateMode should have only two modes: Update and Merge, AggregateExpression + * could have a flag to tell it's final or not. + */ { val modes = aggregateExpressions.map(_.mode).distinct.toSet require(modes.size <= 2, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregationIterator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregationIterator.scala index 8f974980bb..de1491d357 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregationIterator.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregationIterator.scala @@ -46,9 +46,9 @@ class SortBasedAggregationIterator( newMutableProjection) { /** - * Creates a new aggregation buffer and initializes buffer values - * for all aggregate functions. - */ + * Creates a new aggregation buffer and initializes buffer values + * for all aggregate functions. + */ private def newBuffer: MutableRow = { val bufferSchema = aggregateFunctions.flatMap(_.aggBufferAttributes) val bufferRowSize: Int = bufferSchema.length diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala index 7c215d1b96..60027edc7c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala @@ -266,8 +266,8 @@ case class TungstenAggregate( private var sorterTerm: String = _ /** - * This is called by generated Java class, should be public. - */ + * This is called by generated Java class, should be public. + */ def createHashMap(): UnsafeFixedWidthAggregationMap = { // create initialized aggregate buffer val initExpr = declFunctions.flatMap(f => f.initialValues) @@ -286,15 +286,15 @@ case class TungstenAggregate( } /** - * This is called by generated Java class, should be public. - */ + * This is called by generated Java class, should be public. 
+ */ def createUnsafeJoiner(): UnsafeRowJoiner = { GenerateUnsafeRowJoiner.create(groupingKeySchema, bufferSchema) } /** - * Called by generated Java class to finish the aggregate and return a KVIterator. - */ + * Called by generated Java class to finish the aggregate and return a KVIterator. + */ def finishAggregate( hashMap: UnsafeFixedWidthAggregationMap, sorter: UnsafeKVExternalSorter): KVIterator[UnsafeRow, UnsafeRow] = { @@ -372,8 +372,8 @@ case class TungstenAggregate( } /** - * Generate the code for output. - */ + * Generate the code for output. + */ private def generateResultCode( ctx: CodegenContext, keyTerm: String, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SqlNewHadoopRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SqlNewHadoopRDD.scala index f3514cd14c..159fdc99dd 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SqlNewHadoopRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SqlNewHadoopRDD.scala @@ -168,10 +168,10 @@ private[spark] class SqlNewHadoopRDD[V: ClassTag]( private[this] var reader: RecordReader[Void, V] = null /** - * If the format is ParquetInputFormat, try to create the optimized RecordReader. If this - * fails (for example, unsupported schema), try with the normal reader. - * TODO: plumb this through a different way? - */ + * If the format is ParquetInputFormat, try to create the optimized RecordReader. If this + * fails (for example, unsupported schema), try with the normal reader. + * TODO: plumb this through a different way? + */ if (enableVectorizedParquetReader && format.getClass.getName == "org.apache.parquet.hadoop.ParquetInputFormat") { val parquetReader: VectorizedParquetRecordReader = new VectorizedParquetRecordReader() diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchema.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchema.scala index 797f740dc5..ea843a1013 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchema.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchema.scala @@ -33,11 +33,11 @@ import org.apache.spark.unsafe.types.UTF8String private[csv] object CSVInferSchema { /** - * Similar to the JSON schema inference - * 1. Infer type of each row - * 2. Merge row types to find common type - * 3. Replace any null types with string type - */ + * Similar to the JSON schema inference + * 1. Infer type of each row + * 2. Merge row types to find common type + * 3. Replace any null types with string type + */ def infer( tokenRdd: RDD[Array[String]], header: Array[String], @@ -75,9 +75,9 @@ private[csv] object CSVInferSchema { } /** - * Infer type of string field. Given known type Double, and a string "1", there is no - * point checking if it is an Int, as the final type must be Double or higher. - */ + * Infer type of string field. Given known type Double, and a string "1", there is no + * point checking if it is an Int, as the final type must be Double or higher. 
+ */ def inferField(typeSoFar: DataType, field: String, nullValue: String = ""): DataType = { if (field == null || field.isEmpty || field == nullValue) { typeSoFar @@ -142,9 +142,9 @@ private[csv] object CSVInferSchema { private val numericPrecedence: IndexedSeq[DataType] = HiveTypeCoercion.numericPrecedence /** - * Copied from internal Spark api - * [[org.apache.spark.sql.catalyst.analysis.HiveTypeCoercion]] - */ + * Copied from internal Spark api + * [[org.apache.spark.sql.catalyst.analysis.HiveTypeCoercion]] + */ val findTightestCommonType: (DataType, DataType) => Option[DataType] = { case (t1, t2) if t1 == t2 => Some(t1) case (NullType, t1) => Some(t1) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/DefaultSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/DefaultSource.scala index c0d6f6fbf7..34fcbdf871 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/DefaultSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/DefaultSource.scala @@ -38,8 +38,8 @@ import org.apache.spark.util.SerializableConfiguration import org.apache.spark.util.collection.BitSet /** - * Provides access to CSV data from pure SQL statements. - */ + * Provides access to CSV data from pure SQL statements. + */ class DefaultSource extends FileFormat with DataSourceRegister { override def shortName(): String = "csv" diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala index 877e159fbd..2e88d588be 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala @@ -51,11 +51,11 @@ case class DescribeCommand( } /** - * Used to represent the operation of create table using a data source. + * Used to represent the operation of create table using a data source. * - * @param allowExisting If it is true, we will do nothing when the table already exists. - * If it is false, an exception will be thrown - */ + * @param allowExisting If it is true, we will do nothing when the table already exists. + * If it is false, an exception will be thrown + */ case class CreateTableUsing( tableIdent: TableIdentifier, userSpecifiedSchema: Option[StructType], diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala index 0ed1ed41b0..41e566c27b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala @@ -122,8 +122,8 @@ case class BroadcastHashJoin( } /** - * Returns a tuple of Broadcast of HashedRelation and the variable name for it. - */ + * Returns a tuple of Broadcast of HashedRelation and the variable name for it. + */ private def prepareBroadcast(ctx: CodegenContext): (Broadcast[HashedRelation], String) = { // create a name for HashedRelation val broadcastRelation = buildPlan.executeBroadcast[HashedRelation]() @@ -139,9 +139,9 @@ case class BroadcastHashJoin( } /** - * Returns the code for generating join key for stream side, and expression of whether the key - * has any null in it or not. - */ + * Returns the code for generating join key for stream side, and expression of whether the key + * has any null in it or not. 
+ */ private def genStreamSideJoinKey( ctx: CodegenContext, input: Seq[ExprCode]): (ExprCode, String) = { @@ -160,8 +160,8 @@ case class BroadcastHashJoin( } /** - * Generates the code for variable of build side. - */ + * Generates the code for variable of build side. + */ private def genBuildSideVars(ctx: CodegenContext, matched: String): Seq[ExprCode] = { ctx.currentVars = null ctx.INPUT_ROW = matched @@ -188,8 +188,8 @@ case class BroadcastHashJoin( } /** - * Generates the code for Inner join. - */ + * Generates the code for Inner join. + */ private def codegenInner(ctx: CodegenContext, input: Seq[ExprCode]): String = { val (broadcastRelation, relationTerm) = prepareBroadcast(ctx) val (keyEv, anyNull) = genStreamSideJoinKey(ctx, input) @@ -254,8 +254,8 @@ case class BroadcastHashJoin( /** - * Generates the code for left or right outer join. - */ + * Generates the code for left or right outer join. + */ private def codegenOuter(ctx: CodegenContext, input: Seq[ExprCode]): String = { val (broadcastRelation, relationTerm) = prepareBroadcast(ctx) val (keyEv, anyNull) = genStreamSideJoinKey(ctx, input) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProduct.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProduct.scala index fb65b50da8..edb4c5a16f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProduct.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProduct.scala @@ -28,10 +28,10 @@ import org.apache.spark.util.CompletionIterator import org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter /** - * An optimized CartesianRDD for UnsafeRow, which will cache the rows from second child RDD, - * will be much faster than building the right partition for every row in left RDD, it also - * materialize the right RDD (in case of the right RDD is nondeterministic). - */ + * An optimized CartesianRDD for UnsafeRow, which will cache the rows from second child RDD, + * will be much faster than building the right partition for every row in left RDD, it also + * materialize the right RDD (in case of the right RDD is nondeterministic). + */ private[spark] class UnsafeCartesianRDD(left : RDD[UnsafeRow], right : RDD[UnsafeRow], numFieldsOfRight: Int) extends CartesianRDD[UnsafeRow, UnsafeRow](left.sparkContext, left, right) { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala index 5f42d07273..c298b7dee0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala @@ -64,10 +64,10 @@ trait HashJoin { } /** - * Try to rewrite the key as LongType so we can use getLong(), if they key can fit with a long. - * - * If not, returns the original expressions. - */ + * Try to rewrite the key as LongType so we can use getLong(), if they key can fit with a long. + * + * If not, returns the original expressions. 
+ */ def rewriteKeyExpr(keys: Seq[Expression]): Seq[Expression] = { var keyExpr: Expression = null var width = 0 diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala index dc4793e85a..91c470d187 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala @@ -38,20 +38,20 @@ import org.apache.spark.util.collection.CompactBuffer */ private[execution] sealed trait HashedRelation { /** - * Returns matched rows. - */ + * Returns matched rows. + */ def get(key: InternalRow): Seq[InternalRow] /** - * Returns matched rows for a key that has only one column with LongType. - */ + * Returns matched rows for a key that has only one column with LongType. + */ def get(key: Long): Seq[InternalRow] = { throw new UnsupportedOperationException } /** - * Returns the size of used memory. - */ + * Returns the size of used memory. + */ def getMemorySize: Long = 1L // to make the test happy /** @@ -77,20 +77,20 @@ private[execution] sealed trait HashedRelation { } /** - * Interface for a hashed relation that have only one row per key. - * - * We should call getValue() for better performance. - */ + * Interface for a hashed relation that have only one row per key. + * + * We should call getValue() for better performance. + */ private[execution] trait UniqueHashedRelation extends HashedRelation { /** - * Returns the matched single row. - */ + * Returns the matched single row. + */ def getValue(key: InternalRow): InternalRow /** - * Returns the matched single row with key that have only one column of LongType. - */ + * Returns the matched single row with key that have only one column of LongType. + */ def getValue(key: Long): InternalRow = { throw new UnsupportedOperationException } @@ -345,8 +345,8 @@ private[joins] object UnsafeHashedRelation { } /** - * An interface for a hashed relation that the key is a Long. - */ + * An interface for a hashed relation that the key is a Long. + */ private[joins] trait LongHashedRelation extends HashedRelation { override def get(key: InternalRow): Seq[InternalRow] = { get(key.getLong(0)) @@ -396,26 +396,26 @@ private[joins] final class UniqueLongHashedRelation( } /** - * A relation that pack all the rows into a byte array, together with offsets and sizes. - * - * All the bytes of UnsafeRow are packed together as `bytes`: - * - * [ Row0 ][ Row1 ][] ... [ RowN ] - * - * With keys: - * - * start start+1 ... start+N - * - * `offsets` are offsets of UnsafeRows in the `bytes` - * `sizes` are the numbers of bytes of UnsafeRows, 0 means no row for this key. - * - * For example, two UnsafeRows (24 bytes and 32 bytes), with keys as 3 and 5 will stored as: - * - * start = 3 - * offsets = [0, 0, 24] - * sizes = [24, 0, 32] - * bytes = [0 - 24][][24 - 56] - */ + * A relation that pack all the rows into a byte array, together with offsets and sizes. + * + * All the bytes of UnsafeRow are packed together as `bytes`: + * + * [ Row0 ][ Row1 ][] ... [ RowN ] + * + * With keys: + * + * start start+1 ... start+N + * + * `offsets` are offsets of UnsafeRows in the `bytes` + * `sizes` are the numbers of bytes of UnsafeRows, 0 means no row for this key. 
+ * + * For example, two UnsafeRows (24 bytes and 32 bytes), with keys as 3 and 5 will stored as: + * + * start = 3 + * offsets = [0, 0, 24] + * sizes = [24, 0, 32] + * bytes = [0 - 24][][24 - 56] + */ private[joins] final class LongArrayRelation( private var numFields: Int, private var start: Long, @@ -483,8 +483,8 @@ private[joins] final class LongArrayRelation( } /** - * Create hashed relation with key that is long. - */ + * Create hashed relation with key that is long. + */ private[joins] object LongHashedRelation { val DENSE_FACTOR = 0.2 diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala index 60bd8ea39a..0e7b2f2f31 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala @@ -256,9 +256,9 @@ case class SortMergeJoin( } /** - * Generate a function to scan both left and right to find a match, returns the term for - * matched one row from left side and buffered rows from right side. - */ + * Generate a function to scan both left and right to find a match, returns the term for + * matched one row from left side and buffered rows from right side. + */ private def genScanner(ctx: CodegenContext): (String, String) = { // Create class member for next row from both sides. val leftRow = ctx.freshName("leftRow") @@ -341,12 +341,12 @@ case class SortMergeJoin( } /** - * Creates variables for left part of result row. - * - * In order to defer the access after condition and also only access once in the loop, - * the variables should be declared separately from accessing the columns, we can't use the - * codegen of BoundReference here. - */ + * Creates variables for left part of result row. + * + * In order to defer the access after condition and also only access once in the loop, + * the variables should be declared separately from accessing the columns, we can't use the + * codegen of BoundReference here. + */ private def createLeftVars(ctx: CodegenContext, leftRow: String): Seq[ExprCode] = { ctx.INPUT_ROW = leftRow left.output.zipWithIndex.map { case (a, i) => @@ -370,9 +370,9 @@ case class SortMergeJoin( } /** - * Creates the variables for right part of result row, using BoundReference, since the right - * part are accessed inside the loop. - */ + * Creates the variables for right part of result row, using BoundReference, since the right + * part are accessed inside the loop. + */ private def createRightVar(ctx: CodegenContext, rightRow: String): Seq[ExprCode] = { ctx.INPUT_ROW = rightRow right.output.zipWithIndex.map { case (a, i) => @@ -381,12 +381,12 @@ case class SortMergeJoin( } /** - * Splits variables based on whether it's used by condition or not, returns the code to create - * these variables before the condition and after the condition. - * - * Only a few columns are used by condition, then we can skip the accessing of those columns - * that are not used by condition also filtered out by condition. - */ + * Splits variables based on whether it's used by condition or not, returns the code to create + * these variables before the condition and after the condition. + * + * Only a few columns are used by condition, then we can skip the accessing of those columns + * that are not used by condition also filtered out by condition. 
+ */ private def splitVarsByCondition( attributes: Seq[Attribute], variables: Seq[ExprCode]): (String, String) = { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala index 998eb82de1..8ece3c971a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala @@ -468,10 +468,10 @@ private[state] class HDFSBackedStateStoreProvider( } /** - * Clean up old snapshots and delta files that are not needed any more. It ensures that last - * few versions of the store can be recovered from the files, so re-executed RDD operations - * can re-apply updates on the past versions of the store. - */ + * Clean up old snapshots and delta files that are not needed any more. It ensures that last + * few versions of the store can be recovered from the files, so re-executed RDD operations + * can re-apply updates on the past versions of the store. + */ private[state] def cleanup(): Unit = { try { val files = fetchFiles() diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala index 24a01f5be1..012b125d6b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala @@ -45,8 +45,8 @@ private[ui] case class SparkPlanGraph( } /** - * All the SparkPlanGraphNodes, including those inside of WholeStageCodegen. - */ + * All the SparkPlanGraphNodes, including those inside of WholeStageCodegen. + */ val allNodes: Seq[SparkPlanGraphNode] = { nodes.flatMap { case cluster: SparkPlanGraphCluster => cluster.nodes :+ cluster diff --git a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala index baf947d037..da58ba2add 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala @@ -332,95 +332,94 @@ object functions { } /** - * Aggregate function: returns the first value in a group. - * - * The function by default returns the first values it sees. It will return the first non-null - * value it sees when ignoreNulls is set to true. If all values are null, then null is returned. - * - * @group agg_funcs - * @since 2.0.0 - */ + * Aggregate function: returns the first value in a group. + * + * The function by default returns the first values it sees. It will return the first non-null + * value it sees when ignoreNulls is set to true. If all values are null, then null is returned. + * + * @group agg_funcs + * @since 2.0.0 + */ def first(e: Column, ignoreNulls: Boolean): Column = withAggregateFunction { new First(e.expr, Literal(ignoreNulls)) } /** - * Aggregate function: returns the first value of a column in a group. - * - * The function by default returns the first values it sees. It will return the first non-null - * value it sees when ignoreNulls is set to true. If all values are null, then null is returned. - * - * @group agg_funcs - * @since 2.0.0 - */ + * Aggregate function: returns the first value of a column in a group. + * + * The function by default returns the first values it sees. 
It will return the first non-null + * value it sees when ignoreNulls is set to true. If all values are null, then null is returned. + * + * @group agg_funcs + * @since 2.0.0 + */ def first(columnName: String, ignoreNulls: Boolean): Column = { first(Column(columnName), ignoreNulls) } /** - * Aggregate function: returns the first value in a group. - * - * The function by default returns the first values it sees. It will return the first non-null - * value it sees when ignoreNulls is set to true. If all values are null, then null is returned. - * - * @group agg_funcs - * @since 1.3.0 - */ + * Aggregate function: returns the first value in a group. + * + * The function by default returns the first values it sees. It will return the first non-null + * value it sees when ignoreNulls is set to true. If all values are null, then null is returned. + * + * @group agg_funcs + * @since 1.3.0 + */ def first(e: Column): Column = first(e, ignoreNulls = false) /** - * Aggregate function: returns the first value of a column in a group. - * - * The function by default returns the first values it sees. It will return the first non-null - * value it sees when ignoreNulls is set to true. If all values are null, then null is returned. - * - * @group agg_funcs - * @since 1.3.0 - */ + * Aggregate function: returns the first value of a column in a group. + * + * The function by default returns the first values it sees. It will return the first non-null + * value it sees when ignoreNulls is set to true. If all values are null, then null is returned. + * + * @group agg_funcs + * @since 1.3.0 + */ def first(columnName: String): Column = first(Column(columnName)) - /** - * Aggregate function: indicates whether a specified column in a GROUP BY list is aggregated - * or not, returns 1 for aggregated or 0 for not aggregated in the result set. - * - * @group agg_funcs - * @since 2.0.0 - */ + * Aggregate function: indicates whether a specified column in a GROUP BY list is aggregated + * or not, returns 1 for aggregated or 0 for not aggregated in the result set. + * + * @group agg_funcs + * @since 2.0.0 + */ def grouping(e: Column): Column = Column(Grouping(e.expr)) /** - * Aggregate function: indicates whether a specified column in a GROUP BY list is aggregated - * or not, returns 1 for aggregated or 0 for not aggregated in the result set. - * - * @group agg_funcs - * @since 2.0.0 - */ + * Aggregate function: indicates whether a specified column in a GROUP BY list is aggregated + * or not, returns 1 for aggregated or 0 for not aggregated in the result set. + * + * @group agg_funcs + * @since 2.0.0 + */ def grouping(columnName: String): Column = grouping(Column(columnName)) /** - * Aggregate function: returns the level of grouping, equals to - * - * (grouping(c1) << (n-1)) + (grouping(c2) << (n-2)) + ... + grouping(cn) - * - * Note: the list of columns should match with grouping columns exactly, or empty (means all the - * grouping columns). - * - * @group agg_funcs - * @since 2.0.0 - */ + * Aggregate function: returns the level of grouping, equals to + * + * (grouping(c1) << (n-1)) + (grouping(c2) << (n-2)) + ... + grouping(cn) + * + * Note: the list of columns should match with grouping columns exactly, or empty (means all the + * grouping columns). + * + * @group agg_funcs + * @since 2.0.0 + */ def grouping_id(cols: Column*): Column = Column(GroupingID(cols.map(_.expr))) /** - * Aggregate function: returns the level of grouping, equals to - * - * (grouping(c1) << (n-1)) + (grouping(c2) << (n-2)) + ... 
+ grouping(cn) - * - * Note: the list of columns should match with grouping columns exactly. - * - * @group agg_funcs - * @since 2.0.0 - */ + * Aggregate function: returns the level of grouping, equals to + * + * (grouping(c1) << (n-1)) + (grouping(c2) << (n-2)) + ... + grouping(cn) + * + * Note: the list of columns should match with grouping columns exactly. + * + * @group agg_funcs + * @since 2.0.0 + */ def grouping_id(colName: String, colNames: String*): Column = { grouping_id((Seq(colName) ++ colNames).map(n => Column(n)) : _*) } @@ -442,51 +441,51 @@ object functions { def kurtosis(columnName: String): Column = kurtosis(Column(columnName)) /** - * Aggregate function: returns the last value in a group. - * - * The function by default returns the last values it sees. It will return the last non-null - * value it sees when ignoreNulls is set to true. If all values are null, then null is returned. - * - * @group agg_funcs - * @since 2.0.0 - */ + * Aggregate function: returns the last value in a group. + * + * The function by default returns the last values it sees. It will return the last non-null + * value it sees when ignoreNulls is set to true. If all values are null, then null is returned. + * + * @group agg_funcs + * @since 2.0.0 + */ def last(e: Column, ignoreNulls: Boolean): Column = withAggregateFunction { new Last(e.expr, Literal(ignoreNulls)) } /** - * Aggregate function: returns the last value of the column in a group. - * - * The function by default returns the last values it sees. It will return the last non-null - * value it sees when ignoreNulls is set to true. If all values are null, then null is returned. - * - * @group agg_funcs - * @since 2.0.0 - */ + * Aggregate function: returns the last value of the column in a group. + * + * The function by default returns the last values it sees. It will return the last non-null + * value it sees when ignoreNulls is set to true. If all values are null, then null is returned. + * + * @group agg_funcs + * @since 2.0.0 + */ def last(columnName: String, ignoreNulls: Boolean): Column = { last(Column(columnName), ignoreNulls) } /** - * Aggregate function: returns the last value in a group. - * - * The function by default returns the last values it sees. It will return the last non-null - * value it sees when ignoreNulls is set to true. If all values are null, then null is returned. - * - * @group agg_funcs - * @since 1.3.0 - */ + * Aggregate function: returns the last value in a group. + * + * The function by default returns the last values it sees. It will return the last non-null + * value it sees when ignoreNulls is set to true. If all values are null, then null is returned. + * + * @group agg_funcs + * @since 1.3.0 + */ def last(e: Column): Column = last(e, ignoreNulls = false) /** - * Aggregate function: returns the last value of the column in a group. - * - * The function by default returns the last values it sees. It will return the last non-null - * value it sees when ignoreNulls is set to true. If all values are null, then null is returned. - * - * @group agg_funcs - * @since 1.3.0 - */ + * Aggregate function: returns the last value of the column in a group. + * + * The function by default returns the last values it sees. It will return the last non-null + * value it sees when ignoreNulls is set to true. If all values are null, then null is returned. 
+ * + * @group agg_funcs + * @since 1.3.0 + */ def last(columnName: String): Column = last(Column(columnName), ignoreNulls = false) /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala index e8834d052c..14e14710f6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala @@ -152,19 +152,19 @@ trait StreamSinkProvider { @DeveloperApi trait CreatableRelationProvider { /** - * Creates a relation with the given parameters based on the contents of the given - * DataFrame. The mode specifies the expected behavior of createRelation when - * data already exists. - * Right now, there are three modes, Append, Overwrite, and ErrorIfExists. - * Append mode means that when saving a DataFrame to a data source, if data already exists, - * contents of the DataFrame are expected to be appended to existing data. - * Overwrite mode means that when saving a DataFrame to a data source, if data already exists, - * existing data is expected to be overwritten by the contents of the DataFrame. - * ErrorIfExists mode means that when saving a DataFrame to a data source, - * if data already exists, an exception is expected to be thrown. - * - * @since 1.3.0 - */ + * Creates a relation with the given parameters based on the contents of the given + * DataFrame. The mode specifies the expected behavior of createRelation when + * data already exists. + * Right now, there are three modes, Append, Overwrite, and ErrorIfExists. + * Append mode means that when saving a DataFrame to a data source, if data already exists, + * contents of the DataFrame are expected to be appended to existing data. + * Overwrite mode means that when saving a DataFrame to a data source, if data already exists, + * existing data is expected to be overwritten by the contents of the DataFrame. + * ErrorIfExists mode means that when saving a DataFrame to a data source, + * if data already exists, an exception is expected to be thrown. + * + * @since 1.3.0 + */ def createRelation( sqlContext: SQLContext, mode: SaveMode, diff --git a/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala index 854a662cc4..d160f8ab8c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala @@ -286,8 +286,8 @@ abstract class QueryTest extends PlanTest { } /** - * Asserts that a given [[Dataset]] does not have missing inputs in all the analyzed plans. - */ + * Asserts that a given [[Dataset]] does not have missing inputs in all the analyzed plans. 
+ */ def assertEmptyMissingInput(query: Dataset[_]): Unit = { assert(query.queryExecution.analyzed.missingInput.isEmpty, s"The analyzed logical plan has missing inputs: ${query.queryExecution.analyzed}") diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala index 55906793c0..289e1b6db9 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala @@ -32,10 +32,10 @@ import org.apache.spark.unsafe.map.BytesToBytesMap import org.apache.spark.util.Benchmark /** - * Benchmark to measure whole stage codegen performance. - * To run this: - * build/sbt "sql/test-only *BenchmarkWholeStageCodegen" - */ + * Benchmark to measure whole stage codegen performance. + * To run this: + * build/sbt "sql/test-only *BenchmarkWholeStageCodegen" + */ class BenchmarkWholeStageCodegen extends SparkFunSuite { lazy val conf = new SparkConf().setMaster("local[1]").setAppName("benchmark") .set("spark.sql.shuffle.partitions", "1") diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVParserSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVParserSuite.scala index dc54883277..aaeecef5f3 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVParserSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVParserSuite.scala @@ -20,8 +20,8 @@ package org.apache.spark.sql.execution.datasources.csv import org.apache.spark.SparkFunSuite /** - * test cases for StringIteratorReader - */ + * test cases for StringIteratorReader + */ class CSVParserSuite extends SparkFunSuite { private def readAll(iter: Iterator[String]) = { diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala index c1e151d08b..ac37e8e022 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala @@ -497,9 +497,10 @@ class StreamingContext private[streaming] ( new TransformedDStream[T](dstreams, sparkContext.clean(transformFunc)) } - /** Add a [[org.apache.spark.streaming.scheduler.StreamingListener]] object for - * receiving system events related to streaming. - */ + /** + * Add a [[org.apache.spark.streaming.scheduler.StreamingListener]] object for + * receiving system events related to streaming. + */ def addStreamingListener(streamingListener: StreamingListener) { scheduler.listenerBus.addListener(streamingListener) } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala index 05f4da6fac..922e4a5e4d 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala @@ -517,9 +517,10 @@ class JavaStreamingContext(val ssc: StreamingContext) extends Closeable { ssc.remember(duration) } - /** Add a [[org.apache.spark.streaming.scheduler.StreamingListener]] object for - * receiving system events related to streaming. 
- */ + /** + * Add a [[org.apache.spark.streaming.scheduler.StreamingListener]] object for + * receiving system events related to streaming. + */ def addStreamingListener(streamingListener: StreamingListener) { ssc.addStreamingListener(streamingListener) } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/RateLimiter.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/RateLimiter.scala index 0a861f22b1..fbac4880bd 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/RateLimiter.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/RateLimiter.scala @@ -22,17 +22,18 @@ import com.google.common.util.concurrent.{RateLimiter => GuavaRateLimiter} import org.apache.spark.SparkConf import org.apache.spark.internal.Logging -/** Provides waitToPush() method to limit the rate at which receivers consume data. - * - * waitToPush method will block the thread if too many messages have been pushed too quickly, - * and only return when a new message has been pushed. It assumes that only one message is - * pushed at a time. - * - * The spark configuration spark.streaming.receiver.maxRate gives the maximum number of messages - * per second that each receiver will accept. - * - * @param conf spark configuration - */ +/** + * Provides waitToPush() method to limit the rate at which receivers consume data. + * + * waitToPush method will block the thread if too many messages have been pushed too quickly, + * and only return when a new message has been pushed. It assumes that only one message is + * pushed at a time. + * + * The spark configuration spark.streaming.receiver.maxRate gives the maximum number of messages + * per second that each receiver will accept. + * + * @param conf spark configuration + */ private[receiver] abstract class RateLimiter(conf: SparkConf) extends Logging { // treated as an upper limit diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala index 66d5ffb797..0baedaf275 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala @@ -21,9 +21,10 @@ import scala.collection.mutable.HashSet import org.apache.spark.streaming.Time -/** Class representing a set of Jobs - * belong to the same batch. - */ +/** + * Class representing a set of Jobs + * belong to the same batch. + */ private[streaming] case class JobSet( time: Time, diff --git a/tools/src/main/scala/org/apache/spark/tools/GenerateMIMAIgnore.scala b/tools/src/main/scala/org/apache/spark/tools/GenerateMIMAIgnore.scala index 0df3c501de..c9058ff409 100644 --- a/tools/src/main/scala/org/apache/spark/tools/GenerateMIMAIgnore.scala +++ b/tools/src/main/scala/org/apache/spark/tools/GenerateMIMAIgnore.scala @@ -91,10 +91,11 @@ object GenerateMIMAIgnore { (ignoredClasses.flatMap(c => Seq(c, c.replace("$", "#"))).toSet, ignoredMembers.toSet) } - /** Scala reflection does not let us see inner function even if they are upgraded - * to public for some reason. So had to resort to java reflection to get all inner - * functions with $$ in there name. - */ + /** + * Scala reflection does not let us see inner function even if they are upgraded + * to public for some reason. So had to resort to java reflection to get all inner + * functions with $$ in there name. 
+ */ def getInnerFunctions(classSymbol: unv.ClassSymbol): Seq[String] = { try { Class.forName(classSymbol.fullName, false, classLoader).getMethods.map(_.getName) diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala index 5af2c29808..4b36da309d 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala @@ -135,8 +135,8 @@ class YarnSparkHadoopUtil extends SparkHadoopUtil { } /** - * Obtains token for the Hive metastore and adds them to the credentials. - */ + * Obtains token for the Hive metastore and adds them to the credentials. + */ def obtainTokenForHiveMetastore( sparkConf: SparkConf, conf: Configuration, @@ -149,8 +149,8 @@ class YarnSparkHadoopUtil extends SparkHadoopUtil { } /** - * Obtain a security token for HBase. - */ + * Obtain a security token for HBase. + */ def obtainTokenForHBase( sparkConf: SparkConf, conf: Configuration, @@ -164,10 +164,10 @@ class YarnSparkHadoopUtil extends SparkHadoopUtil { } /** - * Return whether delegation tokens should be retrieved for the given service when security is - * enabled. By default, tokens are retrieved, but that behavior can be changed by setting - * a service-specific configuration. - */ + * Return whether delegation tokens should be retrieved for the given service when security is + * enabled. By default, tokens are retrieved, but that behavior can be changed by setting + * a service-specific configuration. + */ private def shouldGetTokens(conf: SparkConf, service: String): Boolean = { conf.getBoolean(s"spark.yarn.security.tokens.${service}.enabled", true) } -- GitLab
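
As a quick reference for the `first`/`last` semantics documented in the `functions.scala` hunks above, here is a minimal illustrative sketch (local master, ad-hoc column names `key`/`value` chosen for the example, not taken from the patch):

```scala
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.functions.{col, first, last}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

object FirstLastIgnoreNullsExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]")
      .appName("FirstLastIgnoreNullsExample").getOrCreate()

    // One group ("a") whose first value is null.
    val schema = StructType(Seq(
      StructField("key", StringType, nullable = false),
      StructField("value", IntegerType, nullable = true)))
    val rows = Seq(Row("a", null), Row("a", 1), Row("a", 2))
    val df = spark.createDataFrame(spark.sparkContext.parallelize(rows, 1), schema)

    df.groupBy(col("key")).agg(
      first(col("value")).as("first_seen"),                         // default: returns the first value seen, which may be null
      first(col("value"), ignoreNulls = true).as("first_non_null"), // skips nulls; null only if every value is null
      last(col("value"), ignoreNulls = true).as("last_non_null")
    ).show()

    spark.stop()
  }
}
```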
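The `grouping_id` formula documented above, `(grouping(c1) << (n-1)) + ... + grouping(cn)`, is easiest to see alongside `cube`. A sketch with hypothetical `dept`/`city`/`salary` columns:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, grouping, grouping_id, sum}

object GroupingIdExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]")
      .appName("GroupingIdExample").getOrCreate()
    import spark.implicits._

    val df = Seq(("eng", "SF", 100), ("eng", "NY", 200), ("hr", "SF", 150))
      .toDF("dept", "city", "salary")

    // For cube(dept, city): grouping_id = (grouping(dept) << 1) + grouping(city),
    // so 0 = grouped by both, 1 = city aggregated away, 2 = dept aggregated away, 3 = grand total.
    df.cube(col("dept"), col("city"))
      .agg(
        sum(col("salary")).as("total_salary"),
        grouping(col("dept")).as("g_dept"),
        grouping(col("city")).as("g_city"),
        grouping_id(col("dept"), col("city")).as("gid"))
      .orderBy(col("gid"))
      .show()

    spark.stop()
  }
}
```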
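The three modes described for `CreatableRelationProvider.createRelation` (Append, Overwrite, ErrorIfExists) correspond to the `SaveMode` a caller passes through `DataFrameWriter.mode`. A sketch assuming a throwaway local output path:

```scala
import org.apache.spark.sql.{SaveMode, SparkSession}

object SaveModeExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]")
      .appName("SaveModeExample").getOrCreate()
    import spark.implicits._

    val df = Seq((1, "a"), (2, "b")).toDF("id", "name")
    val path = "/tmp/save_mode_example"  // hypothetical output location

    df.write.mode(SaveMode.ErrorIfExists).parquet(path) // throws if data is already present at the path
    df.write.mode(SaveMode.Append).parquet(path)        // adds these rows to the data written above
    df.write.mode(SaveMode.Overwrite).parquet(path)     // replaces everything at the path

    spark.stop()
  }
}
```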
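And for the `addStreamingListener` docs touched in the streaming hunks, a sketch of registering a listener; the batch interval, socket source, and port 9999 are illustrative assumptions only:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}

object StreamingListenerExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("StreamingListenerExample")
    val ssc = new StreamingContext(conf, Seconds(1))

    // StreamingListener has no-op defaults, so only the events of interest need overriding.
    ssc.addStreamingListener(new StreamingListener {
      override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
        println(s"Batch at ${batchCompleted.batchInfo.batchTime} processed " +
          s"${batchCompleted.batchInfo.numRecords} records")
      }
    })

    // Any input stream will do; this one expects a text server on localhost:9999 (e.g. `nc -lk 9999`).
    ssc.socketTextStream("localhost", 9999).count().print()

    ssc.start()
    ssc.awaitTerminationOrTimeout(30000)
    ssc.stop()
  }
}
```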