diff --git a/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala b/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala index 668032a3a2680aac0085cb6e17c1bb01653ca4e5..0aa8852649e058adf514224fd07ba4b78b7c92e8 100644 --- a/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala +++ b/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala @@ -1,19 +1,19 @@ /* * - * * Licensed to the Apache Software Foundation (ASF) under one or more - * * contributor license agreements. See the NOTICE file distributed with - * * this work for additional information regarding copyright ownership. - * * The ASF licenses this file to You under the Apache License, Version 2.0 - * * (the "License"); you may not use this file except in compliance with - * * the License. You may obtain a copy of the License at - * * - * * http://www.apache.org/licenses/LICENSE-2.0 - * * - * * Unless required by applicable law or agreed to in writing, software - * * distributed under the License is distributed on an "AS IS" BASIS, - * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * * See the License for the specific language governing permissions and - * * limitations under the License. + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. * */ diff --git a/core/src/main/scala/org/apache/spark/network/netty/ShuffleCopier.scala b/core/src/main/scala/org/apache/spark/network/netty/ShuffleCopier.scala index 481ff8c3e02d256c6a9fe1a3580eea52fe687f22..b1e1576dadc1a88c9b6de8949b0d41255c2a4055 100644 --- a/core/src/main/scala/org/apache/spark/network/netty/ShuffleCopier.scala +++ b/core/src/main/scala/org/apache/spark/network/netty/ShuffleCopier.scala @@ -76,7 +76,7 @@ private[spark] object ShuffleCopier extends Logging { extends FileClientHandler with Logging { override def handle(ctx: ChannelHandlerContext, in: ByteBuf, header: FileHeader) { - logDebug("Received Block: " + header.blockId + " (" + header.fileLen + "B)"); + logDebug("Received Block: " + header.blockId + " (" + header.fileLen + "B)") resultCollectCallBack(header.blockId, header.fileLen.toLong, in.readBytes(header.fileLen)) } diff --git a/core/src/main/scala/org/apache/spark/rdd/CartesianRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CartesianRDD.scala index 9b0c882481cfc8357e3758dc89c7197ae2dd648c..0de22f0e06e494f75c6a632a90389eeaeabdf17c 100644 --- a/core/src/main/scala/org/apache/spark/rdd/CartesianRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/CartesianRDD.scala @@ -70,7 +70,7 @@ class CartesianRDD[T: ClassManifest, U:ClassManifest]( override def compute(split: Partition, context: TaskContext) = { val currSplit = split.asInstanceOf[CartesianPartition] for (x <- rdd1.iterator(currSplit.s1, context); - y <- rdd2.iterator(currSplit.s2, context)) yield (x, y) + y <- rdd2.iterator(currSplit.s2, context)) yield (x, y) } override def getDependencies: Seq[Dependency[_]] = List( diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala index c1c7aa70e6c92385766eff5b0bb864de4a7a66ef..fbd822867fa0defa9af8a79f0cc06eae8379e7d3 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala @@ -133,7 +133,7 @@ private[spark] class StagePage(parent: JobProgressUI) { summary ++ <h4>Summary Metrics for {numCompleted} Completed Tasks</h4> ++ <div>{summaryTable.getOrElse("No tasks have reported metrics yet.")}</div> ++ - <h4>Tasks</h4> ++ taskTable; + <h4>Tasks</h4> ++ taskTable headerSparkPage(content, parent.sc, "Details for Stage %d".format(stageId), Stages) } diff --git a/core/src/test/scala/org/apache/spark/LocalSparkContext.scala b/core/src/test/scala/org/apache/spark/LocalSparkContext.scala index 459e257d79a36a4a12d3e5c1f0de6490e09106ca..8dd5786da6ff5c93f3bee3b3287baf89c7551220 100644 --- a/core/src/test/scala/org/apache/spark/LocalSparkContext.scala +++ b/core/src/test/scala/org/apache/spark/LocalSparkContext.scala @@ -30,7 +30,7 @@ trait LocalSparkContext extends BeforeAndAfterEach with BeforeAndAfterAll { self @transient var sc: SparkContext = _ override def beforeAll() { - InternalLoggerFactory.setDefaultFactory(new Slf4JLoggerFactory()); + InternalLoggerFactory.setDefaultFactory(new Slf4JLoggerFactory()) super.beforeAll() } diff --git a/core/src/test/scala/org/apache/spark/PartitioningSuite.scala b/core/src/test/scala/org/apache/spark/PartitioningSuite.scala index 7d938917f2650754f93ed019ace4d02a4cf6d349..1374d01774693bfb1c71122a1dfb33c2b9b1a098 100644 --- a/core/src/test/scala/org/apache/spark/PartitioningSuite.scala +++ b/core/src/test/scala/org/apache/spark/PartitioningSuite.scala @@ -142,11 +142,11 @@ class PartitioningSuite extends FunSuite with SharedSparkContext { .filter(_ >= 0.0) // Run the partitions, including the consecutive empty ones, through StatCounter - val stats: StatCounter = rdd.stats(); - assert(abs(6.0 - stats.sum) < 0.01); - assert(abs(6.0/2 - rdd.mean) < 0.01); - assert(abs(1.0 - rdd.variance) < 0.01); - assert(abs(1.0 - rdd.stdev) < 0.01); + val stats: StatCounter = rdd.stats() + assert(abs(6.0 - stats.sum) < 0.01) + assert(abs(6.0/2 - rdd.mean) < 0.01) + assert(abs(1.0 - rdd.variance) < 0.01) + assert(abs(1.0 - rdd.stdev) < 0.01) // Add other tests here for classes that should be able to handle empty partitions correctly } diff --git a/examples/src/main/scala/org/apache/spark/examples/LocalALS.scala b/examples/src/main/scala/org/apache/spark/examples/LocalALS.scala index 4af45b2b4a0670df6d8c5072c818cdf0be72fe1b..83db8b9e26411cb23f08892be5f59a5664a23813 100644 --- a/examples/src/main/scala/org/apache/spark/examples/LocalALS.scala +++ b/examples/src/main/scala/org/apache/spark/examples/LocalALS.scala @@ -120,7 +120,7 @@ object LocalALS { System.exit(1) } } - printf("Running with M=%d, U=%d, F=%d, iters=%d\n", M, U, F, ITERATIONS); + printf("Running with M=%d, U=%d, F=%d, iters=%d\n", M, U, F, ITERATIONS) val R = generateR() diff --git a/examples/src/main/scala/org/apache/spark/examples/SparkTC.scala b/examples/src/main/scala/org/apache/spark/examples/SparkTC.scala index 5a7a9d1bd8f744bcc92c5e8f0a30c50051fd64bc..8543ce0e3285e3fe8f45121045012bba51712b54 100644 --- a/examples/src/main/scala/org/apache/spark/examples/SparkTC.scala +++ b/examples/src/main/scala/org/apache/spark/examples/SparkTC.scala @@ -65,7 +65,7 @@ object SparkTC { oldCount = nextCount // Perform the join, obtaining an RDD of (y, (z, x)) pairs, // then project the result to obtain the new (x, z) paths. - tc = tc.union(tc.join(edges).map(x => (x._2._2, x._2._1))).distinct().cache(); + tc = tc.union(tc.join(edges).map(x => (x._2._2, x._2._1))).distinct().cache() nextCount = tc.count() } while (nextCount != oldCount) diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/ActorWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/ActorWordCount.scala index cd3423a07b6b788fb3b7bc6f2c9e8c22ffdc415c..af52b7e9a12f13c747a70b030d9719ecf3745595 100644 --- a/examples/src/main/scala/org/apache/spark/streaming/examples/ActorWordCount.scala +++ b/examples/src/main/scala/org/apache/spark/streaming/examples/ActorWordCount.scala @@ -120,7 +120,7 @@ object FeederActor { println("Feeder started as:" + feeder) - actorSystem.awaitTermination(); + actorSystem.awaitTermination() } } diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/MQTTWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/MQTTWordCount.scala index af698a01d511871472751f1fdb779e808cb4d6ab..ff332a02821290f98faebcbf6a06ec6c091bc31b 100644 --- a/examples/src/main/scala/org/apache/spark/streaming/examples/MQTTWordCount.scala +++ b/examples/src/main/scala/org/apache/spark/streaming/examples/MQTTWordCount.scala @@ -54,12 +54,12 @@ object MQTTPublisher { client.connect() - val msgtopic: MqttTopic = client.getTopic(topic); + val msgtopic: MqttTopic = client.getTopic(topic) val msg: String = "hello mqtt demo for spark streaming" while (true) { val message: MqttMessage = new MqttMessage(String.valueOf(msg).getBytes()) - msgtopic.publish(message); + msgtopic.publish(message) println("Published data. topic: " + msgtopic.getName() + " Message: " + message) } client.disconnect() diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala index bb9febad38d03abaea536b413a5f1405c19052af..9271914eb536a6565210913758d12db91e3a1306 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala @@ -94,7 +94,7 @@ class CheckpointWriter(checkpointDir: String) extends Logging { fs.delete(file, false) fs.rename(writeFile, file) - val finishTime = System.currentTimeMillis(); + val finishTime = System.currentTimeMillis() logInfo("Checkpoint for time " + checkpointTime + " saved to file '" + file + "', took " + bytes.length + " bytes and " + (finishTime - startTime) + " milliseconds") return @@ -124,7 +124,9 @@ class CheckpointWriter(checkpointDir: String) extends Logging { def stop() { synchronized { - if (stopped) return ; + if (stopped) { + return + } stopped = true } executor.shutdown() diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala index cf30b541e1f92f020952295530aab24b91a2cad5..7f9dab0ef9982fd6e7872ced27635898d8544b69 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala @@ -17,7 +17,7 @@ package org.apache.spark.streaming.api.java -import java.lang.{Long => JLong, Integer => JInt} +import java.lang.{Integer => JInt} import java.io.InputStream import java.util.{Map => JMap, List => JList} @@ -33,10 +33,9 @@ import twitter4j.auth.Authorization import org.apache.spark.rdd.RDD import org.apache.spark.storage.StorageLevel import org.apache.spark.api.java.function.{Function => JFunction, Function2 => JFunction2} -import org.apache.spark.api.java.{JavaPairRDD, JavaRDDLike, JavaSparkContext, JavaRDD} +import org.apache.spark.api.java.{JavaPairRDD, JavaSparkContext, JavaRDD} import org.apache.spark.streaming._ import org.apache.spark.streaming.dstream._ -import org.apache.spark.streaming.receivers.{ActorReceiver, ReceiverSupervisorStrategy} /** * A StreamingContext is the main entry point for Spark Streaming functionality. Besides the basic @@ -311,7 +310,7 @@ class JavaStreamingContext(val ssc: StreamingContext) { implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[V]] implicit val cmf: ClassManifest[F] = implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[F]] - ssc.fileStream[K, V, F](directory); + ssc.fileStream[K, V, F](directory) } /** diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlumeInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlumeInputDStream.scala index 18de772946805607fca9a421845e87af23c0683b..a0189eca043c674b1a1a7ce681a210020e0ea383 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlumeInputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlumeInputDStream.scala @@ -137,8 +137,8 @@ class FlumeReceiver( protected override def onStart() { val responder = new SpecificResponder( - classOf[AvroSourceProtocol], new FlumeEventServer(this)); - val server = new NettyServer(responder, new InetSocketAddress(host, port)); + classOf[AvroSourceProtocol], new FlumeEventServer(this)) + val server = new NettyServer(responder, new InetSocketAddress(host, port)) blockGenerator.start() server.start() logInfo("Flume receiver started") diff --git a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala index a559db468a771826e818e0aaccc0bf5b5e9b5913..7dc82decefd6e0ac3cb69acf51d95840b2aeef55 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala @@ -124,9 +124,9 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { val clock = ssc.scheduler.clock.asInstanceOf[ManualClock] val input = Seq(1, 2, 3, 4, 5) Thread.sleep(1000) - val transceiver = new NettyTransceiver(new InetSocketAddress("localhost", testPort)); + val transceiver = new NettyTransceiver(new InetSocketAddress("localhost", testPort)) val client = SpecificRequestor.getClient( - classOf[AvroSourceProtocol], transceiver); + classOf[AvroSourceProtocol], transceiver) for (i <- 0 until input.size) { val event = new AvroFlumeEvent diff --git a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala index be140699c2964456a747a076120987c9abc26f39..8c8c359e6e8655bbaea25554f73515337240c552 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala @@ -251,7 +251,7 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging { Thread.sleep(500) // Give some time for the forgetting old RDDs to complete } catch { - case e: Exception => e.printStackTrace(); throw e; + case e: Exception => {e.printStackTrace(); throw e} } finally { ssc.stop() } diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala index 89b00415daa8ee43c5a17a2cd8cb30a513094278..a7baf0c36cfd4db3ab2831c073b0af0f830a834e 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala @@ -21,6 +21,7 @@ import java.io.IOException import java.net.Socket import java.util.concurrent.CopyOnWriteArrayList import java.util.concurrent.atomic.{AtomicInteger, AtomicReference} + import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.hadoop.net.NetUtils @@ -33,6 +34,7 @@ import org.apache.hadoop.yarn.ipc.YarnRPC import org.apache.hadoop.yarn.util.{ConverterUtils, Records} import org.apache.spark.{SparkContext, Logging} import org.apache.spark.util.Utils + import scala.collection.JavaConversions._ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) extends Logging { @@ -65,7 +67,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e ShutdownHookManager.get().addShutdownHook(new AppMasterShutdownHook(this), 30) appAttemptId = getApplicationAttemptId() - isLastAMRetry = appAttemptId.getAttemptId() >= maxAppAttempts; + isLastAMRetry = appAttemptId.getAttemptId() >= maxAppAttempts resourceManager = registerWithResourceManager() // Workaround until hadoop moves to something which has @@ -195,7 +197,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e successed = true } finally { logDebug("finishing main") - isLastAMRetry = true; + isLastAMRetry = true if (successed) { ApplicationMaster.this.finishApplicationMaster(FinalApplicationStatus.SUCCEEDED) } else { diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index 1078d5b826f67684dfbf06c6aa98f6288335a9c6..94e353af2e954f5e56970375b56e4dc60f1eeb5d 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -17,14 +17,13 @@ package org.apache.spark.deploy.yarn -import java.net.{InetAddress, InetSocketAddress, UnknownHostException, URI} +import java.net.{InetAddress, UnknownHostException, URI} import java.nio.ByteBuffer import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileContext, FileStatus, FileSystem, Path, FileUtil} import org.apache.hadoop.fs.permission.FsPermission import org.apache.hadoop.mapred.Master -import org.apache.hadoop.net.NetUtils import org.apache.hadoop.io.DataOutputBuffer import org.apache.hadoop.security.UserGroupInformation import org.apache.hadoop.yarn.api._ @@ -40,9 +39,7 @@ import scala.collection.mutable.HashMap import scala.collection.mutable.Map import scala.collection.JavaConversions._ -import org.apache.spark.Logging -import org.apache.spark.util.Utils -import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.Logging class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl with Logging { @@ -123,7 +120,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl // if we have requested more then the clusters max for a single resource then exit. if (args.workerMemory > maxMem) { - logError("the worker size is to large to run on this cluster " + args.workerMemory); + logError("the worker size is to large to run on this cluster " + args.workerMemory) System.exit(1) } val amMem = args.amMemory + YarnAllocationHandler.MEMORY_OVERHEAD @@ -160,8 +157,8 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl var dstHost = dstUri.getHost() if ((srcHost != null) && (dstHost != null)) { try { - srcHost = InetAddress.getByName(srcHost).getCanonicalHostName(); - dstHost = InetAddress.getByName(dstHost).getCanonicalHostName(); + srcHost = InetAddress.getByName(srcHost).getCanonicalHostName() + dstHost = InetAddress.getByName(dstHost).getCanonicalHostName() } catch { case e: UnknownHostException => return false @@ -178,7 +175,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl if (srcUri.getPort() != dstUri.getPort()) { return false } - return true; + return true } /** @@ -190,13 +187,13 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl replication: Short, setPerms: Boolean = false): Path = { val fs = FileSystem.get(conf) - val remoteFs = originalPath.getFileSystem(conf); + val remoteFs = originalPath.getFileSystem(conf) var newPath = originalPath if (! compareFs(remoteFs, fs)) { newPath = new Path(dstDir, originalPath.getName()) logInfo("Uploading " + originalPath + " to " + newPath) - FileUtil.copy(remoteFs, originalPath, fs, newPath, false, conf); - fs.setReplication(newPath, replication); + FileUtil.copy(remoteFs, originalPath, fs, newPath, false, conf) + fs.setReplication(newPath, replication) if (setPerms) fs.setPermission(newPath, new FsPermission(APP_FILE_PERMISSION)) } // resolve any symlinks in the URI path so using a "current" symlink @@ -214,7 +211,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl // Add them as local resources to the AM val fs = FileSystem.get(conf) - val delegTokenRenewer = Master.getMasterPrincipal(conf); + val delegTokenRenewer = Master.getMasterPrincipal(conf) if (UserGroupInformation.isSecurityEnabled()) { if (delegTokenRenewer == null || delegTokenRenewer.length() == 0) { logError("Can't get Master Kerberos principal for use as renewer") @@ -226,7 +223,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl if (UserGroupInformation.isSecurityEnabled()) { val dstFs = dst.getFileSystem(conf) - dstFs.addDelegationTokens(delegTokenRenewer, credentials); + dstFs.addDelegationTokens(delegTokenRenewer, credentials) } val localResources = HashMap[String, LocalResource]() FileSystem.mkdirs(fs, dst, new FsPermission(STAGING_DIR_PERMISSION)) @@ -286,7 +283,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl } } - UserGroupInformation.getCurrentUser().addCredentials(credentials); + UserGroupInformation.getCurrentUser().addCredentials(credentials) return localResources } @@ -366,7 +363,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl } // Command for the ApplicationMaster - var javaCommand = "java"; + var javaCommand = "java" val javaHome = System.getenv("JAVA_HOME") if ((javaHome != null && !javaHome.isEmpty()) || env.isDefinedAt("JAVA_HOME")) { javaCommand = Environment.JAVA_HOME.$() + "/bin/java" diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala index 674c8f8112b867e22a09cd94ace0f2fb6d8a62e5..5f159b073f5372dc9e1f73347f88ae8aa48e0cbb 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala @@ -197,11 +197,11 @@ class ClientDistributedCacheManager() extends Logging { */ def checkPermissionOfOther(fs: FileSystem, path: Path, action: FsAction, statCache: Map[URI, FileStatus]): Boolean = { - val status = getFileStatus(fs, path.toUri(), statCache); + val status = getFileStatus(fs, path.toUri(), statCache) val perms = status.getPermission() val otherAction = perms.getOtherAction() if (otherAction.implies(action)) { - return true; + return true } return false } diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala index 7a66532254c74f2393095da68f123727ce5e22f7..a4d6e1d87d1270fd48e8f0f6273f8cfd076e7e64 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala @@ -22,7 +22,7 @@ import java.nio.ByteBuffer import java.security.PrivilegedExceptionAction import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.{FileStatus, FileSystem, Path} +import org.apache.hadoop.fs.Path import org.apache.hadoop.io.DataOutputBuffer import org.apache.hadoop.net.NetUtils import org.apache.hadoop.security.UserGroupInformation @@ -38,7 +38,6 @@ import scala.collection.JavaConversions._ import scala.collection.mutable.HashMap import org.apache.spark.Logging -import org.apache.spark.util.Utils class WorkerRunnable(container: Container, conf: Configuration, masterAddress: String, slaveId: String, hostname: String, workerMemory: Int, workerCores: Int) @@ -108,7 +107,7 @@ class WorkerRunnable(container: Container, conf: Configuration, masterAddress: S credentials.writeTokenStorageToStream(dob) ctx.setContainerTokens(ByteBuffer.wrap(dob.getData())) - var javaCommand = "java"; + var javaCommand = "java" val javaHome = System.getenv("JAVA_HOME") if ((javaHome != null && !javaHome.isEmpty()) || env.isDefinedAt("JAVA_HOME")) { javaCommand = Environment.JAVA_HOME.$() + "/bin/java" @@ -204,8 +203,8 @@ class WorkerRunnable(container: Container, conf: Configuration, masterAddress: S // use doAs and remoteUser here so we can add the container token and not // pollute the current users credentials with all of the individual container tokens - val user = UserGroupInformation.createRemoteUser(container.getId().toString()); - val containerToken = container.getContainerToken(); + val user = UserGroupInformation.createRemoteUser(container.getId().toString()) + val containerToken = container.getContainerToken() if (containerToken != null) { user.addToken(ProtoUtils.convertFromProtoFormat(containerToken, cmAddress)) } @@ -216,8 +215,8 @@ class WorkerRunnable(container: Container, conf: Configuration, masterAddress: S return rpc.getProxy(classOf[ContainerManager], cmAddress, conf).asInstanceOf[ContainerManager] } - }); - return proxy; + }) + proxy } } diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala index ca2f1e2565b9a8a7956ffe00e2b453779956853a..2ba2366ead17113c764663ed934a5221f0eeb0ee 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala @@ -18,13 +18,10 @@ package org.apache.spark.deploy.yarn import org.apache.spark.deploy.SparkHadoopUtil -import collection.mutable.HashMap import org.apache.hadoop.mapred.JobConf import org.apache.hadoop.security.UserGroupInformation import org.apache.hadoop.yarn.conf.YarnConfiguration import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.yarn.api.ApplicationConstants.Environment -import java.security.PrivilegedExceptionAction /** * Contains util methods to interact with Hadoop from spark. @@ -40,7 +37,7 @@ class YarnSparkHadoopUtil extends SparkHadoopUtil { // add any user credentials to the job conf which are necessary for running on a secure Hadoop cluster override def addCredentials(conf: JobConf) { - val jobCreds = conf.getCredentials(); + val jobCreds = conf.getCredentials() jobCreds.mergeAll(UserGroupInformation.getCurrentUser().getCredentials()) } }