diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java index 5b42843717e9037390184620c7737a5dcd626d24..f219c5605b643c2ff72e5e4aefda0d759ad228dd 100644 --- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java +++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java @@ -86,7 +86,7 @@ public final class UnsafeInMemorySorter { private final PrefixComparators.RadixSortSupport radixSortSupport; /** - * Within this buffer, position {@code 2 * i} holds a pointer pointer to the record at + * Within this buffer, position {@code 2 * i} holds a pointer to the record at * index {@code i}, while position {@code 2 * i + 1} in the array holds an 8-byte key prefix. * * Only part of the array will be used to store the pointers, the rest part is preserved as diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSortDataFormat.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSortDataFormat.java index 430bf677edbdf58277a08084bdde0c5007318e35..d9f84d10e905170c93480cafdabfffba9d1b8752 100644 --- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSortDataFormat.java +++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSortDataFormat.java @@ -25,7 +25,7 @@ import org.apache.spark.util.collection.SortDataFormat; * Supports sorting an array of (record pointer, key prefix) pairs. * Used in {@link UnsafeInMemorySorter}. * <p> - * Within each long[] buffer, position {@code 2 * i} holds a pointer pointer to the record at + * Within each long[] buffer, position {@code 2 * i} holds a pointer to the record at * index {@code i}, while position {@code 2 * i + 1} in the array holds an 8-byte key prefix. */ public final class UnsafeSortDataFormat diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala index 6f5c31d7ab71c9b248db9a08a40440b808ee618c..4ca442b629fd974806dcf3ef4fbb70d423e3e3d8 100644 --- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala +++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala @@ -317,7 +317,7 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf, pool } - // Make sure that that we aren't going to exceed the max RPC message size by making sure + // Make sure that we aren't going to exceed the max RPC message size by making sure // we use broadcast to send large map output statuses. if (minSizeForBroadcast > maxRpcMessageSize) { val msg = s"spark.shuffle.mapOutput.minSizeForBroadcast ($minSizeForBroadcast bytes) must " + diff --git a/core/src/main/scala/org/apache/spark/TaskEndReason.scala b/core/src/main/scala/org/apache/spark/TaskEndReason.scala index 7745387dbcebaab72b12f557ee500fb189229e9c..8c1b5f7bf0d9b8283932c78cc0abb6f3ff08a8ac 100644 --- a/core/src/main/scala/org/apache/spark/TaskEndReason.scala +++ b/core/src/main/scala/org/apache/spark/TaskEndReason.scala @@ -98,7 +98,7 @@ case class FetchFailed( * 4 task failures, instead we immediately go back to the stage which generated the map output, * and regenerate the missing data. (2) we don't count fetch failures for blacklisting, since * presumably its not the fault of the executor where the task ran, but the executor which - * stored the data. 
This is especially important because we we might rack up a bunch of + * stored the data. This is especially important because we might rack up a bunch of * fetch-failures in rapid succession, on all nodes of the cluster, due to one bad node. */ override def countTowardsTaskFailures: Boolean = false diff --git a/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala b/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala index 79f4d06c8460e253f24ca71864dd66281bc64b48..320af5cf97550fd0d765ad827553b895df5e0f7d 100644 --- a/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala +++ b/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala @@ -43,7 +43,7 @@ import org.apache.spark.util.{ThreadUtils, Utils} * Execute using * ./bin/spark-class org.apache.spark.deploy.FaultToleranceTest * - * Make sure that that the environment includes the following properties in SPARK_DAEMON_JAVA_OPTS + * Make sure that the environment includes the following properties in SPARK_DAEMON_JAVA_OPTS * *and* SPARK_JAVA_OPTS: * - spark.deploy.recoveryMode=ZOOKEEPER * - spark.deploy.zookeeper.url=172.17.42.1:2181 diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala index 2b00a4a6b3e521d2c138d9db3bdf07594c819d15..54f39f7620e5d1dda73b749b5fcf84f89efff8de 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala @@ -291,7 +291,7 @@ object HistoryServer extends Logging { /** * Create a security manager. - * This turns off security in the SecurityManager, so that the the History Server can start + * This turns off security in the SecurityManager, so that the History Server can start * in a Spark cluster where security is enabled. * @param config configuration for the SecurityManager constructor * @return the security manager for use in constructing the History Server. diff --git a/core/src/main/scala/org/apache/spark/executor/ShuffleReadMetrics.scala b/core/src/main/scala/org/apache/spark/executor/ShuffleReadMetrics.scala index f7a991770d402d62a3abe72be2d989a865176363..8dd1a1ea059beb8a88627b6a7730265756be2115 100644 --- a/core/src/main/scala/org/apache/spark/executor/ShuffleReadMetrics.scala +++ b/core/src/main/scala/org/apache/spark/executor/ShuffleReadMetrics.scala @@ -92,7 +92,7 @@ class ShuffleReadMetrics private[spark] () extends Serializable { private[spark] def setRecordsRead(v: Long): Unit = _recordsRead.setValue(v) /** - * Resets the value of the current metrics (`this`) and and merges all the independent + * Resets the value of the current metrics (`this`) and merges all the independent * [[TempShuffleReadMetrics]] into `this`. */ private[spark] def setMergeValues(metrics: Seq[TempShuffleReadMetrics]): Unit = { diff --git a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala index dc70eb82d2b542c3461515b5fa9327df6e950dcd..3d4ea3cccc934e18b127d5d0c1c5f5edd808a776 100644 --- a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala +++ b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala @@ -37,7 +37,7 @@ import org.apache.spark.storage.{BlockId, StorageLevel} import org.apache.spark.util.Utils /** - * A BlockTransferService that uses Netty to fetch a set of blocks at at time. 
+ * A BlockTransferService that uses Netty to fetch a set of blocks at a time. */ private[spark] class NettyBlockTransferService( conf: SparkConf, diff --git a/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala b/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala index 386fdfd218a884b2c862b4a156202a92a4713e8c..3bfdf95db84c6518afc9dd9b9863be1f6e66acb5 100644 --- a/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala +++ b/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala @@ -350,7 +350,7 @@ object SizeEstimator extends Logging { // 3. consistent fields layouts throughout the hierarchy: This means we should layout // superclass first. And we can use superclass's shellSize as a starting point to layout the // other fields in this class. - // 4. class alignment: HotSpot rounds field blocks up to to HeapOopSize not 4 bytes, confirmed + // 4. class alignment: HotSpot rounds field blocks up to HeapOopSize not 4 bytes, confirmed // with Aleksey. see https://bugs.openjdk.java.net/browse/CODETOOLS-7901322 // // The real world field layout is much more complicated. There are three kinds of fields diff --git a/core/src/test/scala/org/apache/spark/deploy/history/ApplicationCacheSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/ApplicationCacheSuite.scala index e3304be792af75c040c3169d3ab2bfa1c7d14d99..7998e3702c12213aa33a69a3397af10f3724dc8e 100644 --- a/core/src/test/scala/org/apache/spark/deploy/history/ApplicationCacheSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/history/ApplicationCacheSuite.scala @@ -253,7 +253,7 @@ class ApplicationCacheSuite extends SparkFunSuite with Logging with MockitoSugar assertNotFound(appId, None) } - test("Test that if an attempt ID is is set, it must be used in lookups") { + test("Test that if an attempt ID is set, it must be used in lookups") { val operations = new StubCacheOperations() val clock = new ManualClock(1) implicit val cache = new ApplicationCache(operations, retainedApplications = 10, clock = clock) diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index 5e8a854e46a0f680a8cea5de2ff6902142c5f6eb..f3d3f701af4626a7367c35d780c0bb389c16a839 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -1819,7 +1819,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou assert(mapOutputTracker.getMapSizesByExecutorId(shuffleId, 0).map(_._1).toSet === HashSet(makeBlockManagerId("hostA"))) - // Reducer should run where RDD 2 has preferences, even though though it also has a shuffle dep + // Reducer should run where RDD 2 has preferences, even though it also has a shuffle dep val reduceTaskSet = taskSets(1) assertLocations(reduceTaskSet, Seq(Seq("hostB"))) complete(reduceTaskSet, Seq((Success, 42))) @@ -2058,7 +2058,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou // Now complete tasks in the second task set val newTaskSet = taskSets(1) - assert(newTaskSet.tasks.size === 2) // Both tasks 0 and 1 were on on hostA + assert(newTaskSet.tasks.size === 2) // Both tasks 0 and 1 were on hostA runEvent(makeCompletionEvent(newTaskSet.tasks(0), Success, makeMapStatus("hostB", 2))) assert(results.size === 0) // Map stage job should not be complete yet runEvent(makeCompletionEvent(newTaskSet.tasks(1), Success, makeMapStatus("hostB",
2))) diff --git a/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala index 7f0838268a11164fc51f7870908d7977ffe52eda..c8b6a3346a460865fd002e1d074850355b1aadef 100644 --- a/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala @@ -53,7 +53,7 @@ class ExternalAppendOnlyMapSuite extends SparkFunSuite with LocalSparkContext { conf } - test("single insert insert") { + test("single insert") { val conf = createSparkConf(loadDefaults = false) sc = new SparkContext("local", "test", conf) val map = createExternalMap[Int] diff --git a/docs/ml-features.md b/docs/ml-features.md index 1d3449746c9be2567895b5c8fc68f4622f8e7219..d67fce3c9528a8cca83a8f6fd3d98df218d20893 100644 --- a/docs/ml-features.md +++ b/docs/ml-features.md @@ -752,7 +752,7 @@ for more details on the API. `Interaction` is a `Transformer` which takes vector or double-valued columns, and generates a single vector column that contains the product of all combinations of one value from each input column. -For example, if you have 2 vector type columns each of which has 3 dimensions as input columns, then then you'll get a 9-dimensional vector as the output column. +For example, if you have 2 vector type columns each of which has 3 dimensions as input columns, then you'll get a 9-dimensional vector as the output column. **Examples** diff --git a/docs/mllib-statistics.md b/docs/mllib-statistics.md index 12797bd8688e1fa100838fb7fc55c60c6cc38332..430c0690457e88ad0fdaa51f78c50bffce094256 100644 --- a/docs/mllib-statistics.md +++ b/docs/mllib-statistics.md @@ -354,7 +354,7 @@ v = u.map(lambda x: 1.0 + 2.0 * x) useful for visualizing empirical probability distributions without requiring assumptions about the particular distribution that the observed samples are drawn from. It computes an estimate of the probability density function of a random variables, evaluated at a given set of points. It achieves -this estimate by expressing the PDF of the empirical distribution at a particular point as the the +this estimate by expressing the PDF of the empirical distribution at a particular point as the mean of PDFs of normal distributions centered around each of the samples. <div class="codetabs"> diff --git a/docs/structured-streaming-kafka-integration.md b/docs/structured-streaming-kafka-integration.md index 2458bb5ffa2981894ab2bc9e6ca8dcef3cc6741b..9b82e8e7449a100f920730a6e6d4964b18501dcc 100644 --- a/docs/structured-streaming-kafka-integration.md +++ b/docs/structured-streaming-kafka-integration.md @@ -244,7 +244,7 @@ Note that the following Kafka params cannot be set and the Kafka source will thr - **group.id**: Kafka source will create a unique group id for each query automatically. - **auto.offset.reset**: Set the source option `startingOffsets` to specify where to start instead. Structured Streaming manages which offsets are consumed internally, rather - than rely on the kafka Consumer to do it. This will ensure that no data is missed when when new + than rely on the kafka Consumer to do it. This will ensure that no data is missed when new topics/partitions are dynamically subscribed. Note that `startingOffsets` only applies when a new Streaming query is started, and that resuming will always pick up from where the query left off. 
- **key.deserializer**: Keys are always deserialized as byte arrays with ByteArrayDeserializer. Use diff --git a/docs/structured-streaming-programming-guide.md b/docs/structured-streaming-programming-guide.md index 799f636505b3eea43d8bca1971d2885e90a7be1d..6cd050e4f2a00fb7a929c25940dd00b45e3a26d8 100644 --- a/docs/structured-streaming-programming-guide.md +++ b/docs/structured-streaming-programming-guide.md @@ -680,7 +680,7 @@ windowedCounts = words.groupBy( ### Handling Late Data and Watermarking Now consider what happens if one of the events arrives late to the application. -For example, say, a word generated at 12:04 (i.e. event time) could be received received by +For example, say, a word generated at 12:04 (i.e. event time) could be received by the application at 12:11. The application should use the time 12:04 instead of 12:11 to update the older counts for the window `12:00 - 12:10`. This occurs naturally in our window-based grouping – Structured Streaming can maintain the intermediate state diff --git a/examples/src/main/java/org/apache/spark/examples/ml/JavaLDAExample.java b/examples/src/main/java/org/apache/spark/examples/ml/JavaLDAExample.java index 90412442790792a065b86bd3ad69005409645358..0e5d00565b71af523a8591f42e5f3d32c04d091e 100644 --- a/examples/src/main/java/org/apache/spark/examples/ml/JavaLDAExample.java +++ b/examples/src/main/java/org/apache/spark/examples/ml/JavaLDAExample.java @@ -52,7 +52,7 @@ public class JavaLDAExample { double ll = model.logLikelihood(dataset); double lp = model.logPerplexity(dataset); System.out.println("The lower bound on the log likelihood of the entire corpus: " + ll); - System.out.println("The upper bound bound on perplexity: " + lp); + System.out.println("The upper bound on perplexity: " + lp); // Describe topics. Dataset<Row> topics = model.describeTopics(3); diff --git a/examples/src/main/python/ml/lda_example.py b/examples/src/main/python/ml/lda_example.py index 2dc1742ff7a0b54bd03df9ea5cf3fddefaaef32a..a8b346f72cd6fae9a4f1dc7436bdfe3701f17a26 100644 --- a/examples/src/main/python/ml/lda_example.py +++ b/examples/src/main/python/ml/lda_example.py @@ -46,7 +46,7 @@ if __name__ == "__main__": ll = model.logLikelihood(dataset) lp = model.logPerplexity(dataset) print("The lower bound on the log likelihood of the entire corpus: " + str(ll)) - print("The upper bound bound on perplexity: " + str(lp)) + print("The upper bound on perplexity: " + str(lp)) # Describe topics. topics = model.describeTopics(3) diff --git a/examples/src/main/scala/org/apache/spark/examples/ml/LDAExample.scala b/examples/src/main/scala/org/apache/spark/examples/ml/LDAExample.scala index 22b3b0e3ad9c1b91cf609a2e91cce8868426ffb7..4215d37cb59d51d478c3491fd2f7469fd91e4c7d 100644 --- a/examples/src/main/scala/org/apache/spark/examples/ml/LDAExample.scala +++ b/examples/src/main/scala/org/apache/spark/examples/ml/LDAExample.scala @@ -50,7 +50,7 @@ object LDAExample { val ll = model.logLikelihood(dataset) val lp = model.logPerplexity(dataset) println(s"The lower bound on the log likelihood of the entire corpus: $ll") - println(s"The upper bound bound on perplexity: $lp") + println(s"The upper bound on perplexity: $lp") // Describe topics. 
val topics = model.describeTopics(3) diff --git a/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkSink.scala b/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkSink.scala index 41f27e937662f5f6bd9d49fc44e12fb0af3d83a2..e5b63aa1a77ef69a89102ae47a646e2c48d13d49 100644 --- a/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkSink.scala +++ b/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkSink.scala @@ -45,7 +45,7 @@ import org.apache.flume.sink.AbstractSink * the thread itself is blocked and a reference to it saved off. * * When the ack for that batch is received, - * the thread which created the transaction is is retrieved and it commits the transaction with the + * the thread which created the transaction is retrieved and it commits the transaction with the * channel from the same thread it was originally created in (since Flume transactions are * thread local). If a nack is received instead, the sink rolls back the transaction. If no ack * is received within the specified timeout, the transaction is rolled back too. If an ack comes diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala index aa01238f91247e173521c1c9a47b4cddc953279a..ff9965b854c6355a3a282b62cf6dd39a65f5e3e7 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala @@ -212,7 +212,7 @@ private[kafka010] class KafkaSourceProvider extends StreamSourceProvider |Instead set the source option '$STARTING_OFFSETS_OPTION_KEY' to 'earliest' or 'latest' |to specify where to start. Structured Streaming manages which offsets are consumed |internally, rather than relying on the kafkaConsumer to do it. This will ensure that no - |data is missed when when new topics/partitions are dynamically subscribed. Note that + |data is missed when new topics/partitions are dynamically subscribed. Note that |'$STARTING_OFFSETS_OPTION_KEY' only applies when a new Streaming query is started, and |that resuming will always pick up from where the query left off. See the docs for more |details. diff --git a/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDDSuite.scala b/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDDSuite.scala index a4d81a680979ec7a778bdf84db16e2b8e5d03256..18a5a1509a33aeafb83a608256b644bfed434ab4 100644 --- a/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDDSuite.scala +++ b/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDDSuite.scala @@ -129,7 +129,7 @@ abstract class KinesisBackedBlockRDDTests(aggregateTestData: Boolean) /** * Test the WriteAheadLogBackedRDD, by writing some partitions of the data to block manager - * and the rest to a write ahead log, and then reading reading it all back using the RDD. + * and the rest to a write ahead log, and then reading it all back using the RDD. * It can also test if the partitions that were read from the log were again stored in * block manager. 
* diff --git a/mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala b/mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala index 583e5e0928eba8c5666e279f2b51adba0a8a80c4..728a883b1ac21eae87458d8b31e6d79bd5cb1a36 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala @@ -512,7 +512,7 @@ abstract class LDAModel private[ml] ( } /** - * Calculate an upper bound bound on perplexity. (Lower is better.) + * Calculate an upper bound on perplexity. (Lower is better.) * See Equation (16) in the Online LDA paper (Hoffman et al., 2010). * * WARNING: If this model is an instance of [[DistributedLDAModel]] (produced when [[optimizer]] diff --git a/mllib/src/main/scala/org/apache/spark/mllib/api/python/Word2VecModelWrapper.scala b/mllib/src/main/scala/org/apache/spark/mllib/api/python/Word2VecModelWrapper.scala index 5cbfbff3e4a62b8f92a9326f0eab6f7b1a1530f8..4d6520d0b2ee04f17c6607e4767b78d11d708639 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/api/python/Word2VecModelWrapper.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/api/python/Word2VecModelWrapper.scala @@ -54,7 +54,7 @@ private[python] class Word2VecModelWrapper(model: Word2VecModel) { } /** - * Finds words similar to the the vector representation of a word without + * Finds words similar to the vector representation of a word without * filtering results. * @param vector a vector * @param num number of synonyms to find diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala index 25ffd8561fe37b0a664ba558540e988420ce03b7..933a5f1d52ed9246d45b3229a4d160d40557ea3a 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala @@ -245,7 +245,7 @@ class LocalLDAModel private[spark] ( } /** - * Calculate an upper bound bound on perplexity. (Lower is better.) + * Calculate an upper bound on perplexity. (Lower is better.) * See Equation (16) in original Online LDA paper. * * @param documents test corpus to use for calculating perplexity diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearAlgorithm.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearAlgorithm.scala index 46deb545af3f08fa6830121d27ddf1cd6c6c2af4..f44c8fe35145983e33a82c12670a05fc08c4005a 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearAlgorithm.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearAlgorithm.scala @@ -29,7 +29,7 @@ import org.apache.spark.streaming.dstream.DStream /** * :: DeveloperApi :: * StreamingLinearAlgorithm implements methods for continuously - * training a generalized linear model model on streaming data, + * training a generalized linear model on streaming data, * and using it for prediction on (possibly different) streaming data. * * This class takes as type parameters a GeneralizedLinearModel, diff --git a/python/pyspark/ml/clustering.py b/python/pyspark/ml/clustering.py index 35d0aefa04a8e55216deecf6d126e7435c2074ca..54510e0beae7cddf3f172d763e7520c5af334be1 100644 --- a/python/pyspark/ml/clustering.py +++ b/python/pyspark/ml/clustering.py @@ -699,7 +699,7 @@ class LDAModel(JavaModel): @since("2.0.0") def logPerplexity(self, dataset): """ - Calculate an upper bound bound on perplexity. (Lower is better.) 
+ Calculate an upper bound on perplexity. (Lower is better.) See Equation (16) in the Online LDA paper (Hoffman et al., 2010). WARNING: If this model is an instance of :py:class:`DistributedLDAModel` (produced when diff --git a/python/pyspark/ml/linalg/__init__.py b/python/pyspark/ml/linalg/__init__.py index 1705c156ce4c81a51dd964ee3415183f73726249..b76534325196508538ad08f6f9fb7f2e0ebd773b 100644 --- a/python/pyspark/ml/linalg/__init__.py +++ b/python/pyspark/ml/linalg/__init__.py @@ -481,7 +481,7 @@ class SparseVector(Vector): >>> SparseVector(4, {1:1.0, 6:2.0}) Traceback (most recent call last): ... - AssertionError: Index 6 is out of the the size of vector with size=4 + AssertionError: Index 6 is out of the size of vector with size=4 >>> SparseVector(4, {-1:1.0}) Traceback (most recent call last): ... @@ -521,7 +521,7 @@ class SparseVector(Vector): if self.indices.size > 0: assert np.max(self.indices) < self.size, \ - "Index %d is out of the the size of vector with size=%d" \ + "Index %d is out of the size of vector with size=%d" \ % (np.max(self.indices), self.size) assert np.min(self.indices) >= 0, \ "Contains negative index %d" % (np.min(self.indices)) diff --git a/python/pyspark/sql/utils.py b/python/pyspark/sql/utils.py index 2a85ec01bc92a6a810586097acc6b85b851b9594..7bc6a59ad3b26ecbb189874505500254b309793f 100644 --- a/python/pyspark/sql/utils.py +++ b/python/pyspark/sql/utils.py @@ -95,7 +95,7 @@ def install_exception_handler(): original = py4j.protocol.get_return_value # The original `get_return_value` is not patched, it's idempotent. patched = capture_sql_exception(original) - # only patch the one used in in py4j.java_gateway (call Java API) + # only patch the one used in py4j.java_gateway (call Java API) py4j.java_gateway.get_return_value = patched diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/LocalityPreferredContainerPlacementStrategy.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/LocalityPreferredContainerPlacementStrategy.scala index 8772e26f4314d8e3a6c9fca8d0fc16d5c094cf34..fb2d61f621c8adeca9b596a65891d4f3af5ebf61 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/LocalityPreferredContainerPlacementStrategy.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/LocalityPreferredContainerPlacementStrategy.scala @@ -32,7 +32,7 @@ private[yarn] case class ContainerLocalityPreferences(nodes: Array[String], rack /** * This strategy is calculating the optimal locality preferences of YARN containers by considering - * the node ratio of pending tasks, number of required cores/containers and and locality of current + * the node ratio of pending tasks, number of required cores/containers and locality of current * existing and pending allocated containers. The target of this algorithm is to maximize the number * of tasks that would run locally. 
* diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java index d205547698c5ba4deef90ea8473126a542d7686c..86de90984ca00045c4e9ef1502fb7b9f85615e89 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java @@ -196,7 +196,7 @@ public final class UnsafeRow extends InternalRow implements Externalizable, Kryo assertIndexIsValid(i); BitSetMethods.set(baseObject, baseOffset, i); // To preserve row equality, zero out the value when setting the column to null. - // Since this row does does not currently support updates to variable-length values, we don't + // Since this row does not currently support updates to variable-length values, we don't // have to worry about zeroing out that data. Platform.putLong(baseObject, getFieldOffset(i), 0); } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala index 13115f4728950a06f90929f38b89bb12771719b6..07d294b1085487c403167700d218ae2fee22610c 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala @@ -516,7 +516,7 @@ case class CumeDist() extends RowNumberLike with SizeBasedWindowFunction { * into the number of buckets); both variables are based on the size of the current partition. * During the calculation process the function keeps track of the current row number, the current * bucket number, and the row number at which the bucket will change (bucketThreshold). When the - * current row number reaches bucket threshold, the bucket value is increased by one and the the + * current row number reaches bucket threshold, the bucket value is increased by one and the * threshold is increased by the bucket size (plus one extra if the current bucket is padded). * * This documentation has been based upon similar documentation for the Hive and Presto projects. diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala index d583fa31b0b28e1eff7adb4139df683948fd8149..c977e788b01061a4a0cdd40e200c9592faf9c503 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala @@ -795,7 +795,7 @@ case object OneRowRelation extends LeafNode { /** * Computes [[Statistics]] for this plan. The default implementation assumes the output - * cardinality is the product of of all child plan's cardinality, i.e. applies in the case + * cardinality is the product of all child plan's cardinality, i.e. applies in the case * of cartesian joins. * * [[LeafNode]]s must override this. 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala index 235ca8d2633a1bb312f76f9a5bd9dadf96029c71..a96a3b7af29125632229c73060aa27b4ac02181b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala @@ -142,7 +142,7 @@ object DateTimeUtils { } /** - * Returns the number of days since epoch from from java.sql.Date. + * Returns the number of days since epoch from java.sql.Date. */ def fromJavaDate(date: Date): SQLDate = { millisToDays(date.getTime) @@ -503,7 +503,7 @@ object DateTimeUtils { } /** - * Calculates the year and and the number of the day in the year for the given + * Calculates the year and the number of the day in the year for the given * number of days. The given days is the number of days since 1.1.1970. * * The calculation uses the fact that the period 1.1.2001 until 31.12.2400 is diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/AttributeSetSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/AttributeSetSuite.scala index 97cfb5f06dd73867bb674228f7f4a886c9a988df..273f95f91ee50888dd98665cc161690e943d28a7 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/AttributeSetSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/AttributeSetSuite.scala @@ -52,7 +52,7 @@ class AttributeSetSuite extends SparkFunSuite { assert((aSet ++ bSet).contains(aLower) === true) } - test("extracts all references references") { + test("extracts all references ") { val addSet = AttributeSet(Add(aUpper, Alias(bUpper, "test")()):: Nil) assert(addSet.contains(aUpper)) assert(addSet.contains(aLower)) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala index c1cedd8541a83e72c41653f7a918516252dd107c..2a06f3c47c3706e9d682d2fb7749f918ee94fa9d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -361,7 +361,7 @@ class Dataset[T] private[sql]( * method used to map columns depend on the type of `U`: * - When `U` is a class, fields for the class will be mapped to columns of the same name * (case sensitivity is determined by `spark.sql.caseSensitive`). - * - When `U` is a tuple, the columns will be be mapped by ordinal (i.e. the first column will + * - When `U` is a tuple, the columns will be mapped by ordinal (i.e. the first column will * be assigned to `_1`). * - When `U` is a primitive type (i.e. String, Int, etc), then the first column of the * `DataFrame` will be used. 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala index bc290702dc37ff5c6f74b6d3e4f5693bb80ce5a6..bad59961ace125b8820d9f32379dc59d9902169f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala @@ -41,7 +41,7 @@ object PartitionPath { } /** - * Holds a directory in a partitioned collection of files as well as as the partition values + * Holds a directory in a partitioned collection of files as well as the partition values * in the form of a Row. Before scanning, the files at `path` need to be enumerated. */ case class PartitionPath(values: InternalRow, path: Path) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala index 0ce47b152c59b1283916a721efa62b1d6ec2f80d..0b39965b31d0aaf30155b97ae4b7a4ffdee5ce24 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala @@ -285,7 +285,7 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) { /** * Starts the execution of the streaming query, which will continually send results to the given - * `ForeachWriter` as as new data arrives. The `ForeachWriter` can be used to send the data + * `ForeachWriter` as new data arrives. The `ForeachWriter` can be used to send the data * generated by the `DataFrame`/`Dataset` to an external system. * * Scala example: diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameTimeWindowingSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameTimeWindowingSuite.scala index 4296ec543e27590f2d86179ce9ddb8d170a52ead..22d5c47a6fb51cb4e6c1ab569f22a319291d6d43 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameTimeWindowingSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameTimeWindowingSuite.scala @@ -257,7 +257,7 @@ class DataFrameTimeWindowingSuite extends QueryTest with SharedSQLContext with B } } - test("time window in SQL with with two expressions") { + test("time window in SQL with two expressions") { withTempTable { table => checkAnswer( spark.sql( @@ -272,7 +272,7 @@ class DataFrameTimeWindowingSuite extends QueryTest with SharedSQLContext with B } } - test("time window in SQL with with three expressions") { + test("time window in SQL with three expressions") { withTempTable { table => checkAnswer( spark.sql( diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala index 375da224aaa7fe7ac67c54a1b29d521ef4da30b5..0bfc92fdb621878b5baeaa0669b75b03c6c5941b 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala @@ -363,7 +363,7 @@ class PlannerSuite extends SharedSQLContext { // This is a regression test for SPARK-9703 test("EnsureRequirements should not repartition if only ordering requirement is unsatisfied") { // Consider an operator that imposes both output distribution and ordering requirements on its - // children, such as sort sort merge join. 
If the distribution requirements are satisfied but + // children, such as sort merge join. If the distribution requirements are satisfied but // the output ordering requirements are unsatisfied, then the planner should only add sorts and // should not need to add additional shuffles / exchanges. val outputOrdering = Seq(SortOrder(Literal(1), Ascending)) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala index 22f59f63d6f79ac896be8a84a675a80a821e804e..f67444fbc49d6a3200090a6bcf834d5fb29603c9 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala @@ -144,7 +144,7 @@ class FileStreamSinkSuite extends StreamTest { } // This tests whether FileStreamSink works with aggregations. Specifically, it tests - // whether the the correct streaming QueryExecution (i.e. IncrementalExecution) is used to + // whether the correct streaming QueryExecution (i.e. IncrementalExecution) is used to // to execute the trigger for writing data to file sink. See SPARK-18440 for more details. test("writing with aggregation") { diff --git a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/AbstractService.java b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/AbstractService.java index c2a2b2d478af74eaca99ffaaaeec9c2cc4c497ab..9dd0efc03968dcff6a20bf31b78b5a6e5867e708 100644 --- a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/AbstractService.java +++ b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/AbstractService.java @@ -151,7 +151,7 @@ public abstract class AbstractService implements Service { } /** - * Verify that that a service is in a given state. + * Verify that a service is in a given state. * * @param currentState * the desired state diff --git a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/ServiceOperations.java b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/ServiceOperations.java index 8946219d85ccd3835e4dfc60cca4c8815a74afc9..a2c580d6acc71a8a81450bdde41b07222997552a 100644 --- a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/ServiceOperations.java +++ b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/ServiceOperations.java @@ -33,7 +33,7 @@ public final class ServiceOperations { } /** - * Verify that that a service is in a given state. + * Verify that a service is in a given state. * @param state the actual state a service is in * @param expectedState the desired state * @throws IllegalStateException if the service state is different from diff --git a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/ServiceStateChangeListener.java b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/ServiceStateChangeListener.java index d1aadad04cf67cb8d6ea5f5bc3d254ec61dd5b37..a1ff10dc2bc9fdba02057e40abb067959b174348 100644 --- a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/ServiceStateChangeListener.java +++ b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/ServiceStateChangeListener.java @@ -29,7 +29,7 @@ public interface ServiceStateChangeListener { * have changed state before this callback is invoked. * * This operation is invoked on the thread that initiated the state change, - * while the service itself in in a synchronized section. + * while the service itself is in a synchronized section.
* <ol> * <li>Any long-lived operation here will prevent the service state * change from completing in a timely manner.</li> diff --git a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/TypeDescriptor.java b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/TypeDescriptor.java index 562b3f5e6786591c3122990617ec7a34da3579bb..b80fd67884addd6d93f4455d5eb5e23f32addbe7 100644 --- a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/TypeDescriptor.java +++ b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/TypeDescriptor.java @@ -98,7 +98,7 @@ public class TypeDescriptor { * For datetime types this is the length in characters of the String representation * (assuming the maximum allowed precision of the fractional seconds component). * For binary data this is the length in bytes. - * Null is returned for for data types where the column size is not applicable. + * Null is returned for data types where the column size is not applicable. */ public Integer getColumnSize() { if (type.isNumericType()) { diff --git a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala index 5cd4935e225ee571c70424f9475ee00a53c944df..d217e9b4feb6d2ba84f6f67e8e0a46c5968d1f16 100644 --- a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala +++ b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala @@ -178,7 +178,7 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter { "skewjoin", "database", - // These tests fail and and exit the JVM. + // These tests fail and exit the JVM. "auto_join18_multi_distinct", "join18_multi_distinct", "input44", diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala index c80695bd3e0feb95d54232e7fdd9feaa8fe2fb70..7ee5fc543c55b2865192fe2173d06a666e5821b7 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala @@ -40,7 +40,7 @@ import org.apache.spark.util.Utils * The Hive table scan operator. Column and partition pruning are both handled. * * @param requestedAttributes Attributes to be fetched from the Hive table. - * @param relation The Hive table be be scanned. + * @param relation The Hive table to be scanned. * @param partitionPruningPred An optional partition pruning predicate for partitioned table.
*/ private[hive] diff --git a/streaming/src/main/scala/org/apache/spark/streaming/State.scala b/streaming/src/main/scala/org/apache/spark/streaming/State.scala index 3f560f889f055cb089a268303381a26fe37cdac4..23cf48eb0673839a38fe22ed2ffbeadbdce65a64 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/State.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/State.scala @@ -178,7 +178,7 @@ private[streaming] class StateImpl[S] extends State[S] { removed } - /** Whether the state has been been updated */ + /** Whether the state has been updated */ def isUpdated(): Boolean = { updated } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala index a3c125c306954a672e8400877fa7946f0573de60..9a760e2947d0be473ae97977cb44fd9b5cdf24c3 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala @@ -88,7 +88,7 @@ abstract class InputDStream[T: ClassTag](_ssc: StreamingContext) if (!super.isTimeValid(time)) { false // Time not valid } else { - // Time is valid, but check it it is more than lastValidTime + // Time is valid, but check if it is more than lastValidTime if (lastValidTime != null && time < lastValidTime) { logWarning(s"isTimeValid called with $time whereas the last valid time " + s"is $lastValidTime") diff --git a/streaming/src/test/scala/org/apache/spark/streaming/rdd/MapWithStateRDDSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/rdd/MapWithStateRDDSuite.scala index e8c814ba7184bf6ce4e541b4b62fbc443121bd26..9b6bc71c7a5b58d690b616078a450bd49f06af3a 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/rdd/MapWithStateRDDSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/rdd/MapWithStateRDDSuite.scala @@ -326,7 +326,7 @@ class MapWithStateRDDSuite extends SparkFunSuite with RDDCheckpointTester with B // Create a MapWithStateRDD that has a long lineage using the data RDD with a long lineage val stateRDDWithLongLineage = makeStateRDDWithLongLineageDataRDD(longLineageRDD) - // Create a new MapWithStateRDD, with the lineage lineage MapWithStateRDD as the parent + // Create a new MapWithStateRDD, with the lineage MapWithStateRDD as the parent new MapWithStateRDD[Int, Int, Int, Int]( stateRDDWithLongLineage, stateRDDWithLongLineage.sparkContext.emptyRDD[(Int, Int)].partitionBy(partitioner), diff --git a/streaming/src/test/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDDSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDDSuite.scala index a37fac87300b738cd8139102b2943e12f86495b6..c5e695a33ae10fed67d7fb2993398b78a6becfee 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDDSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDDSuite.scala @@ -108,7 +108,7 @@ class WriteAheadLogBackedBlockRDDSuite /** * Test the WriteAheadLogBackedRDD, by writing some partitions of the data to block manager - * and the rest to a write ahead log, and then reading reading it all back using the RDD. + * and the rest to a write ahead log, and then reading it all back using the RDD. * It can also test if the partitions that were read from the log were again stored in * block manager.
* diff --git a/streaming/src/test/scala/org/apache/spark/streaming/receiver/BlockGeneratorSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/receiver/BlockGeneratorSuite.scala index a1d0561bf308a9f80c3abadff0f2bb1242c0a660..b70383ecde4d85bffe62b793715c8994ecb221c6 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/receiver/BlockGeneratorSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/receiver/BlockGeneratorSuite.scala @@ -90,7 +90,7 @@ class BlockGeneratorSuite extends SparkFunSuite with BeforeAndAfter { listener.pushedData.asScala.toSeq should contain theSameElementsInOrderAs (data1) assert(listener.onAddDataCalled === false) // should be called only with addDataWithCallback() - // Verify addDataWithCallback() add data+metadata and and callbacks are called correctly + // Verify addDataWithCallback() add data+metadata and callbacks are called correctly val data2 = 11 to 20 val metadata2 = data2.map { _.toString } data2.zip(metadata2).foreach { case (d, m) => blockGenerator.addDataWithCallback(d, m) } @@ -103,7 +103,7 @@ class BlockGeneratorSuite extends SparkFunSuite with BeforeAndAfter { listener.pushedData.asScala.toSeq should contain theSameElementsInOrderAs combined } - // Verify addMultipleDataWithCallback() add data+metadata and and callbacks are called correctly + // Verify addMultipleDataWithCallback() add data+metadata and callbacks are called correctly val data3 = 21 to 30 val metadata3 = "metadata" blockGenerator.addMultipleDataWithCallback(data3.iterator, metadata3)