From 930b70f0523e96fe01c1317ef7fad1b76b36d4d9 Mon Sep 17 00:00:00 2001 From: Sandeep <sandeep@techaddict.me> Date: Thu, 10 Apr 2014 15:04:13 -0700 Subject: [PATCH] Remove Unnecessary Whitespace's stack these together in a commit else they show up chunk by chunk in different commits. Author: Sandeep <sandeep@techaddict.me> Closes #380 from techaddict/white_space and squashes the following commits: b58f294 [Sandeep] Remove Unnecessary Whitespace's --- .../org/apache/spark/bagel/BagelSuite.scala | 4 +- .../api/java/function/FlatMapFunction.java | 2 +- .../api/java/function/FlatMapFunction2.java | 2 +- .../org/apache/spark/HttpFileServer.scala | 14 +- .../scala/org/apache/spark/HttpServer.scala | 6 +- .../scala/org/apache/spark/Partition.scala | 2 +- .../org/apache/spark/SecurityManager.scala | 88 +++---- .../org/apache/spark/SparkException.scala | 2 +- .../org/apache/spark/SparkHadoopWriter.scala | 20 +- .../org/apache/spark/SparkSaslClient.scala | 10 +- .../org/apache/spark/SparkSaslServer.scala | 6 +- .../scala/org/apache/spark/TestUtils.scala | 2 +- .../spark/broadcast/TorrentBroadcast.scala | 2 +- .../apache/spark/deploy/ClientArguments.scala | 2 +- .../spark/deploy/worker/WorkerArguments.scala | 8 +- .../spark/deploy/worker/ui/IndexPage.scala | 2 +- .../CoarseGrainedExecutorBackend.scala | 2 +- .../spark/executor/ExecutorExitCode.scala | 8 +- .../executor/ExecutorURLClassLoader.scala | 2 +- .../apache/spark/metrics/sink/CsvSink.scala | 2 +- .../org/apache/spark/network/Connection.scala | 8 +- .../apache/spark/network/ConnectionId.scala | 6 +- .../spark/network/ConnectionManager.scala | 28 +-- .../spark/network/ConnectionManagerTest.scala | 24 +- .../apache/spark/network/ReceiverTest.scala | 2 +- .../spark/network/SecurityMessage.scala | 48 ++-- .../spark/network/netty/FileHeader.scala | 4 +- .../apache/spark/partial/PartialResult.scala | 4 +- .../apache/spark/rdd/DoubleRDDFunctions.scala | 8 +- .../spark/rdd/PartitionerAwareUnionRDD.scala | 2 +- .../spark/scheduler/DAGSchedulerEvent.scala | 2 +- .../spark/scheduler/LiveListenerBus.scala | 214 +++++++++--------- .../spark/storage/BlockFetcherIterator.scala | 2 +- .../apache/spark/storage/BlockManager.scala | 4 +- .../apache/spark/storage/BlockMessage.scala | 22 +- .../spark/storage/BlockMessageArray.scala | 26 +-- .../org/apache/spark/ui/JettyUtils.scala | 4 +- .../scala/org/apache/spark/ui/UIUtils.scala | 2 +- .../apache/spark/util/ClosureCleaner.scala | 22 +- .../org/apache/spark/util/JsonProtocol.scala | 2 +- .../org/apache/spark/util/NextIterator.scala | 4 +- .../org/apache/spark/util/StatCounter.scala | 4 +- .../scala/org/apache/spark/util/Vector.scala | 2 +- .../spark/util/random/XORShiftRandom.scala | 16 +- .../org/apache/spark/AkkaUtilsSuite.scala | 20 +- .../scala/org/apache/spark/DriverSuite.scala | 2 +- .../org/apache/spark/FileServerSuite.scala | 4 +- .../scala/org/apache/spark/FileSuite.scala | 2 +- .../deploy/worker/WorkerWatcherSuite.scala | 2 +- .../WholeTextFileRecordReaderSuite.scala | 2 +- .../rdd/ParallelCollectionSplitSuite.scala | 26 +-- .../spark/scheduler/SparkListenerSuite.scala | 4 +- .../scheduler/TaskSchedulerImplSuite.scala | 2 +- .../scala/org/apache/spark/ui/UISuite.scala | 4 +- .../spark/util/ClosureCleanerSuite.scala | 2 +- .../apache/spark/util/NextIteratorSuite.scala | 4 +- .../util/random/XORShiftRandomSuite.scala | 20 +- .../streaming/mqtt/MQTTInputDStream.scala | 16 +- .../twitter/TwitterInputDStream.scala | 6 +- .../org/apache/spark/graphx/GraphOps.scala | 2 +- .../apache/spark/graphx/GraphOpsSuite.scala | 2 +- .../spark/mllib/optimization/Optimizer.scala | 2 +- .../GeneralizedLinearAlgorithm.scala | 4 +- .../spark/repl/ExecutorClassLoader.scala | 4 +- .../org/apache/spark/repl/SparkImports.scala | 2 +- .../spark/sql/catalyst/expressions/Cast.scala | 6 +- .../sql/catalyst/expressions/Expression.scala | 12 +- .../expressions/stringOperations.scala | 28 +-- .../spark/sql/catalyst/types/dataTypes.scala | 4 +- .../ExpressionEvaluationSuite.scala | 10 +- .../sql/ScalaReflectionRelationSuite.scala | 2 +- .../apache/spark/streaming/Checkpoint.scala | 14 +- .../org/apache/spark/streaming/Interval.scala | 8 +- .../org/apache/spark/streaming/Time.scala | 4 +- .../dstream/DStreamCheckpointData.scala | 2 +- .../streaming/dstream/FileInputDStream.scala | 2 +- .../streaming/dstream/QueueInputDStream.scala | 8 +- .../streaming/receivers/ActorReceiver.scala | 2 +- .../apache/spark/streaming/util/Clock.scala | 26 +-- .../spark/streaming/util/RawTextHelper.scala | 18 +- .../spark/streaming/util/RecurringTimer.scala | 8 +- .../apache/spark/streaming/JavaAPISuite.java | 2 +- 82 files changed, 467 insertions(+), 467 deletions(-) diff --git a/bagel/src/test/scala/org/apache/spark/bagel/BagelSuite.scala b/bagel/src/test/scala/org/apache/spark/bagel/BagelSuite.scala index 9c37fadb78..69144e3e65 100644 --- a/bagel/src/test/scala/org/apache/spark/bagel/BagelSuite.scala +++ b/bagel/src/test/scala/org/apache/spark/bagel/BagelSuite.scala @@ -28,9 +28,9 @@ class TestVertex(val active: Boolean, val age: Int) extends Vertex with Serializ class TestMessage(val targetId: String) extends Message[String] with Serializable class BagelSuite extends FunSuite with Assertions with BeforeAndAfter with Timeouts { - + var sc: SparkContext = _ - + after { if (sc != null) { sc.stop() diff --git a/core/src/main/java/org/apache/spark/api/java/function/FlatMapFunction.java b/core/src/main/java/org/apache/spark/api/java/function/FlatMapFunction.java index fa75842047..23f5fdd436 100644 --- a/core/src/main/java/org/apache/spark/api/java/function/FlatMapFunction.java +++ b/core/src/main/java/org/apache/spark/api/java/function/FlatMapFunction.java @@ -24,4 +24,4 @@ import java.io.Serializable; */ public interface FlatMapFunction<T, R> extends Serializable { public Iterable<R> call(T t) throws Exception; -} \ No newline at end of file +} diff --git a/core/src/main/java/org/apache/spark/api/java/function/FlatMapFunction2.java b/core/src/main/java/org/apache/spark/api/java/function/FlatMapFunction2.java index d1fdec0724..c48e92f535 100644 --- a/core/src/main/java/org/apache/spark/api/java/function/FlatMapFunction2.java +++ b/core/src/main/java/org/apache/spark/api/java/function/FlatMapFunction2.java @@ -24,4 +24,4 @@ import java.io.Serializable; */ public interface FlatMapFunction2<T1, T2, R> extends Serializable { public Iterable<R> call(T1 t1, T2 t2) throws Exception; -} \ No newline at end of file +} diff --git a/core/src/main/scala/org/apache/spark/HttpFileServer.scala b/core/src/main/scala/org/apache/spark/HttpFileServer.scala index 3d7692ea8a..a6e300d345 100644 --- a/core/src/main/scala/org/apache/spark/HttpFileServer.scala +++ b/core/src/main/scala/org/apache/spark/HttpFileServer.scala @@ -24,13 +24,13 @@ import com.google.common.io.Files import org.apache.spark.util.Utils private[spark] class HttpFileServer(securityManager: SecurityManager) extends Logging { - + var baseDir : File = null var fileDir : File = null var jarDir : File = null var httpServer : HttpServer = null var serverUri : String = null - + def initialize() { baseDir = Utils.createTempDir() fileDir = new File(baseDir, "files") @@ -43,24 +43,24 @@ private[spark] class HttpFileServer(securityManager: SecurityManager) extends Lo serverUri = httpServer.uri logDebug("HTTP file server started at: " + serverUri) } - + def stop() { httpServer.stop() } - + def addFile(file: File) : String = { addFileToDir(file, fileDir) serverUri + "/files/" + file.getName } - + def addJar(file: File) : String = { addFileToDir(file, jarDir) serverUri + "/jars/" + file.getName } - + def addFileToDir(file: File, dir: File) : String = { Files.copy(file, new File(dir, file.getName)) dir + "/" + file.getName } - + } diff --git a/core/src/main/scala/org/apache/spark/HttpServer.scala b/core/src/main/scala/org/apache/spark/HttpServer.scala index cb5df25fa4..7e9b517f90 100644 --- a/core/src/main/scala/org/apache/spark/HttpServer.scala +++ b/core/src/main/scala/org/apache/spark/HttpServer.scala @@ -83,19 +83,19 @@ private[spark] class HttpServer(resourceBase: File, securityManager: SecurityMan } } - /** + /** * Setup Jetty to the HashLoginService using a single user with our * shared secret. Configure it to use DIGEST-MD5 authentication so that the password * isn't passed in plaintext. */ private def setupSecurityHandler(securityMgr: SecurityManager): ConstraintSecurityHandler = { val constraint = new Constraint() - // use DIGEST-MD5 as the authentication mechanism + // use DIGEST-MD5 as the authentication mechanism constraint.setName(Constraint.__DIGEST_AUTH) constraint.setRoles(Array("user")) constraint.setAuthenticate(true) constraint.setDataConstraint(Constraint.DC_NONE) - + val cm = new ConstraintMapping() cm.setConstraint(constraint) cm.setPathSpec("/*") diff --git a/core/src/main/scala/org/apache/spark/Partition.scala b/core/src/main/scala/org/apache/spark/Partition.scala index 87914a061f..27892dbd2a 100644 --- a/core/src/main/scala/org/apache/spark/Partition.scala +++ b/core/src/main/scala/org/apache/spark/Partition.scala @@ -25,7 +25,7 @@ trait Partition extends Serializable { * Get the split's index within its parent RDD */ def index: Int - + // A better default implementation of HashCode override def hashCode(): Int = index } diff --git a/core/src/main/scala/org/apache/spark/SecurityManager.scala b/core/src/main/scala/org/apache/spark/SecurityManager.scala index 2237ee3bb7..b52f2d4f41 100644 --- a/core/src/main/scala/org/apache/spark/SecurityManager.scala +++ b/core/src/main/scala/org/apache/spark/SecurityManager.scala @@ -25,93 +25,93 @@ import org.apache.hadoop.io.Text import org.apache.spark.deploy.SparkHadoopUtil -/** - * Spark class responsible for security. - * +/** + * Spark class responsible for security. + * * In general this class should be instantiated by the SparkEnv and most components - * should access it from that. There are some cases where the SparkEnv hasn't been + * should access it from that. There are some cases where the SparkEnv hasn't been * initialized yet and this class must be instantiated directly. - * + * * Spark currently supports authentication via a shared secret. * Authentication can be configured to be on via the 'spark.authenticate' configuration - * parameter. This parameter controls whether the Spark communication protocols do + * parameter. This parameter controls whether the Spark communication protocols do * authentication using the shared secret. This authentication is a basic handshake to * make sure both sides have the same shared secret and are allowed to communicate. - * If the shared secret is not identical they will not be allowed to communicate. - * - * The Spark UI can also be secured by using javax servlet filters. A user may want to - * secure the UI if it has data that other users should not be allowed to see. The javax - * servlet filter specified by the user can authenticate the user and then once the user - * is logged in, Spark can compare that user versus the view acls to make sure they are - * authorized to view the UI. The configs 'spark.ui.acls.enable' and 'spark.ui.view.acls' + * If the shared secret is not identical they will not be allowed to communicate. + * + * The Spark UI can also be secured by using javax servlet filters. A user may want to + * secure the UI if it has data that other users should not be allowed to see. The javax + * servlet filter specified by the user can authenticate the user and then once the user + * is logged in, Spark can compare that user versus the view acls to make sure they are + * authorized to view the UI. The configs 'spark.ui.acls.enable' and 'spark.ui.view.acls' * control the behavior of the acls. Note that the person who started the application * always has view access to the UI. * * Spark does not currently support encryption after authentication. - * + * * At this point spark has multiple communication protocols that need to be secured and * different underlying mechanisms are used depending on the protocol: * - * - Akka -> The only option here is to use the Akka Remote secure-cookie functionality. - * Akka remoting allows you to specify a secure cookie that will be exchanged - * and ensured to be identical in the connection handshake between the client - * and the server. If they are not identical then the client will be refused - * to connect to the server. There is no control of the underlying - * authentication mechanism so its not clear if the password is passed in + * - Akka -> The only option here is to use the Akka Remote secure-cookie functionality. + * Akka remoting allows you to specify a secure cookie that will be exchanged + * and ensured to be identical in the connection handshake between the client + * and the server. If they are not identical then the client will be refused + * to connect to the server. There is no control of the underlying + * authentication mechanism so its not clear if the password is passed in * plaintext or uses DIGEST-MD5 or some other mechanism. * Akka also has an option to turn on SSL, this option is not currently supported * but we could add a configuration option in the future. - * - * - HTTP for broadcast and file server (via HttpServer) -> Spark currently uses Jetty - * for the HttpServer. Jetty supports multiple authentication mechanisms - - * Basic, Digest, Form, Spengo, etc. It also supports multiple different login + * + * - HTTP for broadcast and file server (via HttpServer) -> Spark currently uses Jetty + * for the HttpServer. Jetty supports multiple authentication mechanisms - + * Basic, Digest, Form, Spengo, etc. It also supports multiple different login * services - Hash, JAAS, Spnego, JDBC, etc. Spark currently uses the HashLoginService - * to authenticate using DIGEST-MD5 via a single user and the shared secret. + * to authenticate using DIGEST-MD5 via a single user and the shared secret. * Since we are using DIGEST-MD5, the shared secret is not passed on the wire * in plaintext. * We currently do not support SSL (https), but Jetty can be configured to use it * so we could add a configuration option for this in the future. - * + * * The Spark HttpServer installs the HashLoginServer and configures it to DIGEST-MD5. - * Any clients must specify the user and password. There is a default + * Any clients must specify the user and password. There is a default * Authenticator installed in the SecurityManager to how it does the authentication * and in this case gets the user name and password from the request. * - * - ConnectionManager -> The Spark ConnectionManager uses java nio to asynchronously - * exchange messages. For this we use the Java SASL - * (Simple Authentication and Security Layer) API and again use DIGEST-MD5 + * - ConnectionManager -> The Spark ConnectionManager uses java nio to asynchronously + * exchange messages. For this we use the Java SASL + * (Simple Authentication and Security Layer) API and again use DIGEST-MD5 * as the authentication mechanism. This means the shared secret is not passed * over the wire in plaintext. * Note that SASL is pluggable as to what mechanism it uses. We currently use * DIGEST-MD5 but this could be changed to use Kerberos or other in the future. * Spark currently supports "auth" for the quality of protection, which means * the connection is not supporting integrity or privacy protection (encryption) - * after authentication. SASL also supports "auth-int" and "auth-conf" which + * after authentication. SASL also supports "auth-int" and "auth-conf" which * SPARK could be support in the future to allow the user to specify the quality - * of protection they want. If we support those, the messages will also have to + * of protection they want. If we support those, the messages will also have to * be wrapped and unwrapped via the SaslServer/SaslClient.wrap/unwrap API's. - * - * Since the connectionManager does asynchronous messages passing, the SASL + * + * Since the connectionManager does asynchronous messages passing, the SASL * authentication is a bit more complex. A ConnectionManager can be both a client * and a Server, so for a particular connection is has to determine what to do. - * A ConnectionId was added to be able to track connections and is used to + * A ConnectionId was added to be able to track connections and is used to * match up incoming messages with connections waiting for authentication. * If its acting as a client and trying to send a message to another ConnectionManager, * it blocks the thread calling sendMessage until the SASL negotiation has occurred. * The ConnectionManager tracks all the sendingConnections using the ConnectionId * and waits for the response from the server and does the handshake. * - * - HTTP for the Spark UI -> the UI was changed to use servlets so that javax servlet filters + * - HTTP for the Spark UI -> the UI was changed to use servlets so that javax servlet filters * can be used. Yarn requires a specific AmIpFilter be installed for security to work * properly. For non-Yarn deployments, users can write a filter to go through a * companies normal login service. If an authentication filter is in place then the * SparkUI can be configured to check the logged in user against the list of users who * have view acls to see if that user is authorized. - * The filters can also be used for many different purposes. For instance filters + * The filters can also be used for many different purposes. For instance filters * could be used for logging, encryption, or compression. - * + * * The exact mechanisms used to generate/distributed the shared secret is deployment specific. - * + * * For Yarn deployments, the secret is automatically generated using the Akka remote * Crypt.generateSecureCookie() API. The secret is placed in the Hadoop UGI which gets passed * around via the Hadoop RPC mechanism. Hadoop RPC can be configured to support different levels @@ -121,7 +121,7 @@ import org.apache.spark.deploy.SparkHadoopUtil * to reduce the possibility of web based attacks through YARN. Hadoop can be configured to use * filters to do authentication. That authentication then happens via the ResourceManager Proxy * and Spark will use that to do authorization against the view acls. - * + * * For other Spark deployments, the shared secret must be specified via the * spark.authenticate.secret config. * All the nodes (Master and Workers) and the applications need to have the same shared secret. @@ -152,7 +152,7 @@ private[spark] class SecurityManager(sparkConf: SparkConf) extends Logging { " are ui acls enabled: " + uiAclsOn + " users with view permissions: " + viewAcls.toString()) // Set our own authenticator to properly negotiate user/password for HTTP connections. - // This is needed by the HTTP client fetching from the HttpServer. Put here so its + // This is needed by the HTTP client fetching from the HttpServer. Put here so its // only set once. if (authOn) { Authenticator.setDefault( @@ -214,12 +214,12 @@ private[spark] class SecurityManager(sparkConf: SparkConf) extends Logging { def uiAclsEnabled(): Boolean = uiAclsOn /** - * Checks the given user against the view acl list to see if they have + * Checks the given user against the view acl list to see if they have * authorization to view the UI. If the UI acls must are disabled * via spark.ui.acls.enable, all users have view access. - * + * * @param user to see if is authorized - * @return true is the user has permission, otherwise false + * @return true is the user has permission, otherwise false */ def checkUIViewPermissions(user: String): Boolean = { if (uiAclsEnabled() && (user != null) && (!viewAcls.contains(user))) false else true diff --git a/core/src/main/scala/org/apache/spark/SparkException.scala b/core/src/main/scala/org/apache/spark/SparkException.scala index d34e47e8ca..4351ed74b6 100644 --- a/core/src/main/scala/org/apache/spark/SparkException.scala +++ b/core/src/main/scala/org/apache/spark/SparkException.scala @@ -20,5 +20,5 @@ package org.apache.spark class SparkException(message: String, cause: Throwable) extends Exception(message, cause) { - def this(message: String) = this(message, null) + def this(message: String) = this(message, null) } diff --git a/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala b/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala index b92ea01a87..f6703986bd 100644 --- a/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala +++ b/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala @@ -42,7 +42,7 @@ class SparkHadoopWriter(@transient jobConf: JobConf) private val now = new Date() private val conf = new SerializableWritable(jobConf) - + private var jobID = 0 private var splitID = 0 private var attemptID = 0 @@ -58,8 +58,8 @@ class SparkHadoopWriter(@transient jobConf: JobConf) def preSetup() { setIDs(0, 0, 0) HadoopRDD.addLocalConfiguration("", 0, 0, 0, conf.value) - - val jCtxt = getJobContext() + + val jCtxt = getJobContext() getOutputCommitter().setupJob(jCtxt) } @@ -74,7 +74,7 @@ class SparkHadoopWriter(@transient jobConf: JobConf) val numfmt = NumberFormat.getInstance() numfmt.setMinimumIntegerDigits(5) numfmt.setGroupingUsed(false) - + val outputName = "part-" + numfmt.format(splitID) val path = FileOutputFormat.getOutputPath(conf.value) val fs: FileSystem = { @@ -85,7 +85,7 @@ class SparkHadoopWriter(@transient jobConf: JobConf) } } - getOutputCommitter().setupTask(getTaskContext()) + getOutputCommitter().setupTask(getTaskContext()) writer = getOutputFormat().getRecordWriter(fs, conf.value, outputName, Reporter.NULL) } @@ -103,18 +103,18 @@ class SparkHadoopWriter(@transient jobConf: JobConf) def commit() { val taCtxt = getTaskContext() - val cmtr = getOutputCommitter() + val cmtr = getOutputCommitter() if (cmtr.needsTaskCommit(taCtxt)) { try { cmtr.commitTask(taCtxt) logInfo (taID + ": Committed") } catch { - case e: IOException => { + case e: IOException => { logError("Error committing the output of task: " + taID.value, e) cmtr.abortTask(taCtxt) throw e } - } + } } else { logWarning ("No need to commit output of task: " + taID.value) } @@ -144,7 +144,7 @@ class SparkHadoopWriter(@transient jobConf: JobConf) } private def getJobContext(): JobContext = { - if (jobContext == null) { + if (jobContext == null) { jobContext = newJobContext(conf.value, jID.value) } jobContext @@ -175,7 +175,7 @@ object SparkHadoopWriter { val jobtrackerID = formatter.format(time) new JobID(jobtrackerID, id) } - + def createPathFromString(path: String, conf: JobConf): Path = { if (path == null) { throw new IllegalArgumentException("Output path is null") diff --git a/core/src/main/scala/org/apache/spark/SparkSaslClient.scala b/core/src/main/scala/org/apache/spark/SparkSaslClient.scala index a2a871cbd3..5b14c4291d 100644 --- a/core/src/main/scala/org/apache/spark/SparkSaslClient.scala +++ b/core/src/main/scala/org/apache/spark/SparkSaslClient.scala @@ -44,12 +44,12 @@ private[spark] class SparkSaslClient(securityMgr: SecurityManager) extends Logg * configurable in the future. */ private var saslClient: SaslClient = Sasl.createSaslClient(Array[String](SparkSaslServer.DIGEST), - null, null, SparkSaslServer.SASL_DEFAULT_REALM, SparkSaslServer.SASL_PROPS, + null, null, SparkSaslServer.SASL_DEFAULT_REALM, SparkSaslServer.SASL_PROPS, new SparkSaslClientCallbackHandler(securityMgr)) /** * Used to initiate SASL handshake with server. - * @return response to challenge if needed + * @return response to challenge if needed */ def firstToken(): Array[Byte] = { synchronized { @@ -86,7 +86,7 @@ private[spark] class SparkSaslClient(securityMgr: SecurityManager) extends Logg } /** - * Disposes of any system resources or security-sensitive information the + * Disposes of any system resources or security-sensitive information the * SaslClient might be using. */ def dispose() { @@ -110,7 +110,7 @@ private[spark] class SparkSaslClient(securityMgr: SecurityManager) extends Logg private class SparkSaslClientCallbackHandler(securityMgr: SecurityManager) extends CallbackHandler { - private val userName: String = + private val userName: String = SparkSaslServer.encodeIdentifier(securityMgr.getSaslUser().getBytes()) private val secretKey = securityMgr.getSecretKey() private val userPassword: Array[Char] = @@ -138,7 +138,7 @@ private[spark] class SparkSaslClient(securityMgr: SecurityManager) extends Logg rc.setText(rc.getDefaultText()) } case cb: RealmChoiceCallback => {} - case cb: Callback => throw + case cb: Callback => throw new UnsupportedCallbackException(cb, "handle: Unrecognized SASL client callback") } } diff --git a/core/src/main/scala/org/apache/spark/SparkSaslServer.scala b/core/src/main/scala/org/apache/spark/SparkSaslServer.scala index 11fcb2ae3a..6161a6fb7a 100644 --- a/core/src/main/scala/org/apache/spark/SparkSaslServer.scala +++ b/core/src/main/scala/org/apache/spark/SparkSaslServer.scala @@ -64,7 +64,7 @@ private[spark] class SparkSaslServer(securityMgr: SecurityManager) extends Loggi } /** - * Disposes of any system resources or security-sensitive information the + * Disposes of any system resources or security-sensitive information the * SaslServer might be using. */ def dispose() { @@ -88,7 +88,7 @@ private[spark] class SparkSaslServer(securityMgr: SecurityManager) extends Loggi private class SparkSaslDigestCallbackHandler(securityMgr: SecurityManager) extends CallbackHandler { - private val userName: String = + private val userName: String = SparkSaslServer.encodeIdentifier(securityMgr.getSaslUser().getBytes()) override def handle(callbacks: Array[Callback]) { @@ -123,7 +123,7 @@ private[spark] class SparkSaslServer(securityMgr: SecurityManager) extends Loggi ac.setAuthorizedID(authzid) } } - case cb: Callback => throw + case cb: Callback => throw new UnsupportedCallbackException(cb, "handle: Unrecognized SASL DIGEST-MD5 Callback") } } diff --git a/core/src/main/scala/org/apache/spark/TestUtils.scala b/core/src/main/scala/org/apache/spark/TestUtils.scala index 4597595a83..f3f59e47c3 100644 --- a/core/src/main/scala/org/apache/spark/TestUtils.scala +++ b/core/src/main/scala/org/apache/spark/TestUtils.scala @@ -31,7 +31,7 @@ import com.google.common.io.Files * projects. * * TODO: See if we can move this to the test codebase by specifying - * test dependencies between projects. + * test dependencies between projects. */ private[spark] object TestUtils { diff --git a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala index 2b32546c68..2659274c5e 100644 --- a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala +++ b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala @@ -158,7 +158,7 @@ private[spark] class TorrentBroadcast[T](@transient var value_ : T, isLocal: Boo } def receiveBroadcast(): Boolean = { - // Receive meta-info about the size of broadcast data, + // Receive meta-info about the size of broadcast data, // the number of chunks it is divided into, etc. val metaId = BroadcastBlockId(id, "meta") var attemptId = 10 diff --git a/core/src/main/scala/org/apache/spark/deploy/ClientArguments.scala b/core/src/main/scala/org/apache/spark/deploy/ClientArguments.scala index c07838f798..5da9615c9e 100644 --- a/core/src/main/scala/org/apache/spark/deploy/ClientArguments.scala +++ b/core/src/main/scala/org/apache/spark/deploy/ClientArguments.scala @@ -43,7 +43,7 @@ private[spark] class ClientArguments(args: Array[String]) { // kill parameters var driverId: String = "" - + parse(args.toList) def parse(args: List[String]): Unit = args match { diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/WorkerArguments.scala b/core/src/main/scala/org/apache/spark/deploy/worker/WorkerArguments.scala index d35d5be73f..3836bf219e 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/WorkerArguments.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/WorkerArguments.scala @@ -32,8 +32,8 @@ private[spark] class WorkerArguments(args: Array[String]) { var memory = inferDefaultMemory() var masters: Array[String] = null var workDir: String = null - - // Check for settings in environment variables + + // Check for settings in environment variables if (System.getenv("SPARK_WORKER_PORT") != null) { port = System.getenv("SPARK_WORKER_PORT").toInt } @@ -49,7 +49,7 @@ private[spark] class WorkerArguments(args: Array[String]) { if (System.getenv("SPARK_WORKER_DIR") != null) { workDir = System.getenv("SPARK_WORKER_DIR") } - + parse(args.toList) def parse(args: List[String]): Unit = args match { @@ -78,7 +78,7 @@ private[spark] class WorkerArguments(args: Array[String]) { case ("--work-dir" | "-d") :: value :: tail => workDir = value parse(tail) - + case "--webui-port" :: IntParam(value) :: tail => webUiPort = value parse(tail) diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ui/IndexPage.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ui/IndexPage.scala index 85200ab0e1..49c1009cac 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/ui/IndexPage.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/ui/IndexPage.scala @@ -137,7 +137,7 @@ private[spark] class IndexPage(parent: WorkerWebUI) { .format(executor.appId, executor.execId)}>stdout</a> <a href={"logPage?appId=%s&executorId=%s&logType=stderr" .format(executor.appId, executor.execId)}>stderr</a> - </td> + </td> </tr> } diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala index 16887d8892..6327ac0166 100644 --- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala +++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala @@ -53,7 +53,7 @@ private[spark] class CoarseGrainedExecutorBackend( case RegisteredExecutor(sparkProperties) => logInfo("Successfully registered with driver") // Make this host instead of hostPort ? - executor = new Executor(executorId, Utils.parseHostPort(hostPort)._1, sparkProperties, + executor = new Executor(executorId, Utils.parseHostPort(hostPort)._1, sparkProperties, false) case RegisterExecutorFailed(message) => diff --git a/core/src/main/scala/org/apache/spark/executor/ExecutorExitCode.scala b/core/src/main/scala/org/apache/spark/executor/ExecutorExitCode.scala index ceff3a067d..38be2c58b3 100644 --- a/core/src/main/scala/org/apache/spark/executor/ExecutorExitCode.scala +++ b/core/src/main/scala/org/apache/spark/executor/ExecutorExitCode.scala @@ -34,7 +34,7 @@ object ExecutorExitCode { logging the exception. */ val UNCAUGHT_EXCEPTION_TWICE = 51 - /** The default uncaught exception handler was reached, and the uncaught exception was an + /** The default uncaught exception handler was reached, and the uncaught exception was an OutOfMemoryError. */ val OOM = 52 @@ -43,10 +43,10 @@ object ExecutorExitCode { /** TachyonStore failed to initialize after many attempts. */ val TACHYON_STORE_FAILED_TO_INITIALIZE = 54 - + /** TachyonStore failed to create a local temporary directory after many attempts. */ val TACHYON_STORE_FAILED_TO_CREATE_DIR = 55 - + def explainExitCode(exitCode: Int): String = { exitCode match { case UNCAUGHT_EXCEPTION => "Uncaught exception" @@ -57,7 +57,7 @@ object ExecutorExitCode { case TACHYON_STORE_FAILED_TO_INITIALIZE => "TachyonStore failed to initialize." case TACHYON_STORE_FAILED_TO_CREATE_DIR => "TachyonStore failed to create a local temporary directory." - case _ => + case _ => "Unknown executor exit code (" + exitCode + ")" + ( if (exitCode > 128) { " (died from signal " + (exitCode - 128) + "?)" diff --git a/core/src/main/scala/org/apache/spark/executor/ExecutorURLClassLoader.scala b/core/src/main/scala/org/apache/spark/executor/ExecutorURLClassLoader.scala index 208e77073f..218ed7b5d2 100644 --- a/core/src/main/scala/org/apache/spark/executor/ExecutorURLClassLoader.scala +++ b/core/src/main/scala/org/apache/spark/executor/ExecutorURLClassLoader.scala @@ -38,7 +38,7 @@ private[spark] class ChildExecutorURLClassLoader(urls: Array[URL], parent: Class override def addURL(url: URL) { super.addURL(url) } - override def findClass(name: String): Class[_] = { + override def findClass(name: String): Class[_] = { super.findClass(name) } } diff --git a/core/src/main/scala/org/apache/spark/metrics/sink/CsvSink.scala b/core/src/main/scala/org/apache/spark/metrics/sink/CsvSink.scala index 42c1200926..542dce6536 100644 --- a/core/src/main/scala/org/apache/spark/metrics/sink/CsvSink.scala +++ b/core/src/main/scala/org/apache/spark/metrics/sink/CsvSink.scala @@ -45,7 +45,7 @@ private[spark] class CsvSink(val property: Properties, val registry: MetricRegis case Some(s) => TimeUnit.valueOf(s.toUpperCase()) case None => TimeUnit.valueOf(CSV_DEFAULT_UNIT) } - + MetricsSystem.checkMinimalPollingPeriod(pollUnit, pollPeriod) val pollDir = Option(property.getProperty(CSV_KEY_DIR)) match { diff --git a/core/src/main/scala/org/apache/spark/network/Connection.scala b/core/src/main/scala/org/apache/spark/network/Connection.scala index 2f7576c53b..3ffaaab23d 100644 --- a/core/src/main/scala/org/apache/spark/network/Connection.scala +++ b/core/src/main/scala/org/apache/spark/network/Connection.scala @@ -248,14 +248,14 @@ class SendingConnection(val address: InetSocketAddress, selector_ : Selector, } } - // outbox is used as a lock - ensure that it is always used as a leaf (since methods which + // outbox is used as a lock - ensure that it is always used as a leaf (since methods which // lock it are invoked in context of other locks) private val outbox = new Outbox() /* - This is orthogonal to whether we have pending bytes to write or not - and satisfies a slightly - different purpose. This flag is to see if we need to force reregister for write even when we + This is orthogonal to whether we have pending bytes to write or not - and satisfies a slightly + different purpose. This flag is to see if we need to force reregister for write even when we do not have any pending bytes to write to socket. - This can happen due to a race between adding pending buffers, and checking for existing of + This can happen due to a race between adding pending buffers, and checking for existing of data as detailed in https://github.com/mesos/spark/pull/791 */ private var needForceReregister = false diff --git a/core/src/main/scala/org/apache/spark/network/ConnectionId.scala b/core/src/main/scala/org/apache/spark/network/ConnectionId.scala index ffaab677d4..d579c165a1 100644 --- a/core/src/main/scala/org/apache/spark/network/ConnectionId.scala +++ b/core/src/main/scala/org/apache/spark/network/ConnectionId.scala @@ -18,7 +18,7 @@ package org.apache.spark.network private[spark] case class ConnectionId(connectionManagerId: ConnectionManagerId, uniqId: Int) { - override def toString = connectionManagerId.host + "_" + connectionManagerId.port + "_" + uniqId + override def toString = connectionManagerId.host + "_" + connectionManagerId.port + "_" + uniqId } private[spark] object ConnectionId { @@ -26,9 +26,9 @@ private[spark] object ConnectionId { def createConnectionIdFromString(connectionIdString: String): ConnectionId = { val res = connectionIdString.split("_").map(_.trim()) if (res.size != 3) { - throw new Exception("Error converting ConnectionId string: " + connectionIdString + + throw new Exception("Error converting ConnectionId string: " + connectionIdString + " to a ConnectionId Object") } new ConnectionId(new ConnectionManagerId(res(0), res(1).toInt), res(2).toInt) - } + } } diff --git a/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala b/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala index bdf586351a..cfee41c613 100644 --- a/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala +++ b/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala @@ -79,7 +79,7 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf, private val serverChannel = ServerSocketChannel.open() // used to track the SendingConnections waiting to do SASL negotiation - private val connectionsAwaitingSasl = new HashMap[ConnectionId, SendingConnection] + private val connectionsAwaitingSasl = new HashMap[ConnectionId, SendingConnection] with SynchronizedMap[ConnectionId, SendingConnection] private val connectionsByKey = new HashMap[SelectionKey, Connection] with SynchronizedMap[SelectionKey, Connection] @@ -141,7 +141,7 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf, } finally { writeRunnableStarted.synchronized { writeRunnableStarted -= key - val needReregister = register || conn.resetForceReregister() + val needReregister = register || conn.resetForceReregister() if (needReregister && conn.changeInterestForWrite()) { conn.registerInterest() } @@ -509,7 +509,7 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf, private def handleClientAuthentication( waitingConn: SendingConnection, - securityMsg: SecurityMessage, + securityMsg: SecurityMessage, connectionId : ConnectionId) { if (waitingConn.isSaslComplete()) { logDebug("Client sasl completed for id: " + waitingConn.connectionId) @@ -530,7 +530,7 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf, } return } - var securityMsgResp = SecurityMessage.fromResponse(replyToken, + var securityMsgResp = SecurityMessage.fromResponse(replyToken, securityMsg.getConnectionId.toString()) var message = securityMsgResp.toBufferMessage if (message == null) throw new Exception("Error creating security message") @@ -546,7 +546,7 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf, } private def handleServerAuthentication( - connection: Connection, + connection: Connection, securityMsg: SecurityMessage, connectionId: ConnectionId) { if (!connection.isSaslComplete()) { @@ -561,7 +561,7 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf, } replyToken = connection.sparkSaslServer.response(securityMsg.getToken) if (connection.isSaslComplete()) { - logDebug("Server sasl completed: " + connection.connectionId) + logDebug("Server sasl completed: " + connection.connectionId) } else { logDebug("Server sasl not completed: " + connection.connectionId) } @@ -571,7 +571,7 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf, var message = securityMsgResp.toBufferMessage if (message == null) throw new Exception("Error creating security Message") sendSecurityMessage(connection.getRemoteConnectionManagerId(), message) - } + } } catch { case e: Exception => { logError("Error in server auth negotiation: " + e) @@ -581,7 +581,7 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf, } } } else { - logDebug("connection already established for this connection id: " + connection.connectionId) + logDebug("connection already established for this connection id: " + connection.connectionId) } } @@ -609,8 +609,8 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf, return true } else { if (!conn.isSaslComplete()) { - // We could handle this better and tell the client we need to do authentication - // negotiation, but for now just ignore them. + // We could handle this better and tell the client we need to do authentication + // negotiation, but for now just ignore them. logError("message sent that is not security negotiation message on connection " + "not authenticated yet, ignoring it!!") return true @@ -709,11 +709,11 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf, } } } else { - logDebug("Sasl already established ") + logDebug("Sasl already established ") } } - // allow us to add messages to the inbox for doing sasl negotiating + // allow us to add messages to the inbox for doing sasl negotiating private def sendSecurityMessage(connManagerId: ConnectionManagerId, message: Message) { def startNewConnection(): SendingConnection = { val inetSocketAddress = new InetSocketAddress(connManagerId.host, connManagerId.port) @@ -772,7 +772,7 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf, if (((clock.getTime() - startTime) >= (authTimeout * 1000)) && (!connection.isSaslComplete())) { // took to long to authenticate the connection, something probably went wrong - throw new Exception("Took to long for authentication to " + connectionManagerId + + throw new Exception("Took to long for authentication to " + connectionManagerId + ", waited " + authTimeout + "seconds, failing.") } } @@ -794,7 +794,7 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf, } } case None => { - logError("no messageStatus for failed message id: " + message.id) + logError("no messageStatus for failed message id: " + message.id) } } } diff --git a/core/src/main/scala/org/apache/spark/network/ConnectionManagerTest.scala b/core/src/main/scala/org/apache/spark/network/ConnectionManagerTest.scala index 9d9b9dbdd5..4894ecd41f 100644 --- a/core/src/main/scala/org/apache/spark/network/ConnectionManagerTest.scala +++ b/core/src/main/scala/org/apache/spark/network/ConnectionManagerTest.scala @@ -37,11 +37,11 @@ private[spark] object ConnectionManagerTest extends Logging{ "[size of msg in MB (integer)] [count] [await time in seconds)] ") System.exit(1) } - + if (args(0).startsWith("local")) { println("This runs only on a mesos cluster") } - + val sc = new SparkContext(args(0), "ConnectionManagerTest") val slavesFile = Source.fromFile(args(1)) val slaves = slavesFile.mkString.split("\n") @@ -50,7 +50,7 @@ private[spark] object ConnectionManagerTest extends Logging{ /* println("Slaves") */ /* slaves.foreach(println) */ val tasknum = if (args.length > 2) args(2).toInt else slaves.length - val size = ( if (args.length > 3) (args(3).toInt) else 10 ) * 1024 * 1024 + val size = ( if (args.length > 3) (args(3).toInt) else 10 ) * 1024 * 1024 val count = if (args.length > 4) args(4).toInt else 3 val awaitTime = (if (args.length > 5) args(5).toInt else 600 ).second println("Running " + count + " rounds of test: " + "parallel tasks = " + tasknum + ", " + @@ -64,16 +64,16 @@ private[spark] object ConnectionManagerTest extends Logging{ (0 until count).foreach(i => { val resultStrs = sc.parallelize(0 until tasknum, tasknum).map(i => { val connManager = SparkEnv.get.connectionManager - val thisConnManagerId = connManager.id - connManager.onReceiveMessage((msg: Message, id: ConnectionManagerId) => { + val thisConnManagerId = connManager.id + connManager.onReceiveMessage((msg: Message, id: ConnectionManagerId) => { logInfo("Received [" + msg + "] from [" + id + "]") None }) val buffer = ByteBuffer.allocate(size).put(Array.tabulate[Byte](size)(x => x.toByte)) buffer.flip - - val startTime = System.currentTimeMillis + + val startTime = System.currentTimeMillis val futures = slaveConnManagerIds.filter(_ != thisConnManagerId).map{ slaveConnManagerId => { val bufferMessage = Message.createBufferMessage(buffer.duplicate) @@ -84,7 +84,7 @@ private[spark] object ConnectionManagerTest extends Logging{ val results = futures.map(f => Await.result(f, awaitTime)) val finishTime = System.currentTimeMillis Thread.sleep(5000) - + val mb = size * results.size / 1024.0 / 1024.0 val ms = finishTime - startTime val resultStr = thisConnManagerId + " Sent " + mb + " MB in " + ms + " ms at " + (mb / ms * @@ -92,11 +92,11 @@ private[spark] object ConnectionManagerTest extends Logging{ logInfo(resultStr) resultStr }).collect() - - println("---------------------") - println("Run " + i) + + println("---------------------") + println("Run " + i) resultStrs.foreach(println) - println("---------------------") + println("---------------------") }) } } diff --git a/core/src/main/scala/org/apache/spark/network/ReceiverTest.scala b/core/src/main/scala/org/apache/spark/network/ReceiverTest.scala index 2b41c403b2..9dc51e0d40 100644 --- a/core/src/main/scala/org/apache/spark/network/ReceiverTest.scala +++ b/core/src/main/scala/org/apache/spark/network/ReceiverTest.scala @@ -18,7 +18,7 @@ package org.apache.spark.network import java.nio.ByteBuffer -import org.apache.spark.{SecurityManager, SparkConf} +import org.apache.spark.{SecurityManager, SparkConf} private[spark] object ReceiverTest { def main(args: Array[String]) { diff --git a/core/src/main/scala/org/apache/spark/network/SecurityMessage.scala b/core/src/main/scala/org/apache/spark/network/SecurityMessage.scala index 0d9f743b36..a1dfc4094c 100644 --- a/core/src/main/scala/org/apache/spark/network/SecurityMessage.scala +++ b/core/src/main/scala/org/apache/spark/network/SecurityMessage.scala @@ -26,33 +26,33 @@ import org.apache.spark._ import org.apache.spark.network._ /** - * SecurityMessage is class that contains the connectionId and sasl token + * SecurityMessage is class that contains the connectionId and sasl token * used in SASL negotiation. SecurityMessage has routines for converting * it to and from a BufferMessage so that it can be sent by the ConnectionManager * and easily consumed by users when received. * The api was modeled after BlockMessage. * - * The connectionId is the connectionId of the client side. Since + * The connectionId is the connectionId of the client side. Since * message passing is asynchronous and its possible for the server side (receiving) - * to get multiple different types of messages on the same connection the connectionId - * is used to know which connnection the security message is intended for. - * + * to get multiple different types of messages on the same connection the connectionId + * is used to know which connnection the security message is intended for. + * * For instance, lets say we are node_0. We need to send data to node_1. The node_0 side * is acting as a client and connecting to node_1. SASL negotiation has to occur - * between node_0 and node_1 before node_1 trusts node_0 so node_0 sends a security message. - * node_1 receives the message from node_0 but before it can process it and send a response, - * some thread on node_1 decides it needs to send data to node_0 so it connects to node_0 - * and sends a security message of its own to authenticate as a client. Now node_0 gets - * the message and it needs to decide if this message is in response to it being a client - * (from the first send) or if its just node_1 trying to connect to it to send data. This + * between node_0 and node_1 before node_1 trusts node_0 so node_0 sends a security message. + * node_1 receives the message from node_0 but before it can process it and send a response, + * some thread on node_1 decides it needs to send data to node_0 so it connects to node_0 + * and sends a security message of its own to authenticate as a client. Now node_0 gets + * the message and it needs to decide if this message is in response to it being a client + * (from the first send) or if its just node_1 trying to connect to it to send data. This * is where the connectionId field is used. node_0 can lookup the connectionId to see if * it is in response to it being a client or if its in response to someone sending other data. - * + * * The format of a SecurityMessage as its sent is: * - Length of the ConnectionId - * - ConnectionId + * - ConnectionId * - Length of the token - * - Token + * - Token */ private[spark] class SecurityMessage() extends Logging { @@ -61,13 +61,13 @@ private[spark] class SecurityMessage() extends Logging { def set(byteArr: Array[Byte], newconnectionId: String) { if (byteArr == null) { - token = new Array[Byte](0) + token = new Array[Byte](0) } else { token = byteArr } connectionId = newconnectionId } - + /** * Read the given buffer and set the members of this class. */ @@ -91,17 +91,17 @@ private[spark] class SecurityMessage() extends Logging { buffer.clear() set(buffer) } - + def getConnectionId: String = { return connectionId } - + def getToken: Array[Byte] = { return token } - + /** - * Create a BufferMessage that can be sent by the ConnectionManager containing + * Create a BufferMessage that can be sent by the ConnectionManager containing * the security information from this class. * @return BufferMessage */ @@ -110,12 +110,12 @@ private[spark] class SecurityMessage() extends Logging { val buffers = new ArrayBuffer[ByteBuffer]() // 4 bytes for the length of the connectionId - // connectionId is of type char so multiple the length by 2 to get number of bytes + // connectionId is of type char so multiple the length by 2 to get number of bytes // 4 bytes for the length of token // token is a byte buffer so just take the length var buffer = ByteBuffer.allocate(4 + connectionId.length() * 2 + 4 + token.length) buffer.putInt(connectionId.length()) - connectionId.foreach((x: Char) => buffer.putChar(x)) + connectionId.foreach((x: Char) => buffer.putChar(x)) buffer.putInt(token.length) if (token.length > 0) { @@ -123,7 +123,7 @@ private[spark] class SecurityMessage() extends Logging { } buffer.flip() buffers += buffer - + var message = Message.createBufferMessage(buffers) logDebug("message total size is : " + message.size) message.isSecurityNeg = true @@ -136,7 +136,7 @@ private[spark] class SecurityMessage() extends Logging { } private[spark] object SecurityMessage { - + /** * Convert the given BufferMessage to a SecurityMessage by parsing the contents * of the BufferMessage and populating the SecurityMessage fields. diff --git a/core/src/main/scala/org/apache/spark/network/netty/FileHeader.scala b/core/src/main/scala/org/apache/spark/network/netty/FileHeader.scala index 4164e81d3a..136c191204 100644 --- a/core/src/main/scala/org/apache/spark/network/netty/FileHeader.scala +++ b/core/src/main/scala/org/apache/spark/network/netty/FileHeader.scala @@ -36,8 +36,8 @@ private[spark] class FileHeader ( if (FileHeader.HEADER_SIZE - buf.readableBytes > 0 ) { buf.writeZero(FileHeader.HEADER_SIZE - buf.readableBytes) } else { - throw new Exception("too long header " + buf.readableBytes) - logInfo("too long header") + throw new Exception("too long header " + buf.readableBytes) + logInfo("too long header") } buf } diff --git a/core/src/main/scala/org/apache/spark/partial/PartialResult.scala b/core/src/main/scala/org/apache/spark/partial/PartialResult.scala index eade07fbcb..cadd0c7ed1 100644 --- a/core/src/main/scala/org/apache/spark/partial/PartialResult.scala +++ b/core/src/main/scala/org/apache/spark/partial/PartialResult.scala @@ -44,7 +44,7 @@ class PartialResult[R](initialVal: R, isFinal: Boolean) { } } - /** + /** * Set a handler to be called when this PartialResult completes. Only one completion handler * is supported per PartialResult. */ @@ -60,7 +60,7 @@ class PartialResult[R](initialVal: R, isFinal: Boolean) { return this } - /** + /** * Set a handler to be called if this PartialResult's job fails. Only one failure handler * is supported per PartialResult. */ diff --git a/core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala index 2306c9736b..9ca971c8a4 100644 --- a/core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala +++ b/core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala @@ -52,7 +52,7 @@ class DoubleRDDFunctions(self: RDD[Double]) extends Logging with Serializable { /** Compute the standard deviation of this RDD's elements. */ def stdev(): Double = stats().stdev - /** + /** * Compute the sample standard deviation of this RDD's elements (which corrects for bias in * estimating the standard deviation by dividing by N-1 instead of N). */ @@ -123,13 +123,13 @@ class DoubleRDDFunctions(self: RDD[Double]) extends Logging with Serializable { * e.g. for the array * [1, 10, 20, 50] the buckets are [1, 10) [10, 20) [20, 50] * e.g 1<=x<10 , 10<=x<20, 20<=x<50 - * And on the input of 1 and 50 we would have a histogram of 1, 0, 0 - * + * And on the input of 1 and 50 we would have a histogram of 1, 0, 0 + * * Note: if your histogram is evenly spaced (e.g. [0, 10, 20, 30]) this can be switched * from an O(log n) inseration to O(1) per element. (where n = # buckets) if you set evenBuckets * to true. * buckets must be sorted and not contain any duplicates. - * buckets array must be at least two elements + * buckets array must be at least two elements * All NaN entries are treated the same. If you have a NaN bucket it must be * the maximum value of the last position and all NaN entries will be counted * in that bucket. diff --git a/core/src/main/scala/org/apache/spark/rdd/PartitionerAwareUnionRDD.scala b/core/src/main/scala/org/apache/spark/rdd/PartitionerAwareUnionRDD.scala index a84357b384..0c2cd7a247 100644 --- a/core/src/main/scala/org/apache/spark/rdd/PartitionerAwareUnionRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/PartitionerAwareUnionRDD.scala @@ -33,7 +33,7 @@ class PartitionerAwareUnionRDDPartition( val idx: Int ) extends Partition { var parents = rdds.map(_.partitions(idx)).toArray - + override val index = idx override def hashCode(): Int = idx diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala index 04c53d4684..293cfb6564 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala @@ -54,7 +54,7 @@ private[scheduler] case class BeginEvent(task: Task[_], taskInfo: TaskInfo) extends DAGSchedulerEvent private[scheduler] -case class GettingResultEvent(task: Task[_], taskInfo: TaskInfo) extends DAGSchedulerEvent +case class GettingResultEvent(task: Task[_], taskInfo: TaskInfo) extends DAGSchedulerEvent private[scheduler] case class CompletionEvent( task: Task[_], diff --git a/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala index 76f3e327d6..545fa453b7 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala @@ -1,107 +1,107 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.scheduler - -import java.util.concurrent.LinkedBlockingQueue - -import org.apache.spark.Logging - -/** - * Asynchronously passes SparkListenerEvents to registered SparkListeners. - * - * Until start() is called, all posted events are only buffered. Only after this listener bus - * has started will events be actually propagated to all attached listeners. This listener bus - * is stopped when it receives a SparkListenerShutdown event, which is posted using stop(). - */ -private[spark] class LiveListenerBus extends SparkListenerBus with Logging { - - /* Cap the capacity of the SparkListenerEvent queue so we get an explicit error (rather than - * an OOM exception) if it's perpetually being added to more quickly than it's being drained. */ - private val EVENT_QUEUE_CAPACITY = 10000 - private val eventQueue = new LinkedBlockingQueue[SparkListenerEvent](EVENT_QUEUE_CAPACITY) - private var queueFullErrorMessageLogged = false - private var started = false - private val listenerThread = new Thread("SparkListenerBus") { - setDaemon(true) - override def run() { - while (true) { - val event = eventQueue.take - if (event == SparkListenerShutdown) { - // Get out of the while loop and shutdown the daemon thread - return - } - postToAll(event) - } - } - } - - // Exposed for testing - @volatile private[spark] var stopCalled = false - - /** - * Start sending events to attached listeners. - * - * This first sends out all buffered events posted before this listener bus has started, then - * listens for any additional events asynchronously while the listener bus is still running. - * This should only be called once. - */ - def start() { - if (started) { - throw new IllegalStateException("Listener bus already started!") - } - listenerThread.start() - started = true - } - - def post(event: SparkListenerEvent) { - val eventAdded = eventQueue.offer(event) - if (!eventAdded && !queueFullErrorMessageLogged) { - logError("Dropping SparkListenerEvent because no remaining room in event queue. " + - "This likely means one of the SparkListeners is too slow and cannot keep up with the " + - "rate at which tasks are being started by the scheduler.") - queueFullErrorMessageLogged = true - } - } - - /** - * Waits until there are no more events in the queue, or until the specified time has elapsed. - * Used for testing only. Returns true if the queue has emptied and false is the specified time - * elapsed before the queue emptied. - */ - def waitUntilEmpty(timeoutMillis: Int): Boolean = { - val finishTime = System.currentTimeMillis + timeoutMillis - while (!eventQueue.isEmpty) { - if (System.currentTimeMillis > finishTime) { - return false - } - /* Sleep rather than using wait/notify, because this is used only for testing and wait/notify - * add overhead in the general case. */ - Thread.sleep(10) - } - true - } - - def stop() { - stopCalled = true - if (!started) { - throw new IllegalStateException("Attempted to stop a listener bus that has not yet started!") - } - post(SparkListenerShutdown) - listenerThread.join() - } -} +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler + +import java.util.concurrent.LinkedBlockingQueue + +import org.apache.spark.Logging + +/** + * Asynchronously passes SparkListenerEvents to registered SparkListeners. + * + * Until start() is called, all posted events are only buffered. Only after this listener bus + * has started will events be actually propagated to all attached listeners. This listener bus + * is stopped when it receives a SparkListenerShutdown event, which is posted using stop(). + */ +private[spark] class LiveListenerBus extends SparkListenerBus with Logging { + + /* Cap the capacity of the SparkListenerEvent queue so we get an explicit error (rather than + * an OOM exception) if it's perpetually being added to more quickly than it's being drained. */ + private val EVENT_QUEUE_CAPACITY = 10000 + private val eventQueue = new LinkedBlockingQueue[SparkListenerEvent](EVENT_QUEUE_CAPACITY) + private var queueFullErrorMessageLogged = false + private var started = false + private val listenerThread = new Thread("SparkListenerBus") { + setDaemon(true) + override def run() { + while (true) { + val event = eventQueue.take + if (event == SparkListenerShutdown) { + // Get out of the while loop and shutdown the daemon thread + return + } + postToAll(event) + } + } + } + + // Exposed for testing + @volatile private[spark] var stopCalled = false + + /** + * Start sending events to attached listeners. + * + * This first sends out all buffered events posted before this listener bus has started, then + * listens for any additional events asynchronously while the listener bus is still running. + * This should only be called once. + */ + def start() { + if (started) { + throw new IllegalStateException("Listener bus already started!") + } + listenerThread.start() + started = true + } + + def post(event: SparkListenerEvent) { + val eventAdded = eventQueue.offer(event) + if (!eventAdded && !queueFullErrorMessageLogged) { + logError("Dropping SparkListenerEvent because no remaining room in event queue. " + + "This likely means one of the SparkListeners is too slow and cannot keep up with the " + + "rate at which tasks are being started by the scheduler.") + queueFullErrorMessageLogged = true + } + } + + /** + * Waits until there are no more events in the queue, or until the specified time has elapsed. + * Used for testing only. Returns true if the queue has emptied and false is the specified time + * elapsed before the queue emptied. + */ + def waitUntilEmpty(timeoutMillis: Int): Boolean = { + val finishTime = System.currentTimeMillis + timeoutMillis + while (!eventQueue.isEmpty) { + if (System.currentTimeMillis > finishTime) { + return false + } + /* Sleep rather than using wait/notify, because this is used only for testing and wait/notify + * add overhead in the general case. */ + Thread.sleep(10) + } + true + } + + def stop() { + stopCalled = true + if (!started) { + throw new IllegalStateException("Attempted to stop a listener bus that has not yet started!") + } + post(SparkListenerShutdown) + listenerThread.join() + } +} diff --git a/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala b/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala index 2fbbda5b76..ace9cd51c9 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala @@ -240,7 +240,7 @@ object BlockFetcherIterator { override def numRemoteBlocks: Int = numRemote override def fetchWaitTime: Long = _fetchWaitTime override def remoteBytesRead: Long = _remoteBytesRead - + // Implementing the Iterator methods with an iterator that reads fetched blocks off the queue // as they arrive. diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index a2a7291300..df9bb4044e 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -388,7 +388,7 @@ private[spark] class BlockManager( logDebug("Block " + blockId + " not found in memory") } } - + // Look for the block in Tachyon if (level.useOffHeap) { logDebug("Getting block " + blockId + " from tachyon") @@ -1031,7 +1031,7 @@ private[spark] class BlockManager( memoryStore.clear() diskStore.clear() if (tachyonInitialized) { - tachyonStore.clear() + tachyonStore.clear() } metadataCleaner.cancel() broadcastCleaner.cancel() diff --git a/core/src/main/scala/org/apache/spark/storage/BlockMessage.scala b/core/src/main/scala/org/apache/spark/storage/BlockMessage.scala index 7168ae18c2..337b45b727 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockMessage.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockMessage.scala @@ -37,7 +37,7 @@ private[spark] class BlockMessage() { private var id: BlockId = null private var data: ByteBuffer = null private var level: StorageLevel = null - + def set(getBlock: GetBlock) { typ = BlockMessage.TYPE_GET_BLOCK id = getBlock.id @@ -75,13 +75,13 @@ private[spark] class BlockMessage() { idBuilder += buffer.getChar() } id = BlockId(idBuilder.toString) - + if (typ == BlockMessage.TYPE_PUT_BLOCK) { val booleanInt = buffer.getInt() val replication = buffer.getInt() level = StorageLevel(booleanInt, replication) - + val dataLength = buffer.getInt() data = ByteBuffer.allocate(dataLength) if (dataLength != buffer.remaining) { @@ -108,12 +108,12 @@ private[spark] class BlockMessage() { buffer.clear() set(buffer) } - + def getType: Int = typ def getId: BlockId = id def getData: ByteBuffer = data def getLevel: StorageLevel = level - + def toBufferMessage: BufferMessage = { val startTime = System.currentTimeMillis val buffers = new ArrayBuffer[ByteBuffer]() @@ -127,7 +127,7 @@ private[spark] class BlockMessage() { buffer = ByteBuffer.allocate(8).putInt(level.toInt).putInt(level.replication) buffer.flip() buffers += buffer - + buffer = ByteBuffer.allocate(4).putInt(data.remaining) buffer.flip() buffers += buffer @@ -140,7 +140,7 @@ private[spark] class BlockMessage() { buffers += data } - + /* println() println("BlockMessage: ") @@ -158,7 +158,7 @@ private[spark] class BlockMessage() { } override def toString: String = { - "BlockMessage [type = " + typ + ", id = " + id + ", level = " + level + + "BlockMessage [type = " + typ + ", id = " + id + ", level = " + level + ", data = " + (if (data != null) data.remaining.toString else "null") + "]" } } @@ -168,7 +168,7 @@ private[spark] object BlockMessage { val TYPE_GET_BLOCK: Int = 1 val TYPE_GOT_BLOCK: Int = 2 val TYPE_PUT_BLOCK: Int = 3 - + def fromBufferMessage(bufferMessage: BufferMessage): BlockMessage = { val newBlockMessage = new BlockMessage() newBlockMessage.set(bufferMessage) @@ -192,7 +192,7 @@ private[spark] object BlockMessage { newBlockMessage.set(gotBlock) newBlockMessage } - + def fromPutBlock(putBlock: PutBlock): BlockMessage = { val newBlockMessage = new BlockMessage() newBlockMessage.set(putBlock) @@ -206,7 +206,7 @@ private[spark] object BlockMessage { val bMsg = B.toBufferMessage val C = new BlockMessage() C.set(bMsg) - + println(B.getId + " " + B.getLevel) println(C.getId + " " + C.getLevel) } diff --git a/core/src/main/scala/org/apache/spark/storage/BlockMessageArray.scala b/core/src/main/scala/org/apache/spark/storage/BlockMessageArray.scala index dc62b1efaa..973d85c0a9 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockMessageArray.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockMessageArray.scala @@ -27,16 +27,16 @@ import org.apache.spark.network._ private[spark] class BlockMessageArray(var blockMessages: Seq[BlockMessage]) extends Seq[BlockMessage] with Logging { - + def this(bm: BlockMessage) = this(Array(bm)) def this() = this(null.asInstanceOf[Seq[BlockMessage]]) - def apply(i: Int) = blockMessages(i) + def apply(i: Int) = blockMessages(i) def iterator = blockMessages.iterator - def length = blockMessages.length + def length = blockMessages.length def set(bufferMessage: BufferMessage) { val startTime = System.currentTimeMillis @@ -62,15 +62,15 @@ class BlockMessageArray(var blockMessages: Seq[BlockMessage]) logDebug("Trying to convert buffer " + newBuffer + " to block message") val newBlockMessage = BlockMessage.fromByteBuffer(newBuffer) logDebug("Created " + newBlockMessage) - newBlockMessages += newBlockMessage + newBlockMessages += newBlockMessage buffer.position(buffer.position() + size) } val finishTime = System.currentTimeMillis logDebug("Converted block message array from buffer message in " + (finishTime - startTime) / 1000.0 + " s") - this.blockMessages = newBlockMessages + this.blockMessages = newBlockMessages } - + def toBufferMessage: BufferMessage = { val buffers = new ArrayBuffer[ByteBuffer]() @@ -83,7 +83,7 @@ class BlockMessageArray(var blockMessages: Seq[BlockMessage]) buffers ++= bufferMessage.buffers logDebug("Added " + bufferMessage) }) - + logDebug("Buffer list:") buffers.foreach((x: ByteBuffer) => logDebug("" + x)) /* @@ -103,13 +103,13 @@ class BlockMessageArray(var blockMessages: Seq[BlockMessage]) } private[spark] object BlockMessageArray { - + def fromBufferMessage(bufferMessage: BufferMessage): BlockMessageArray = { val newBlockMessageArray = new BlockMessageArray() newBlockMessageArray.set(bufferMessage) newBlockMessageArray } - + def main(args: Array[String]) { val blockMessages = (0 until 10).map { i => @@ -124,10 +124,10 @@ private[spark] object BlockMessageArray { } val blockMessageArray = new BlockMessageArray(blockMessages) println("Block message array created") - + val bufferMessage = blockMessageArray.toBufferMessage println("Converted to buffer message") - + val totalSize = bufferMessage.size val newBuffer = ByteBuffer.allocate(totalSize) newBuffer.clear() @@ -137,7 +137,7 @@ private[spark] object BlockMessageArray { buffer.rewind() }) newBuffer.flip - val newBufferMessage = Message.createBufferMessage(newBuffer) + val newBufferMessage = Message.createBufferMessage(newBuffer) println("Copied to new buffer message, size = " + newBufferMessage.size) val newBlockMessageArray = BlockMessageArray.fromBufferMessage(newBufferMessage) @@ -147,7 +147,7 @@ private[spark] object BlockMessageArray { case BlockMessage.TYPE_PUT_BLOCK => { val pB = PutBlock(blockMessage.getId, blockMessage.getData, blockMessage.getLevel) println(pB) - } + } case BlockMessage.TYPE_GET_BLOCK => { val gB = new GetBlock(blockMessage.getId) println(gB) diff --git a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala index e1a1f209c9..9ce0398d01 100644 --- a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala +++ b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala @@ -136,7 +136,7 @@ private[spark] object JettyUtils extends Logging { private def addFilters(handlers: Seq[ServletContextHandler], conf: SparkConf) { val filters: Array[String] = conf.get("spark.ui.filters", "").split(',').map(_.trim()) filters.foreach { - case filter : String => + case filter : String => if (!filter.isEmpty) { logInfo("Adding filter: " + filter) val holder : FilterHolder = new FilterHolder() @@ -151,7 +151,7 @@ private[spark] object JettyUtils extends Logging { if (parts.length == 2) holder.setInitParameter(parts(0), parts(1)) } } - val enumDispatcher = java.util.EnumSet.of(DispatcherType.ASYNC, DispatcherType.ERROR, + val enumDispatcher = java.util.EnumSet.of(DispatcherType.ASYNC, DispatcherType.ERROR, DispatcherType.FORWARD, DispatcherType.INCLUDE, DispatcherType.REQUEST) handlers.foreach { case(handler) => handler.addFilter(holder, "/*", enumDispatcher) } } diff --git a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala index a487924eff..a7cf04b3cb 100644 --- a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala +++ b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala @@ -48,7 +48,7 @@ private[spark] object UIUtils { case _ => <li><a href={prependBaseUri(basePath, "/storage")}>Storage</a></li> } val environment = page match { - case Environment => + case Environment => <li class="active"><a href={prependBaseUri(basePath, "/environment")}>Environment</a></li> case _ => <li><a href={prependBaseUri(basePath, "/environment")}>Environment</a></li> } diff --git a/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala b/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala index cdbbc65292..2d05e09b10 100644 --- a/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala +++ b/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala @@ -45,7 +45,7 @@ private[spark] object ClosureCleaner extends Logging { private def isClosure(cls: Class[_]): Boolean = { cls.getName.contains("$anonfun$") } - + // Get a list of the classes of the outer objects of a given closure object, obj; // the outer objects are defined as any closures that obj is nested within, plus // possibly the class that the outermost closure is in, if any. We stop searching @@ -63,7 +63,7 @@ private[spark] object ClosureCleaner extends Logging { } Nil } - + // Get a list of the outer objects for a given closure object. private def getOuterObjects(obj: AnyRef): List[AnyRef] = { for (f <- obj.getClass.getDeclaredFields if f.getName == "$outer") { @@ -76,7 +76,7 @@ private[spark] object ClosureCleaner extends Logging { } Nil } - + private def getInnerClasses(obj: AnyRef): List[Class[_]] = { val seen = Set[Class[_]](obj.getClass) var stack = List[Class[_]](obj.getClass) @@ -92,7 +92,7 @@ private[spark] object ClosureCleaner extends Logging { } return (seen - obj.getClass).toList } - + private def createNullValue(cls: Class[_]): AnyRef = { if (cls.isPrimitive) { new java.lang.Byte(0: Byte) // Should be convertible to any primitive type @@ -100,13 +100,13 @@ private[spark] object ClosureCleaner extends Logging { null } } - + def clean(func: AnyRef) { // TODO: cache outerClasses / innerClasses / accessedFields val outerClasses = getOuterClasses(func) val innerClasses = getInnerClasses(func) val outerObjects = getOuterObjects(func) - + val accessedFields = Map[Class[_], Set[String]]() for (cls <- outerClasses) accessedFields(cls) = Set[String]() @@ -143,7 +143,7 @@ private[spark] object ClosureCleaner extends Logging { field.set(outer, value) } } - + if (outer != null) { // logInfo("2: Setting $outer on " + func.getClass + " to " + outer); val field = func.getClass.getDeclaredField("$outer") @@ -151,7 +151,7 @@ private[spark] object ClosureCleaner extends Logging { field.set(func, outer) } } - + private def instantiateClass(cls: Class[_], outer: AnyRef, inInterpreter: Boolean): AnyRef = { // logInfo("Creating a " + cls + " with outer = " + outer) if (!inInterpreter) { @@ -192,7 +192,7 @@ class FieldAccessFinder(output: Map[Class[_], Set[String]]) extends ClassVisitor } } } - + override def visitMethodInsn(op: Int, owner: String, name: String, desc: String) { // Check for calls a getter method for a variable in an interpreter wrapper object. @@ -209,12 +209,12 @@ class FieldAccessFinder(output: Map[Class[_], Set[String]]) extends ClassVisitor private[spark] class InnerClosureFinder(output: Set[Class[_]]) extends ClassVisitor(ASM4) { var myName: String = null - + override def visit(version: Int, access: Int, name: String, sig: String, superName: String, interfaces: Array[String]) { myName = name } - + override def visitMethod(access: Int, name: String, desc: String, sig: String, exceptions: Array[String]): MethodVisitor = { new MethodVisitor(ASM4) { diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala index d990fd49ef..f2396f7c80 100644 --- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala +++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala @@ -611,7 +611,7 @@ private[spark] object JsonProtocol { val rddInfo = new RDDInfo(rddId, name, numPartitions, storageLevel) rddInfo.numCachedPartitions = numCachedPartitions rddInfo.memSize = memSize - rddInfo.tachyonSize = tachyonSize + rddInfo.tachyonSize = tachyonSize rddInfo.diskSize = diskSize rddInfo } diff --git a/core/src/main/scala/org/apache/spark/util/NextIterator.scala b/core/src/main/scala/org/apache/spark/util/NextIterator.scala index 8266e5e495..e5c732a5a5 100644 --- a/core/src/main/scala/org/apache/spark/util/NextIterator.scala +++ b/core/src/main/scala/org/apache/spark/util/NextIterator.scala @@ -19,7 +19,7 @@ package org.apache.spark.util /** Provides a basic/boilerplate Iterator implementation. */ private[spark] abstract class NextIterator[U] extends Iterator[U] { - + private var gotNext = false private var nextValue: U = _ private var closed = false @@ -34,7 +34,7 @@ private[spark] abstract class NextIterator[U] extends Iterator[U] { * This convention is required because `null` may be a valid value, * and using `Option` seems like it might create unnecessary Some/None * instances, given some iterators might be called in a tight loop. - * + * * @return U, or set 'finished' when done */ protected def getNext(): U diff --git a/core/src/main/scala/org/apache/spark/util/StatCounter.scala b/core/src/main/scala/org/apache/spark/util/StatCounter.scala index 732748a7ff..d80eed455c 100644 --- a/core/src/main/scala/org/apache/spark/util/StatCounter.scala +++ b/core/src/main/scala/org/apache/spark/util/StatCounter.scala @@ -62,10 +62,10 @@ class StatCounter(values: TraversableOnce[Double]) extends Serializable { if (n == 0) { mu = other.mu m2 = other.m2 - n = other.n + n = other.n maxValue = other.maxValue minValue = other.minValue - } else if (other.n != 0) { + } else if (other.n != 0) { val delta = other.mu - mu if (other.n * 10 < n) { mu = mu + (delta * other.n) / (n + other.n) diff --git a/core/src/main/scala/org/apache/spark/util/Vector.scala b/core/src/main/scala/org/apache/spark/util/Vector.scala index 3c8f94a416..1a647fa1c9 100644 --- a/core/src/main/scala/org/apache/spark/util/Vector.scala +++ b/core/src/main/scala/org/apache/spark/util/Vector.scala @@ -136,7 +136,7 @@ object Vector { def ones(length: Int) = Vector(length, _ => 1) /** - * Creates this [[org.apache.spark.util.Vector]] of given length containing random numbers + * Creates this [[org.apache.spark.util.Vector]] of given length containing random numbers * between 0.0 and 1.0. Optional scala.util.Random number generator can be provided. */ def random(length: Int, random: Random = new XORShiftRandom()) = diff --git a/core/src/main/scala/org/apache/spark/util/random/XORShiftRandom.scala b/core/src/main/scala/org/apache/spark/util/random/XORShiftRandom.scala index 8a4cdea2fa..7f220383f9 100644 --- a/core/src/main/scala/org/apache/spark/util/random/XORShiftRandom.scala +++ b/core/src/main/scala/org/apache/spark/util/random/XORShiftRandom.scala @@ -25,28 +25,28 @@ import scala.util.hashing.MurmurHash3 import org.apache.spark.util.Utils.timeIt /** - * This class implements a XORShift random number generator algorithm + * This class implements a XORShift random number generator algorithm * Source: * Marsaglia, G. (2003). Xorshift RNGs. Journal of Statistical Software, Vol. 8, Issue 14. * @see <a href="http://www.jstatsoft.org/v08/i14/paper">Paper</a> * This implementation is approximately 3.5 times faster than * {@link java.util.Random java.util.Random}, partly because of the algorithm, but also due - * to renouncing thread safety. JDK's implementation uses an AtomicLong seed, this class + * to renouncing thread safety. JDK's implementation uses an AtomicLong seed, this class * uses a regular Long. We can forgo thread safety since we use a new instance of the RNG * for each thread. */ private[spark] class XORShiftRandom(init: Long) extends JavaRandom(init) { - + def this() = this(System.nanoTime) private var seed = XORShiftRandom.hashSeed(init) // we need to just override next - this will be called by nextInt, nextDouble, // nextGaussian, nextLong, etc. - override protected def next(bits: Int): Int = { + override protected def next(bits: Int): Int = { var nextSeed = seed ^ (seed << 21) nextSeed ^= (nextSeed >>> 35) - nextSeed ^= (nextSeed << 4) + nextSeed ^= (nextSeed << 4) seed = nextSeed (nextSeed & ((1L << bits) -1)).asInstanceOf[Int] } @@ -89,7 +89,7 @@ private[spark] object XORShiftRandom { val million = 1e6.toInt val javaRand = new JavaRandom(seed) val xorRand = new XORShiftRandom(seed) - + // this is just to warm up the JIT - we're not timing anything timeIt(1e6.toInt) { javaRand.nextInt() @@ -97,9 +97,9 @@ private[spark] object XORShiftRandom { } val iters = timeIt(numIters)(_) - + /* Return results as a map instead of just printing to screen - in case the user wants to do something with them */ + in case the user wants to do something with them */ Map("javaTime" -> iters {javaRand.nextInt()}, "xorTime" -> iters {xorRand.nextInt()}) diff --git a/core/src/test/scala/org/apache/spark/AkkaUtilsSuite.scala b/core/src/test/scala/org/apache/spark/AkkaUtilsSuite.scala index c5f24c66ce..c645e4cbe8 100644 --- a/core/src/test/scala/org/apache/spark/AkkaUtilsSuite.scala +++ b/core/src/test/scala/org/apache/spark/AkkaUtilsSuite.scala @@ -37,7 +37,7 @@ class AkkaUtilsSuite extends FunSuite with LocalSparkContext { val securityManager = new SecurityManager(conf); val hostname = "localhost" - val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", hostname, 0, + val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", hostname, 0, conf = conf, securityManager = securityManager) System.setProperty("spark.driver.port", boundPort.toString) // Will be cleared by LocalSparkContext System.setProperty("spark.hostPort", hostname + ":" + boundPort) @@ -54,14 +54,14 @@ class AkkaUtilsSuite extends FunSuite with LocalSparkContext { assert(securityManagerBad.isAuthenticationEnabled() === true) - val (slaveSystem, _) = AkkaUtils.createActorSystem("spark-slave", hostname, 0, + val (slaveSystem, _) = AkkaUtils.createActorSystem("spark-slave", hostname, 0, conf = conf, securityManager = securityManagerBad) val slaveTracker = new MapOutputTrackerWorker(conf) val selection = slaveSystem.actorSelection( s"akka.tcp://spark@localhost:$boundPort/user/MapOutputTracker") val timeout = AkkaUtils.lookupTimeout(conf) - intercept[akka.actor.ActorNotFound] { - slaveTracker.trackerActor = Await.result(selection.resolveOne(timeout), timeout) + intercept[akka.actor.ActorNotFound] { + slaveTracker.trackerActor = Await.result(selection.resolveOne(timeout), timeout) } actorSystem.shutdown() @@ -75,7 +75,7 @@ class AkkaUtilsSuite extends FunSuite with LocalSparkContext { val securityManager = new SecurityManager(conf); val hostname = "localhost" - val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", hostname, 0, + val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", hostname, 0, conf = conf, securityManager = securityManager) System.setProperty("spark.driver.port", boundPort.toString) // Will be cleared by LocalSparkContext System.setProperty("spark.hostPort", hostname + ":" + boundPort) @@ -91,7 +91,7 @@ class AkkaUtilsSuite extends FunSuite with LocalSparkContext { badconf.set("spark.authenticate.secret", "good") val securityManagerBad = new SecurityManager(badconf); - val (slaveSystem, _) = AkkaUtils.createActorSystem("spark-slave", hostname, 0, + val (slaveSystem, _) = AkkaUtils.createActorSystem("spark-slave", hostname, 0, conf = badconf, securityManager = securityManagerBad) val slaveTracker = new MapOutputTrackerWorker(conf) val selection = slaveSystem.actorSelection( @@ -127,7 +127,7 @@ class AkkaUtilsSuite extends FunSuite with LocalSparkContext { val securityManager = new SecurityManager(conf); val hostname = "localhost" - val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", hostname, 0, + val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", hostname, 0, conf = conf, securityManager = securityManager) System.setProperty("spark.driver.port", boundPort.toString) // Will be cleared by LocalSparkContext System.setProperty("spark.hostPort", hostname + ":" + boundPort) @@ -180,7 +180,7 @@ class AkkaUtilsSuite extends FunSuite with LocalSparkContext { val securityManager = new SecurityManager(conf); val hostname = "localhost" - val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", hostname, 0, + val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", hostname, 0, conf = conf, securityManager = securityManager) System.setProperty("spark.driver.port", boundPort.toString) // Will be cleared by LocalSparkContext System.setProperty("spark.hostPort", hostname + ":" + boundPort) @@ -204,8 +204,8 @@ class AkkaUtilsSuite extends FunSuite with LocalSparkContext { val selection = slaveSystem.actorSelection( s"akka.tcp://spark@localhost:$boundPort/user/MapOutputTracker") val timeout = AkkaUtils.lookupTimeout(conf) - intercept[akka.actor.ActorNotFound] { - slaveTracker.trackerActor = Await.result(selection.resolveOne(timeout), timeout) + intercept[akka.actor.ActorNotFound] { + slaveTracker.trackerActor = Await.result(selection.resolveOne(timeout), timeout) } actorSystem.shutdown() diff --git a/core/src/test/scala/org/apache/spark/DriverSuite.scala b/core/src/test/scala/org/apache/spark/DriverSuite.scala index 9cbdfc54a3..7f59bdcce4 100644 --- a/core/src/test/scala/org/apache/spark/DriverSuite.scala +++ b/core/src/test/scala/org/apache/spark/DriverSuite.scala @@ -39,7 +39,7 @@ class DriverSuite extends FunSuite with Timeouts { failAfter(60 seconds) { Utils.executeAndGetOutput( Seq("./bin/spark-class", "org.apache.spark.DriverWithoutCleanup", master), - new File(sparkHome), + new File(sparkHome), Map("SPARK_TESTING" -> "1", "SPARK_HOME" -> sparkHome)) } } diff --git a/core/src/test/scala/org/apache/spark/FileServerSuite.scala b/core/src/test/scala/org/apache/spark/FileServerSuite.scala index aee9ab9091..d651fbbac4 100644 --- a/core/src/test/scala/org/apache/spark/FileServerSuite.scala +++ b/core/src/test/scala/org/apache/spark/FileServerSuite.scala @@ -45,7 +45,7 @@ class FileServerSuite extends FunSuite with LocalSparkContext { val pw = new PrintWriter(textFile) pw.println("100") pw.close() - + val jarFile = new File(tmpDir, "test.jar") val jarStream = new FileOutputStream(jarFile) val jar = new JarOutputStream(jarStream, new java.util.jar.Manifest()) @@ -53,7 +53,7 @@ class FileServerSuite extends FunSuite with LocalSparkContext { val jarEntry = new JarEntry(textFile.getName) jar.putNextEntry(jarEntry) - + val in = new FileInputStream(textFile) val buffer = new Array[Byte](10240) var nRead = 0 diff --git a/core/src/test/scala/org/apache/spark/FileSuite.scala b/core/src/test/scala/org/apache/spark/FileSuite.scala index 01af940771..b9b668d3cc 100644 --- a/core/src/test/scala/org/apache/spark/FileSuite.scala +++ b/core/src/test/scala/org/apache/spark/FileSuite.scala @@ -106,7 +106,7 @@ class FileSuite extends FunSuite with LocalSparkContext { sc = new SparkContext("local", "test") val tempDir = Files.createTempDir() val outputDir = new File(tempDir, "output").getAbsolutePath - val nums = sc.makeRDD(1 to 3).map(x => (new IntWritable(x), "a" * x)) + val nums = sc.makeRDD(1 to 3).map(x => (new IntWritable(x), "a" * x)) nums.saveAsSequenceFile(outputDir) // Try reading the output back as a SequenceFile val output = sc.sequenceFile[IntWritable, Text](outputDir) diff --git a/core/src/test/scala/org/apache/spark/deploy/worker/WorkerWatcherSuite.scala b/core/src/test/scala/org/apache/spark/deploy/worker/WorkerWatcherSuite.scala index 0b5ed6d770..5e538d6fab 100644 --- a/core/src/test/scala/org/apache/spark/deploy/worker/WorkerWatcherSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/worker/WorkerWatcherSuite.scala @@ -45,4 +45,4 @@ class WorkerWatcherSuite extends FunSuite { actorRef.underlyingActor.receive(new DisassociatedEvent(null, otherAkkaAddress, false)) assert(!actorRef.underlyingActor.isShutDown) } -} \ No newline at end of file +} diff --git a/core/src/test/scala/org/apache/spark/input/WholeTextFileRecordReaderSuite.scala b/core/src/test/scala/org/apache/spark/input/WholeTextFileRecordReaderSuite.scala index 09e35bfc8f..e89b296d41 100644 --- a/core/src/test/scala/org/apache/spark/input/WholeTextFileRecordReaderSuite.scala +++ b/core/src/test/scala/org/apache/spark/input/WholeTextFileRecordReaderSuite.scala @@ -42,7 +42,7 @@ class WholeTextFileRecordReaderSuite extends FunSuite with BeforeAndAfterAll { override def beforeAll() { sc = new SparkContext("local", "test") - + // Set the block size of local file system to test whether files are split right or not. sc.hadoopConfiguration.setLong("fs.local.block.size", 32) } diff --git a/core/src/test/scala/org/apache/spark/rdd/ParallelCollectionSplitSuite.scala b/core/src/test/scala/org/apache/spark/rdd/ParallelCollectionSplitSuite.scala index a4381a8b97..4df36558b6 100644 --- a/core/src/test/scala/org/apache/spark/rdd/ParallelCollectionSplitSuite.scala +++ b/core/src/test/scala/org/apache/spark/rdd/ParallelCollectionSplitSuite.scala @@ -34,14 +34,14 @@ class ParallelCollectionSplitSuite extends FunSuite with Checkers { assert(slices(1).mkString(",") === "2") assert(slices(2).mkString(",") === "3") } - + test("one slice") { val data = Array(1, 2, 3) val slices = ParallelCollectionRDD.slice(data, 1) assert(slices.size === 1) assert(slices(0).mkString(",") === "1,2,3") } - + test("equal slices") { val data = Array(1, 2, 3, 4, 5, 6, 7, 8, 9) val slices = ParallelCollectionRDD.slice(data, 3) @@ -50,7 +50,7 @@ class ParallelCollectionSplitSuite extends FunSuite with Checkers { assert(slices(1).mkString(",") === "4,5,6") assert(slices(2).mkString(",") === "7,8,9") } - + test("non-equal slices") { val data = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10) val slices = ParallelCollectionRDD.slice(data, 3) @@ -77,14 +77,14 @@ class ParallelCollectionSplitSuite extends FunSuite with Checkers { assert(slices(1).mkString(",") === (33 to 66).mkString(",")) assert(slices(2).mkString(",") === (67 to 100).mkString(",")) } - + test("empty data") { val data = new Array[Int](0) val slices = ParallelCollectionRDD.slice(data, 5) assert(slices.size === 5) for (slice <- slices) assert(slice.size === 0) } - + test("zero slices") { val data = Array(1, 2, 3) intercept[IllegalArgumentException] { ParallelCollectionRDD.slice(data, 0) } @@ -94,7 +94,7 @@ class ParallelCollectionSplitSuite extends FunSuite with Checkers { val data = Array(1, 2, 3) intercept[IllegalArgumentException] { ParallelCollectionRDD.slice(data, -5) } } - + test("exclusive ranges sliced into ranges") { val data = 1 until 100 val slices = ParallelCollectionRDD.slice(data, 3) @@ -102,7 +102,7 @@ class ParallelCollectionSplitSuite extends FunSuite with Checkers { assert(slices.map(_.size).reduceLeft(_+_) === 99) assert(slices.forall(_.isInstanceOf[Range])) } - + test("inclusive ranges sliced into ranges") { val data = 1 to 100 val slices = ParallelCollectionRDD.slice(data, 3) @@ -124,7 +124,7 @@ class ParallelCollectionSplitSuite extends FunSuite with Checkers { assert(range.step === 1, "slice " + i + " step") } } - + test("random array tests") { val gen = for { d <- arbitrary[List[Int]] @@ -141,7 +141,7 @@ class ParallelCollectionSplitSuite extends FunSuite with Checkers { } check(prop) } - + test("random exclusive range tests") { val gen = for { a <- Gen.choose(-100, 100) @@ -177,7 +177,7 @@ class ParallelCollectionSplitSuite extends FunSuite with Checkers { } check(prop) } - + test("exclusive ranges of longs") { val data = 1L until 100L val slices = ParallelCollectionRDD.slice(data, 3) @@ -185,7 +185,7 @@ class ParallelCollectionSplitSuite extends FunSuite with Checkers { assert(slices.map(_.size).reduceLeft(_+_) === 99) assert(slices.forall(_.isInstanceOf[NumericRange[_]])) } - + test("inclusive ranges of longs") { val data = 1L to 100L val slices = ParallelCollectionRDD.slice(data, 3) @@ -193,7 +193,7 @@ class ParallelCollectionSplitSuite extends FunSuite with Checkers { assert(slices.map(_.size).reduceLeft(_+_) === 100) assert(slices.forall(_.isInstanceOf[NumericRange[_]])) } - + test("exclusive ranges of doubles") { val data = 1.0 until 100.0 by 1.0 val slices = ParallelCollectionRDD.slice(data, 3) @@ -201,7 +201,7 @@ class ParallelCollectionSplitSuite extends FunSuite with Checkers { assert(slices.map(_.size).reduceLeft(_+_) === 99) assert(slices.forall(_.isInstanceOf[NumericRange[_]])) } - + test("inclusive ranges of doubles") { val data = 1.0 to 100.0 by 1.0 val slices = ParallelCollectionRDD.slice(data, 3) diff --git a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala index dc704e07a8..4cdccdda6f 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala @@ -216,7 +216,7 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with ShouldMatc test("onTaskGettingResult() called when result fetched remotely") { val listener = new SaveTaskEvents sc.addSparkListener(listener) - + // Make a task whose result is larger than the akka frame size System.setProperty("spark.akka.frameSize", "1") val akkaFrameSize = @@ -236,7 +236,7 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with ShouldMatc test("onTaskGettingResult() not called when result sent directly") { val listener = new SaveTaskEvents sc.addSparkListener(listener) - + // Make a task whose result is larger than the akka frame size val result = sc.parallelize(Seq(1), 1).map(2 * _).reduce { case (x, y) => x } assert(result === 2) diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala index 356e28dd19..2fb750d9ee 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala @@ -264,7 +264,7 @@ class TaskSchedulerImplSuite extends FunSuite with LocalSparkContext with Loggin test("Scheduler does not always schedule tasks on the same workers") { sc = new SparkContext("local", "TaskSchedulerImplSuite") - val taskScheduler = new TaskSchedulerImpl(sc) + val taskScheduler = new TaskSchedulerImpl(sc) taskScheduler.initialize(new FakeSchedulerBackend) // Need to initialize a DAGScheduler for the taskScheduler to use for callbacks. val dagScheduler = new DAGScheduler(sc, taskScheduler) { diff --git a/core/src/test/scala/org/apache/spark/ui/UISuite.scala b/core/src/test/scala/org/apache/spark/ui/UISuite.scala index 45c3224279..2f9739f940 100644 --- a/core/src/test/scala/org/apache/spark/ui/UISuite.scala +++ b/core/src/test/scala/org/apache/spark/ui/UISuite.scala @@ -33,8 +33,8 @@ class UISuite extends FunSuite { val server = new Server(startPort) Try { server.start() } match { - case Success(s) => - case Failure(e) => + case Success(s) => + case Failure(e) => // Either case server port is busy hence setup for test complete } val serverInfo1 = JettyUtils.startJettyServer( diff --git a/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite.scala b/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite.scala index 439e5644e2..d7e48e633e 100644 --- a/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite.scala @@ -69,7 +69,7 @@ object TestObject { class TestClass extends Serializable { var x = 5 - + def getX = x def run(): Int = { diff --git a/core/src/test/scala/org/apache/spark/util/NextIteratorSuite.scala b/core/src/test/scala/org/apache/spark/util/NextIteratorSuite.scala index e1446cbc90..32d74d0500 100644 --- a/core/src/test/scala/org/apache/spark/util/NextIteratorSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/NextIteratorSuite.scala @@ -32,7 +32,7 @@ class NextIteratorSuite extends FunSuite with ShouldMatchers { i.hasNext should be === false intercept[NoSuchElementException] { i.next() } } - + test("two iterations") { val i = new StubIterator(Buffer(1, 2)) i.hasNext should be === true @@ -70,7 +70,7 @@ class NextIteratorSuite extends FunSuite with ShouldMatchers { class StubIterator(ints: Buffer[Int]) extends NextIterator[Int] { var closeCalled = 0 - + override def getNext() = { if (ints.size == 0) { finished = true diff --git a/core/src/test/scala/org/apache/spark/util/random/XORShiftRandomSuite.scala b/core/src/test/scala/org/apache/spark/util/random/XORShiftRandomSuite.scala index 757476efdb..39199a1a17 100644 --- a/core/src/test/scala/org/apache/spark/util/random/XORShiftRandomSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/random/XORShiftRandomSuite.scala @@ -29,12 +29,12 @@ class XORShiftRandomSuite extends FunSuite with ShouldMatchers { val xorRand = new XORShiftRandom(seed) val hundMil = 1e8.toInt } - + /* - * This test is based on a chi-squared test for randomness. The values are hard-coded + * This test is based on a chi-squared test for randomness. The values are hard-coded * so as not to create Spark's dependency on apache.commons.math3 just to call one * method for calculating the exact p-value for a given number of random numbers - * and bins. In case one would want to move to a full-fledged test based on + * and bins. In case one would want to move to a full-fledged test based on * apache.commons.math3, the relevant class is here: * org.apache.commons.math3.stat.inference.ChiSquareTest */ @@ -49,19 +49,19 @@ class XORShiftRandomSuite extends FunSuite with ShouldMatchers { // populate bins based on modulus of the random number times(f.hundMil) {bins(math.abs(f.xorRand.nextInt) % 10) += 1} - /* since the seed is deterministic, until the algorithm is changed, we know the result will be - * exactly this: Array(10004908, 9993136, 9994600, 10000744, 10000091, 10002474, 10002272, - * 10000790, 10002286, 9998699), so the test will never fail at the prespecified (5%) - * significance level. However, should the RNG implementation change, the test should still - * pass at the same significance level. The chi-squared test done in R gave the following + /* since the seed is deterministic, until the algorithm is changed, we know the result will be + * exactly this: Array(10004908, 9993136, 9994600, 10000744, 10000091, 10002474, 10002272, + * 10000790, 10002286, 9998699), so the test will never fail at the prespecified (5%) + * significance level. However, should the RNG implementation change, the test should still + * pass at the same significance level. The chi-squared test done in R gave the following * results: * > chisq.test(c(10004908, 9993136, 9994600, 10000744, 10000091, 10002474, 10002272, * 10000790, 10002286, 9998699)) * Chi-squared test for given probabilities - * data: c(10004908, 9993136, 9994600, 10000744, 10000091, 10002474, 10002272, 10000790, + * data: c(10004908, 9993136, 9994600, 10000744, 10000091, 10002474, 10002272, 10000790, * 10002286, 9998699) * X-squared = 11.975, df = 9, p-value = 0.2147 - * Note that the p-value was ~0.22. The test will fail if alpha < 0.05, which for 100 million + * Note that the p-value was ~0.22. The test will fail if alpha < 0.05, which for 100 million * random numbers * and 10 bins will happen at X-squared of ~16.9196. So, the test will fail if X-squared * is greater than or equal to that number. diff --git a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala index 41e813d48c..1204cfba39 100644 --- a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala +++ b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala @@ -48,41 +48,41 @@ import org.apache.spark.streaming.dstream._ * @param storageLevel RDD storage level. */ -private[streaming] +private[streaming] class MQTTInputDStream[T: ClassTag]( @transient ssc_ : StreamingContext, brokerUrl: String, topic: String, storageLevel: StorageLevel ) extends NetworkInputDStream[T](ssc_) with Logging { - + def getReceiver(): NetworkReceiver[T] = { new MQTTReceiver(brokerUrl, topic, storageLevel).asInstanceOf[NetworkReceiver[T]] } } -private[streaming] +private[streaming] class MQTTReceiver(brokerUrl: String, topic: String, storageLevel: StorageLevel ) extends NetworkReceiver[Any] { lazy protected val blockGenerator = new BlockGenerator(storageLevel) - + def onStop() { blockGenerator.stop() } - + def onStart() { blockGenerator.start() - // Set up persistence for messages + // Set up persistence for messages var peristance: MqttClientPersistence = new MemoryPersistence() // Initializing Mqtt Client specifying brokerUrl, clientID and MqttClientPersistance var client: MqttClient = new MqttClient(brokerUrl, MqttClient.generateClientId(), peristance) - // Connect to MqttBroker + // Connect to MqttBroker client.connect() // Subscribe to Mqtt topic @@ -91,7 +91,7 @@ class MQTTReceiver(brokerUrl: String, // Callback automatically triggers as and when new message arrives on specified topic var callback: MqttCallback = new MqttCallback() { - // Handles Mqtt message + // Handles Mqtt message override def messageArrived(arg0: String, arg1: MqttMessage) { blockGenerator += new String(arg1.getPayload()) } diff --git a/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala index 3316b6dc39..843a4a7a9a 100644 --- a/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala +++ b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala @@ -31,7 +31,7 @@ import org.apache.spark.storage.StorageLevel * @constructor create a new Twitter stream using the supplied Twitter4J authentication credentials. * An optional set of string filters can be used to restrict the set of tweets. The Twitter API is * such that this may return a sampled subset of all tweets during each interval. -* +* * If no Authorization object is provided, initializes OAuth authorization using the system * properties twitter4j.oauth.consumerKey, .consumerSecret, .accessToken and .accessTokenSecret. */ @@ -42,13 +42,13 @@ class TwitterInputDStream( filters: Seq[String], storageLevel: StorageLevel ) extends NetworkInputDStream[Status](ssc_) { - + private def createOAuthAuthorization(): Authorization = { new OAuthAuthorization(new ConfigurationBuilder().build()) } private val authorization = twitterAuth.getOrElse(createOAuthAuthorization()) - + override def getReceiver(): NetworkReceiver[Status] = { new TwitterReceiver(authorization, filters, storageLevel) } diff --git a/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala b/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala index 377d9d6bd5..5635287694 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala @@ -172,7 +172,7 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) extends Seriali "EdgeDirection.Either instead.") } } - + /** * Join the vertices with an RDD and then apply a function from the * the vertex and RDD entry to a new vertex value. The input table diff --git a/graphx/src/test/scala/org/apache/spark/graphx/GraphOpsSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/GraphOpsSuite.scala index 6386306c04..a467ca1ae7 100644 --- a/graphx/src/test/scala/org/apache/spark/graphx/GraphOpsSuite.scala +++ b/graphx/src/test/scala/org/apache/spark/graphx/GraphOpsSuite.scala @@ -55,7 +55,7 @@ class GraphOpsSuite extends FunSuite with LocalSparkContext { } } } - + test ("filter") { withSpark { sc => val n = 5 diff --git a/mllib/src/main/scala/org/apache/spark/mllib/optimization/Optimizer.scala b/mllib/src/main/scala/org/apache/spark/mllib/optimization/Optimizer.scala index e41d9bbe18..7f6d94571b 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/optimization/Optimizer.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/optimization/Optimizer.scala @@ -30,7 +30,7 @@ import org.apache.spark.mllib.linalg.Vector trait Optimizer extends Serializable { /** - * Solve the provided convex optimization problem. + * Solve the provided convex optimization problem. */ def optimize(data: RDD[(Double, Vector)], initialWeights: Vector): Vector } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/GeneralizedLinearAlgorithm.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/GeneralizedLinearAlgorithm.scala index 3bd0017aa1..d969e7aa60 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/regression/GeneralizedLinearAlgorithm.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/GeneralizedLinearAlgorithm.scala @@ -26,7 +26,7 @@ import org.apache.spark.mllib.optimization._ import org.apache.spark.mllib.linalg.{Vectors, Vector} /** - * GeneralizedLinearModel (GLM) represents a model trained using + * GeneralizedLinearModel (GLM) represents a model trained using * GeneralizedLinearAlgorithm. GLMs consist of a weight vector and * an intercept. * @@ -38,7 +38,7 @@ abstract class GeneralizedLinearModel(val weights: Vector, val intercept: Double /** * Predict the result given a data point and the weights learned. - * + * * @param dataMatrix Row vector containing the features for this data point * @param weightMatrix Column vector containing the weights of the model * @param intercept Intercept of the model. diff --git a/repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala b/repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala index a30dcfdcec..687e85ca94 100644 --- a/repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala +++ b/repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala @@ -35,7 +35,7 @@ import com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.Opcodes._ * A ClassLoader that reads classes from a Hadoop FileSystem or HTTP URI, * used to load classes defined by the interpreter when the REPL is used. * Allows the user to specify if user class path should be first - */ + */ class ExecutorClassLoader(classUri: String, parent: ClassLoader, userClassPathFirst: Boolean) extends ClassLoader { val uri = new URI(classUri) @@ -94,7 +94,7 @@ class ExecutorClassLoader(classUri: String, parent: ClassLoader, case e: Exception => None } } - + def readAndTransformClass(name: String, in: InputStream): Array[Byte] = { if (name.startsWith("line") && name.endsWith("$iw$")) { // Class seems to be an interpreter "wrapper" object storing a val or var. diff --git a/repl/src/main/scala/org/apache/spark/repl/SparkImports.scala b/repl/src/main/scala/org/apache/spark/repl/SparkImports.scala index 8f61a5e835..419796b68b 100644 --- a/repl/src/main/scala/org/apache/spark/repl/SparkImports.scala +++ b/repl/src/main/scala/org/apache/spark/repl/SparkImports.scala @@ -187,7 +187,7 @@ trait SparkImports { if (currentImps contains imv) addWrapper() val objName = req.lineRep.readPath val valName = "$VAL" + newValId(); - + if(!code.toString.endsWith(".`" + imv + "`;\n")) { // Which means already imported code.append("val " + valName + " = " + objName + ".INSTANCE;\n") code.append("import " + valName + req.accessPath + ".`" + imv + "`;\n") diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala index 17118499d0..1f3fab09e9 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala @@ -28,7 +28,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression { override def toString = s"CAST($child, $dataType)" type EvaluatedType = Any - + def nullOrCast[T](a: Any, func: T => Any): Any = if(a == null) { null } else { @@ -40,7 +40,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression { case BinaryType => nullOrCast[Array[Byte]](_, new String(_, "UTF-8")) case _ => nullOrCast[Any](_, _.toString) } - + // BinaryConverter def castToBinary: Any => Any = child.dataType match { case StringType => nullOrCast[String](_, _.getBytes("UTF-8")) @@ -58,7 +58,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression { case DoubleType => nullOrCast[Double](_, _ != 0) case FloatType => nullOrCast[Float](_, _ != 0) } - + // TimestampConverter def castToTimestamp: Any => Any = child.dataType match { case StringType => nullOrCast[String](_, s => { diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala index 8a1db8e796..dd9332ada8 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala @@ -86,7 +86,7 @@ abstract class Expression extends TreeNode[Expression] { } /** - * Evaluation helper function for 2 Numeric children expressions. Those expressions are supposed + * Evaluation helper function for 2 Numeric children expressions. Those expressions are supposed * to be in the same data type, and also the return type. * Either one of the expressions result is null, the evaluation result should be null. */ @@ -120,7 +120,7 @@ abstract class Expression extends TreeNode[Expression] { } /** - * Evaluation helper function for 2 Fractional children expressions. Those expressions are + * Evaluation helper function for 2 Fractional children expressions. Those expressions are * supposed to be in the same data type, and also the return type. * Either one of the expressions result is null, the evaluation result should be null. */ @@ -153,7 +153,7 @@ abstract class Expression extends TreeNode[Expression] { } /** - * Evaluation helper function for 2 Integral children expressions. Those expressions are + * Evaluation helper function for 2 Integral children expressions. Those expressions are * supposed to be in the same data type, and also the return type. * Either one of the expressions result is null, the evaluation result should be null. */ @@ -186,12 +186,12 @@ abstract class Expression extends TreeNode[Expression] { } /** - * Evaluation helper function for 2 Comparable children expressions. Those expressions are + * Evaluation helper function for 2 Comparable children expressions. Those expressions are * supposed to be in the same data type, and the return type should be Integer: * Negative value: 1st argument less than 2nd argument * Zero: 1st argument equals 2nd argument * Positive value: 1st argument greater than 2nd argument - * + * * Either one of the expressions result is null, the evaluation result should be null. */ @inline @@ -213,7 +213,7 @@ abstract class Expression extends TreeNode[Expression] { null } else { e1.dataType match { - case i: NativeType => + case i: NativeType => f.asInstanceOf[(Ordering[i.JvmType], i.JvmType, i.JvmType) => Boolean]( i.ordering, evalE1.asInstanceOf[i.JvmType], evalE2.asInstanceOf[i.JvmType]) case other => sys.error(s"Type $other does not support ordered operations") diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringOperations.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringOperations.scala index a27c71db1b..ddc16ce87b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringOperations.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringOperations.scala @@ -28,19 +28,19 @@ trait StringRegexExpression { self: BinaryExpression => type EvaluatedType = Any - + def escape(v: String): String def matches(regex: Pattern, str: String): Boolean - + def nullable: Boolean = true def dataType: DataType = BooleanType - - // try cache the pattern for Literal + + // try cache the pattern for Literal private lazy val cache: Pattern = right match { case x @ Literal(value: String, StringType) => compile(value) case _ => null } - + protected def compile(str: String): Pattern = if(str == null) { null } else { @@ -49,7 +49,7 @@ trait StringRegexExpression { } protected def pattern(str: String) = if(cache == null) compile(str) else cache - + override def eval(input: Row): Any = { val l = left.eval(input) if (l == null) { @@ -73,11 +73,11 @@ trait StringRegexExpression { /** * Simple RegEx pattern matching function */ -case class Like(left: Expression, right: Expression) +case class Like(left: Expression, right: Expression) extends BinaryExpression with StringRegexExpression { - + def symbol = "LIKE" - + // replace the _ with .{1} exactly match 1 time of any character // replace the % with .*, match 0 or more times with any character override def escape(v: String) = { @@ -98,19 +98,19 @@ case class Like(left: Expression, right: Expression) sb.append(Pattern.quote(Character.toString(n))); } } - + i += 1 } - + sb.toString() } - + override def matches(regex: Pattern, str: String): Boolean = regex.matcher(str).matches() } -case class RLike(left: Expression, right: Expression) +case class RLike(left: Expression, right: Expression) extends BinaryExpression with StringRegexExpression { - + def symbol = "RLIKE" override def escape(v: String): String = v override def matches(regex: Pattern, str: String): Boolean = regex.matcher(str).find(0) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/dataTypes.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/dataTypes.scala index cdeb01a965..da34bd3a21 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/dataTypes.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/dataTypes.scala @@ -55,9 +55,9 @@ case object BooleanType extends NativeType { case object TimestampType extends NativeType { type JvmType = Timestamp - + @transient lazy val tag = typeTag[JvmType] - + val ordering = new Ordering[JvmType] { def compare(x: Timestamp, y: Timestamp) = x.compareTo(y) } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvaluationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvaluationSuite.scala index 888a19d79f..2cd0d2b0e1 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvaluationSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvaluationSuite.scala @@ -144,7 +144,7 @@ class ExpressionEvaluationSuite extends FunSuite { checkEvaluation("abc" like "b%", false) checkEvaluation("abc" like "bc%", false) } - + test("LIKE Non-literal Regular Expression") { val regEx = 'a.string.at(0) checkEvaluation("abcd" like regEx, null, new GenericRow(Array[Any](null))) @@ -164,7 +164,7 @@ class ExpressionEvaluationSuite extends FunSuite { test("RLIKE literal Regular Expression") { checkEvaluation("abdef" rlike "abdef", true) checkEvaluation("abbbbc" rlike "a.*c", true) - + checkEvaluation("fofo" rlike "^fo", true) checkEvaluation("fo\no" rlike "^fo\no$", true) checkEvaluation("Bn" rlike "^Ba*n", true) @@ -196,9 +196,9 @@ class ExpressionEvaluationSuite extends FunSuite { evaluate("abbbbc" rlike regEx, new GenericRow(Array[Any]("**"))) } } - + test("data type casting") { - + val sts = "1970-01-01 00:00:01.0" val ts = Timestamp.valueOf(sts) @@ -236,7 +236,7 @@ class ExpressionEvaluationSuite extends FunSuite { checkEvaluation("23" cast ShortType, 23) checkEvaluation("2012-12-11" cast DoubleType, null) checkEvaluation(Literal(123) cast IntegerType, 123) - + intercept[Exception] {evaluate(Literal(1) cast BinaryType, null)} } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/ScalaReflectionRelationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/ScalaReflectionRelationSuite.scala index 65eae3357a..1cbf973c34 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/ScalaReflectionRelationSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/ScalaReflectionRelationSuite.scala @@ -56,4 +56,4 @@ class ScalaReflectionRelationSuite extends FunSuite { val result = sql("SELECT data FROM reflectBinary").collect().head(0).asInstanceOf[Array[Byte]] assert(result.toSeq === Seq[Byte](1)) } -} \ No newline at end of file +} diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala index 93023e8dce..ac56ff709c 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala @@ -59,7 +59,7 @@ class Checkpoint(@transient ssc: StreamingContext, val checkpointTime: Time) } } -private[streaming] +private[streaming] object Checkpoint extends Logging { val PREFIX = "checkpoint-" val REGEX = (PREFIX + """([\d]+)([\w\.]*)""").r @@ -79,7 +79,7 @@ object Checkpoint extends Logging { def sortFunc(path1: Path, path2: Path): Boolean = { val (time1, bk1) = path1.getName match { case REGEX(x, y) => (x.toLong, !y.isEmpty) } val (time2, bk2) = path2.getName match { case REGEX(x, y) => (x.toLong, !y.isEmpty) } - (time1 < time2) || (time1 == time2 && bk1) + (time1 < time2) || (time1 == time2 && bk1) } val path = new Path(checkpointDir) @@ -95,7 +95,7 @@ object Checkpoint extends Logging { } } else { logInfo("Checkpoint directory " + path + " does not exist") - Seq.empty + Seq.empty } } } @@ -160,7 +160,7 @@ class CheckpointWriter( }) } - // All done, print success + // All done, print success val finishTime = System.currentTimeMillis() logInfo("Checkpoint for time " + checkpointTime + " saved to file '" + checkpointFile + "', took " + bytes.length + " bytes and " + (finishTime - startTime) + " ms") @@ -227,14 +227,14 @@ object CheckpointReader extends Logging { { val checkpointPath = new Path(checkpointDir) def fs = checkpointPath.getFileSystem(hadoopConf) - - // Try to find the checkpoint files + + // Try to find the checkpoint files val checkpointFiles = Checkpoint.getCheckpointFiles(checkpointDir, fs).reverse if (checkpointFiles.isEmpty) { return None } - // Try to read the checkpoint files in the order + // Try to read the checkpoint files in the order logInfo("Checkpoint files found: " + checkpointFiles.mkString(",")) val compressionCodec = CompressionCodec.createCodec(conf) checkpointFiles.foreach(file => { diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Interval.scala b/streaming/src/main/scala/org/apache/spark/streaming/Interval.scala index 16479a0127..ad4f3fdd14 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/Interval.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/Interval.scala @@ -20,11 +20,11 @@ package org.apache.spark.streaming private[streaming] class Interval(val beginTime: Time, val endTime: Time) { def this(beginMs: Long, endMs: Long) = this(new Time(beginMs), new Time(endMs)) - + def duration(): Duration = endTime - beginTime def + (time: Duration): Interval = { - new Interval(beginTime + time, endTime + time) + new Interval(beginTime + time, endTime + time) } def - (time: Duration): Interval = { @@ -40,9 +40,9 @@ class Interval(val beginTime: Time, val endTime: Time) { } def <= (that: Interval) = (this < that || this == that) - + def > (that: Interval) = !(this <= that) - + def >= (that: Interval) = !(this < that) override def toString = "[" + beginTime + ", " + endTime + "]" diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Time.scala b/streaming/src/main/scala/org/apache/spark/streaming/Time.scala index 2678334f53..6a6b00a778 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/Time.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/Time.scala @@ -32,7 +32,7 @@ case class Time(private val millis: Long) { def <= (that: Time): Boolean = (this.millis <= that.millis) def > (that: Time): Boolean = (this.millis > that.millis) - + def >= (that: Time): Boolean = (this.millis >= that.millis) def + (that: Duration): Time = new Time(millis + that.milliseconds) @@ -43,7 +43,7 @@ case class Time(private val millis: Long) { def floor(that: Duration): Time = { val t = that.milliseconds - val m = math.floor(this.millis / t).toLong + val m = math.floor(this.millis / t).toLong new Time(m * t) } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala index 903e3f3c9b..f33c0ceafd 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala @@ -51,7 +51,7 @@ class DStreamCheckpointData[T: ClassTag] (dstream: DStream[T]) .map(x => (x._1, x._2.getCheckpointFile.get)) logDebug("Current checkpoint files:\n" + checkpointFiles.toSeq.mkString("\n")) - // Add the checkpoint files to the data to be serialized + // Add the checkpoint files to the data to be serialized if (!checkpointFiles.isEmpty) { currentCheckpointFiles.clear() currentCheckpointFiles ++= checkpointFiles diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala index 8a6051622e..e878285f6a 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala @@ -232,7 +232,7 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas } logDebug("Accepted " + path) } catch { - case fnfe: java.io.FileNotFoundException => + case fnfe: java.io.FileNotFoundException => logWarning("Error finding new files", fnfe) reset() return false diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/QueueInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/QueueInputDStream.scala index 97325f8ea3..6376cff78b 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/QueueInputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/QueueInputDStream.scala @@ -31,11 +31,11 @@ class QueueInputDStream[T: ClassTag]( oneAtATime: Boolean, defaultRDD: RDD[T] ) extends InputDStream[T](ssc) { - + override def start() { } - + override def stop() { } - + override def compute(validTime: Time): Option[RDD[T]] = { val buffer = new ArrayBuffer[RDD[T]]() if (oneAtATime && queue.size > 0) { @@ -55,5 +55,5 @@ class QueueInputDStream[T: ClassTag]( None } } - + } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receivers/ActorReceiver.scala b/streaming/src/main/scala/org/apache/spark/streaming/receivers/ActorReceiver.scala index 44eb2750c6..f5984d03c5 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/receivers/ActorReceiver.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/receivers/ActorReceiver.scala @@ -47,7 +47,7 @@ object ReceiverSupervisorStrategy { * the API for pushing received data into Spark Streaming for being processed. * * Find more details at: http://spark.apache.org/docs/latest/streaming-custom-receivers.html - * + * * @example {{{ * class MyActor extends Actor with Receiver{ * def receive { diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/Clock.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/Clock.scala index c5ef2cc8c3..39145a3ab0 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/util/Clock.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/util/Clock.scala @@ -19,34 +19,34 @@ package org.apache.spark.streaming.util private[streaming] trait Clock { - def currentTime(): Long + def currentTime(): Long def waitTillTime(targetTime: Long): Long } private[streaming] class SystemClock() extends Clock { - + val minPollTime = 25L - + def currentTime(): Long = { System.currentTimeMillis() - } - + } + def waitTillTime(targetTime: Long): Long = { var currentTime = 0L currentTime = System.currentTimeMillis() - + var waitTime = targetTime - currentTime if (waitTime <= 0) { return currentTime } - + val pollTime = { if (waitTime / 10.0 > minPollTime) { (waitTime / 10.0).toLong } else { - minPollTime - } + minPollTime + } } while (true) { @@ -55,7 +55,7 @@ class SystemClock() extends Clock { if (waitTime <= 0) { return currentTime } - val sleepTime = + val sleepTime = if (waitTime < pollTime) { waitTime } else { @@ -69,7 +69,7 @@ class SystemClock() extends Clock { private[streaming] class ManualClock() extends Clock { - + var time = 0L def currentTime() = time @@ -85,13 +85,13 @@ class ManualClock() extends Clock { this.synchronized { time += timeToAdd this.notifyAll() - } + } } def waitTillTime(targetTime: Long): Long = { this.synchronized { while (time < targetTime) { this.wait(100) - } + } } currentTime() } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextHelper.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextHelper.scala index 07021ebb58..bd1df55cf7 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextHelper.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextHelper.scala @@ -25,8 +25,8 @@ import scala.collection.JavaConversions.mapAsScalaMap private[streaming] object RawTextHelper { - /** - * Splits lines and counts the words in them using specialized object-to-long hashmap + /** + * Splits lines and counts the words in them using specialized object-to-long hashmap * (to avoid boxing-unboxing overhead of Long in java/scala HashMap) */ def splitAndCountPartitions(iter: Iterator[String]): Iterator[(String, Long)] = { @@ -55,13 +55,13 @@ object RawTextHelper { map.toIterator.map{case (k, v) => (k, v)} } - /** + /** * Gets the top k words in terms of word counts. Assumes that each word exists only once * in the `data` iterator (that is, the counts have been reduced). */ def topK(data: Iterator[(String, Long)], k: Int): Iterator[(String, Long)] = { val taken = new Array[(String, Long)](k) - + var i = 0 var len = 0 var done = false @@ -93,7 +93,7 @@ object RawTextHelper { } taken.toIterator } - + /** * Warms up the SparkContext in master and slave by running tasks to force JIT kick in * before real workload starts. @@ -106,11 +106,11 @@ object RawTextHelper { .count() } } - - def add(v1: Long, v2: Long) = (v1 + v2) - def subtract(v1: Long, v2: Long) = (v1 - v2) + def add(v1: Long, v2: Long) = (v1 + v2) + + def subtract(v1: Long, v2: Long) = (v1 - v2) - def max(v1: Long, v2: Long) = math.max(v1, v2) + def max(v1: Long, v2: Long) = math.max(v1, v2) } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/RecurringTimer.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/RecurringTimer.scala index f71938ac55..e016377c94 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/util/RecurringTimer.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/util/RecurringTimer.scala @@ -22,10 +22,10 @@ import org.apache.spark.Logging private[streaming] class RecurringTimer(clock: Clock, period: Long, callback: (Long) => Unit, name: String) extends Logging { - + private val thread = new Thread("RecurringTimer - " + name) { setDaemon(true) - override def run() { loop } + override def run() { loop } } @volatile private var prevTime = -1L @@ -104,11 +104,11 @@ class RecurringTimer(clock: Clock, period: Long, callback: (Long) => Unit, name: private[streaming] object RecurringTimer { - + def main(args: Array[String]) { var lastRecurTime = 0L val period = 1000 - + def onRecur(time: Long) { val currentTime = System.currentTimeMillis() println("" + currentTime + ": " + (currentTime - lastRecurTime)) diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java index 13fa64894b..a0b1bbc34f 100644 --- a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java +++ b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java @@ -1673,7 +1673,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa @Test public void testSocketString() { - + class Converter implements Function<InputStream, Iterable<String>> { public Iterable<String> call(InputStream in) throws IOException { BufferedReader reader = new BufferedReader(new InputStreamReader(in)); -- GitLab