diff --git a/core/src/main/scala/org/apache/spark/SSLOptions.scala b/core/src/main/scala/org/apache/spark/SSLOptions.scala index 3b9c885bf97a7a72cbda408a13890238d101fbd7..261265f0b4c550b3508e4b8990ec10dc4ed12c66 100644 --- a/core/src/main/scala/org/apache/spark/SSLOptions.scala +++ b/core/src/main/scala/org/apache/spark/SSLOptions.scala @@ -39,8 +39,11 @@ import org.eclipse.jetty.util.ssl.SslContextFactory * @param keyStore a path to the key-store file * @param keyStorePassword a password to access the key-store file * @param keyPassword a password to access the private key in the key-store + * @param keyStoreType the type of the key-store + * @param needClientAuth set true if SSL needs client authentication * @param trustStore a path to the trust-store file * @param trustStorePassword a password to access the trust-store file + * @param trustStoreType the type of the trust-store * @param protocol SSL protocol (remember that SSLv3 was compromised) supported by Java * @param enabledAlgorithms a set of encryption algorithms that may be used */ @@ -49,8 +52,11 @@ private[spark] case class SSLOptions( keyStore: Option[File] = None, keyStorePassword: Option[String] = None, keyPassword: Option[String] = None, + keyStoreType: Option[String] = None, + needClientAuth: Boolean = false, trustStore: Option[File] = None, trustStorePassword: Option[String] = None, + trustStoreType: Option[String] = None, protocol: Option[String] = None, enabledAlgorithms: Set[String] = Set.empty) extends Logging { @@ -63,12 +69,18 @@ private[spark] case class SSLOptions( val sslContextFactory = new SslContextFactory() keyStore.foreach(file => sslContextFactory.setKeyStorePath(file.getAbsolutePath)) - trustStore.foreach(file => sslContextFactory.setTrustStore(file.getAbsolutePath)) keyStorePassword.foreach(sslContextFactory.setKeyStorePassword) - trustStorePassword.foreach(sslContextFactory.setTrustStorePassword) keyPassword.foreach(sslContextFactory.setKeyManagerPassword) + keyStoreType.foreach(sslContextFactory.setKeyStoreType) + if (needClientAuth) { + trustStore.foreach(file => sslContextFactory.setTrustStore(file.getAbsolutePath)) + trustStorePassword.foreach(sslContextFactory.setTrustStorePassword) + trustStoreType.foreach(sslContextFactory.setTrustStoreType) + } protocol.foreach(sslContextFactory.setProtocol) - sslContextFactory.setIncludeCipherSuites(supportedAlgorithms.toSeq: _*) + if (supportedAlgorithms.nonEmpty) { + sslContextFactory.setIncludeCipherSuites(supportedAlgorithms.toSeq: _*) + } Some(sslContextFactory) } else { @@ -82,6 +94,13 @@ private[spark] case class SSLOptions( */ def createAkkaConfig: Option[Config] = { if (enabled) { + if (keyStoreType.isDefined) { + logWarning("Akka configuration does not support key store type."); + } + if (trustStoreType.isDefined) { + logWarning("Akka configuration does not support trust store type."); + } + Some(ConfigFactory.empty() .withValue("akka.remote.netty.tcp.security.key-store", ConfigValueFactory.fromAnyRef(keyStore.map(_.getAbsolutePath).getOrElse(""))) @@ -110,7 +129,9 @@ private[spark] case class SSLOptions( * The supportedAlgorithms set is a subset of the enabledAlgorithms that * are supported by the current Java security provider for this protocol. 
*/ - private val supportedAlgorithms: Set[String] = { + private val supportedAlgorithms: Set[String] = if (enabledAlgorithms.isEmpty) { + Set() + } else { var context: SSLContext = null try { context = SSLContext.getInstance(protocol.orNull) @@ -133,7 +154,11 @@ private[spark] case class SSLOptions( logDebug(s"Discarding unsupported cipher $cipher") } - enabledAlgorithms & providerAlgorithms + val supported = enabledAlgorithms & providerAlgorithms + require(supported.nonEmpty || sys.env.contains("SPARK_TESTING"), + "SSLContext does not support any of the enabled algorithms: " + + enabledAlgorithms.mkString(",")) + supported } /** Returns a string representation of this SSLOptions with all the passwords masked. */ @@ -153,9 +178,12 @@ private[spark] object SSLOptions extends Logging { * $ - `[ns].keyStore` - a path to the key-store file; can be relative to the current directory * $ - `[ns].keyStorePassword` - a password to the key-store file * $ - `[ns].keyPassword` - a password to the private key + * $ - `[ns].keyStoreType` - the type of the key-store + * $ - `[ns].needClientAuth` - whether SSL needs client authentication * $ - `[ns].trustStore` - a path to the trust-store file; can be relative to the current * directory * $ - `[ns].trustStorePassword` - a password to the trust-store file + * $ - `[ns].trustStoreType` - the type of trust-store * $ - `[ns].protocol` - a protocol name supported by a particular Java version * $ - `[ns].enabledAlgorithms` - a comma separated list of ciphers * @@ -183,12 +211,21 @@ private[spark] object SSLOptions extends Logging { val keyPassword = conf.getOption(s"$ns.keyPassword") .orElse(defaults.flatMap(_.keyPassword)) + val keyStoreType = conf.getOption(s"$ns.keyStoreType") + .orElse(defaults.flatMap(_.keyStoreType)) + + val needClientAuth = + conf.getBoolean(s"$ns.needClientAuth", defaultValue = defaults.exists(_.needClientAuth)) + val trustStore = conf.getOption(s"$ns.trustStore").map(new File(_)) .orElse(defaults.flatMap(_.trustStore)) val trustStorePassword = conf.getOption(s"$ns.trustStorePassword") .orElse(defaults.flatMap(_.trustStorePassword)) + val trustStoreType = conf.getOption(s"$ns.trustStoreType") + .orElse(defaults.flatMap(_.trustStoreType)) + val protocol = conf.getOption(s"$ns.protocol") .orElse(defaults.flatMap(_.protocol)) @@ -202,8 +239,11 @@ private[spark] object SSLOptions extends Logging { keyStore, keyStorePassword, keyPassword, + keyStoreType, + needClientAuth, trustStore, trustStorePassword, + trustStoreType, protocol, enabledAlgorithms) } diff --git a/core/src/main/scala/org/apache/spark/SecurityManager.scala b/core/src/main/scala/org/apache/spark/SecurityManager.scala index 64e483e3847724a67ca6770a55c1083e806ac296..c5aec05c03fced6c8f7ad6e54259d75e39419837 100644 --- a/core/src/main/scala/org/apache/spark/SecurityManager.scala +++ b/core/src/main/scala/org/apache/spark/SecurityManager.scala @@ -244,14 +244,8 @@ private[spark] class SecurityManager(sparkConf: SparkConf) // the default SSL configuration - it will be used by all communication layers unless overwritten private val defaultSSLOptions = SSLOptions.parse(sparkConf, "spark.ssl", defaults = None) - // SSL configuration for different communication layers - they can override the default - // configuration at a specified namespace. The namespace *must* start with spark.ssl. 
- val fileServerSSLOptions = SSLOptions.parse(sparkConf, "spark.ssl.fs", Some(defaultSSLOptions)) - val akkaSSLOptions = SSLOptions.parse(sparkConf, "spark.ssl.akka", Some(defaultSSLOptions)) - - logDebug(s"SSLConfiguration for file server: $fileServerSSLOptions") - logDebug(s"SSLConfiguration for Akka: $akkaSSLOptions") - + // SSL configuration for the file server. This is used by Utils.setupSecureURLConnection(). + val fileServerSSLOptions = getSSLOptions("fs") val (sslSocketFactory, hostnameVerifier) = if (fileServerSSLOptions.enabled) { val trustStoreManagers = for (trustStore <- fileServerSSLOptions.trustStore) yield { @@ -292,6 +286,12 @@ private[spark] class SecurityManager(sparkConf: SparkConf) (None, None) } + def getSSLOptions(module: String): SSLOptions = { + val opts = SSLOptions.parse(sparkConf, s"spark.ssl.$module", Some(defaultSSLOptions)) + logDebug(s"Created SSL options for $module: $opts") + opts + } + /** * Split a comma separated String, filter out any empty items, and return a Set of strings */ diff --git a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala index 3feb7cea593e04e486a0c752b7ba8faeddfcdf5d..3e78c7ae240f385e4e9b0ee655b0412b14a03bc7 100644 --- a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala +++ b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala @@ -41,8 +41,7 @@ private[deploy] object DeployMessages { worker: RpcEndpointRef, cores: Int, memory: Int, - webUiPort: Int, - publicAddress: String) + workerWebUiUrl: String) extends DeployMessage { Utils.checkHost(host, "Required hostname") assert (port > 0) diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala index 96007a06e3c54bf96f25a3c6ddbdfc3e2df057c3..1f13d7db348ecdacc99ff684d745b7d50d523cf8 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala @@ -49,7 +49,8 @@ class HistoryServer( provider: ApplicationHistoryProvider, securityManager: SecurityManager, port: Int) - extends WebUI(securityManager, port, conf) with Logging with UIRoot { + extends WebUI(securityManager, securityManager.getSSLOptions("historyServer"), port, conf) + with Logging with UIRoot { // How many applications to retain private val retainedApplications = conf.getInt("spark.history.retainedApplications", 50) @@ -233,7 +234,7 @@ object HistoryServer extends Logging { val UI_PATH_PREFIX = "/history" - def main(argStrings: Array[String]) { + def main(argStrings: Array[String]): Unit = { Utils.initDaemon(log) new HistoryServerArguments(conf, argStrings) initSecurity() diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index 0deab8ddd5270b0b0e2676e4aa78bdf28aa85954..202a1b787c21ba80b8a9ef259232b4d4c2106610 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -383,7 +383,7 @@ private[deploy] class Master( override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = { case RegisterWorker( - id, workerHost, workerPort, workerRef, cores, memory, workerUiPort, publicAddress) => { + id, workerHost, workerPort, workerRef, cores, memory, workerWebUiUrl) => { logInfo("Registering worker %s:%d with %d cores, %s RAM".format( 
workerHost, workerPort, cores, Utils.megabytesToString(memory))) if (state == RecoveryState.STANDBY) { @@ -392,7 +392,7 @@ private[deploy] class Master( context.reply(RegisterWorkerFailed("Duplicate worker ID")) } else { val worker = new WorkerInfo(id, workerHost, workerPort, cores, memory, - workerRef, workerUiPort, publicAddress) + workerRef, workerWebUiUrl) if (registerWorker(worker)) { persistenceEngine.addWorker(worker) context.reply(RegisteredWorker(self, masterWebUiUrl)) diff --git a/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala b/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala index f7519666052060064be9391d6f75a0e12026e998..4e20c10fd14273f8f50740cd2140d4d5717d301b 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala @@ -29,8 +29,7 @@ private[spark] class WorkerInfo( val cores: Int, val memory: Int, val endpoint: RpcEndpointRef, - val webUiPort: Int, - val publicAddress: String) + val webUiAddress: String) extends Serializable { Utils.checkHost(host, "Expected hostname") @@ -98,10 +97,6 @@ private[spark] class WorkerInfo( coresUsed -= driver.desc.cores } - def webUiAddress : String = { - "http://" + this.publicAddress + ":" + this.webUiPort - } - def setState(state: WorkerState.Value): Unit = { this.state = state } diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala index 750ef0a9625502e4f7382b628849813ca4f0c741..d7543926f38509ec36161ea59277a73f68343a78 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala @@ -32,8 +32,8 @@ class MasterWebUI( val master: Master, requestedPort: Int, customMasterPage: Option[MasterPage] = None) - extends WebUI(master.securityMgr, requestedPort, master.conf, name = "MasterUI") with Logging - with UIRoot { + extends WebUI(master.securityMgr, master.securityMgr.getSSLOptions("standalone"), + requestedPort, master.conf, name = "MasterUI") with Logging with UIRoot { val masterEndpointRef = master.self val killEnabled = master.conf.getBoolean("spark.ui.killEnabled", true) diff --git a/core/src/main/scala/org/apache/spark/deploy/mesos/ui/MesosClusterUI.scala b/core/src/main/scala/org/apache/spark/deploy/mesos/ui/MesosClusterUI.scala index da9740bb41f59a4f6c71de8db1cddbfc6fa8d69f..baad098a0cd1fedce8b1790ddac7374e34eb6d08 100644 --- a/core/src/main/scala/org/apache/spark/deploy/mesos/ui/MesosClusterUI.scala +++ b/core/src/main/scala/org/apache/spark/deploy/mesos/ui/MesosClusterUI.scala @@ -31,7 +31,7 @@ private[spark] class MesosClusterUI( conf: SparkConf, dispatcherPublicAddress: String, val scheduler: MesosClusterScheduler) - extends WebUI(securityManager, port, conf) { + extends WebUI(securityManager, securityManager.getSSLOptions("mesos"), port, conf) { initialize() diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala index 98e17da4897410807ac9a275b2bf716a113d9964..179d3b9f20b1f7cc0a68680981f38b1ef6bf7048 100755 --- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala @@ -100,6 +100,7 @@ private[deploy] class Worker( private var master: Option[RpcEndpointRef] = None private var activeMasterUrl: String = "" private[worker] var 
activeMasterWebUiUrl : String = "" + private var workerWebUiUrl: String = "" private val workerUri = RpcEndpointAddress(rpcEnv.address, endpointName).toString private var registered = false private var connected = false @@ -184,6 +185,9 @@ private[deploy] class Worker( shuffleService.startIfEnabled() webUi = new WorkerWebUI(this, workDir, webUiPort) webUi.bind() + + val scheme = if (webUi.sslOptions.enabled) "https" else "http" + workerWebUiUrl = s"$scheme://$publicAddress:${webUi.boundPort}" registerWithMaster() metricsSystem.registerSource(workerSource) @@ -336,7 +340,7 @@ private[deploy] class Worker( private def registerWithMaster(masterEndpoint: RpcEndpointRef): Unit = { masterEndpoint.ask[RegisterWorkerResponse](RegisterWorker( - workerId, host, port, self, cores, memory, webUi.boundPort, publicAddress)) + workerId, host, port, self, cores, memory, workerWebUiUrl)) .onComplete { // This is a very fast action so we can use "ThreadUtils.sameThread" case Success(msg) => diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala index 1a0598e50dcf103897eee6a9b16b47aac8698771..b45b6824949e0f6f867e01579df193bf676c49f0 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala @@ -34,7 +34,8 @@ class WorkerWebUI( val worker: Worker, val workDir: File, requestedPort: Int) - extends WebUI(worker.securityMgr, requestedPort, worker.conf, name = "WorkerUI") + extends WebUI(worker.securityMgr, worker.securityMgr.getSSLOptions("standalone"), + requestedPort, worker.conf, name = "WorkerUI") with Logging { private[ui] val timeout = RpcUtils.askRpcTimeout(worker.conf) diff --git a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala index b796a44fe01acb608a6e869e127bccb2f39a2e40..bc143b7de399cc1e3e72199700c51ec2025b4e43 100644 --- a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala +++ b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala @@ -17,21 +17,24 @@ package org.apache.spark.ui -import java.net.{InetSocketAddress, URL} +import java.net.{URI, URL} import javax.servlet.DispatcherType import javax.servlet.http.{HttpServlet, HttpServletRequest, HttpServletResponse} +import scala.collection.mutable.{ArrayBuffer, StringBuilder} import scala.language.implicitConversions import scala.xml.Node -import org.eclipse.jetty.server.Server +import org.eclipse.jetty.server.{Connector, Request, Server} import org.eclipse.jetty.server.handler._ +import org.eclipse.jetty.server.nio.SelectChannelConnector +import org.eclipse.jetty.server.ssl.SslSelectChannelConnector import org.eclipse.jetty.servlet._ import org.eclipse.jetty.util.thread.QueuedThreadPool import org.json4s.JValue import org.json4s.jackson.JsonMethods.{pretty, render} -import org.apache.spark.{Logging, SecurityManager, SparkConf} +import org.apache.spark.{Logging, SecurityManager, SparkConf, SSLOptions} import org.apache.spark.util.Utils /** @@ -224,23 +227,51 @@ private[spark] object JettyUtils extends Logging { def startJettyServer( hostName: String, port: Int, + sslOptions: SSLOptions, handlers: Seq[ServletContextHandler], conf: SparkConf, serverName: String = ""): ServerInfo = { + val collection = new ContextHandlerCollection addFilters(handlers, conf) - val collection = new ContextHandlerCollection val gzipHandlers = handlers.map { h => val gzipHandler = new GzipHandler 
gzipHandler.setHandler(h) gzipHandler } - collection.setHandlers(gzipHandlers.toArray) // Bind to the given port, or throw a java.net.BindException if the port is occupied def connect(currentPort: Int): (Server, Int) = { - val server = new Server(new InetSocketAddress(hostName, currentPort)) + val server = new Server + val connectors = new ArrayBuffer[Connector] + // Create a connector on port currentPort to listen for HTTP requests + val httpConnector = new SelectChannelConnector() + httpConnector.setPort(currentPort) + connectors += httpConnector + + sslOptions.createJettySslContextFactory().foreach { factory => + // If the new port wraps around, do not try a privileged port. + val securePort = + if (currentPort != 0) { + (currentPort + 400 - 1024) % (65536 - 1024) + 1024 + } else { + 0 + } + val scheme = "https" + // Create a connector on port securePort to listen for HTTPS requests + val connector = new SslSelectChannelConnector(factory) + connector.setPort(securePort) + connectors += connector + + // redirect the HTTP requests to HTTPS port + collection.addHandler(createRedirectHttpsHandler(securePort, scheme)) + } + + gzipHandlers.foreach(collection.addHandler) + connectors.foreach(_.setHost(hostName)) + server.setConnectors(connectors.toArray) + val pool = new QueuedThreadPool pool.setDaemon(true) server.setThreadPool(pool) @@ -262,6 +293,42 @@ private[spark] object JettyUtils extends Logging { val (server, boundPort) = Utils.startServiceOnPort[Server](port, connect, conf, serverName) ServerInfo(server, boundPort, collection) } + + private def createRedirectHttpsHandler(securePort: Int, scheme: String): ContextHandler = { + val redirectHandler: ContextHandler = new ContextHandler + redirectHandler.setContextPath("/") + redirectHandler.setHandler(new AbstractHandler { + override def handle( + target: String, + baseRequest: Request, + request: HttpServletRequest, + response: HttpServletResponse): Unit = { + if (baseRequest.isSecure) { + return + } + val httpsURI = createRedirectURI(scheme, baseRequest.getServerName, securePort, + baseRequest.getRequestURI, baseRequest.getQueryString) + response.setContentLength(0) + response.encodeRedirectURL(httpsURI) + response.sendRedirect(httpsURI) + baseRequest.setHandled(true) + } + }) + redirectHandler + } + + // Create a new URI from the arguments, handling IPv6 host encoding and default ports. 
+ private def createRedirectURI( + scheme: String, server: String, port: Int, path: String, query: String) = { + val redirectServer = if (server.contains(":") && !server.startsWith("[")) { + s"[${server}]" + } else { + server + } + val authority = s"$redirectServer:$port" + new URI(scheme, authority, path, query, null).toString + } + } private[spark] case class ServerInfo( diff --git a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala index e319937702f2361405080d075f785be27b29b946..eb53aa8e23ae7193d3e292ee8c545399a9210047 100644 --- a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala +++ b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala @@ -50,7 +50,8 @@ private[spark] class SparkUI private ( var appName: String, val basePath: String, val startTime: Long) - extends WebUI(securityManager, SparkUI.getUIPort(conf), conf, basePath, "SparkUI") + extends WebUI(securityManager, securityManager.getSSLOptions("ui"), SparkUI.getUIPort(conf), + conf, basePath, "SparkUI") with Logging with UIRoot { diff --git a/core/src/main/scala/org/apache/spark/ui/WebUI.scala b/core/src/main/scala/org/apache/spark/ui/WebUI.scala index 3925235984723f0c82f5fa825300278f16bbeb2c..fe4949b9f6feec98671ca1180ca2e77c7c5ae4e2 100644 --- a/core/src/main/scala/org/apache/spark/ui/WebUI.scala +++ b/core/src/main/scala/org/apache/spark/ui/WebUI.scala @@ -26,7 +26,7 @@ import scala.xml.Node import org.eclipse.jetty.servlet.ServletContextHandler import org.json4s.JsonAST.{JNothing, JValue} -import org.apache.spark.{Logging, SecurityManager, SparkConf} +import org.apache.spark.{Logging, SecurityManager, SparkConf, SSLOptions} import org.apache.spark.ui.JettyUtils._ import org.apache.spark.util.Utils @@ -38,6 +38,7 @@ import org.apache.spark.util.Utils */ private[spark] abstract class WebUI( val securityManager: SecurityManager, + val sslOptions: SSLOptions, port: Int, conf: SparkConf, basePath: String = "", @@ -133,7 +134,7 @@ private[spark] abstract class WebUI( def bind() { assert(!serverInfo.isDefined, "Attempted to bind %s more than once!".format(className)) try { - serverInfo = Some(startJettyServer("0.0.0.0", port, handlers, conf, name)) + serverInfo = Some(startJettyServer("0.0.0.0", port, sslOptions, handlers, conf, name)) logInfo("Started %s at http://%s:%d".format(className, publicHostName, boundPort)) } catch { case e: Exception => diff --git a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala index f2d93edd4fd2e8db8d2d0c4a60cf8dcb8cc06750..3f4ac9b2f18cdc445f960196f27b305557462d32 100644 --- a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala +++ b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala @@ -86,7 +86,7 @@ private[spark] object AkkaUtils extends Logging { val secureCookie = if (isAuthOn) secretKey else "" logDebug(s"In createActorSystem, requireCookie is: $requireCookie") - val akkaSslConfig = securityManager.akkaSSLOptions.createAkkaConfig + val akkaSslConfig = securityManager.getSSLOptions("akka").createAkkaConfig .getOrElse(ConfigFactory.empty()) val akkaConf = ConfigFactory.parseMap(conf.getAkkaConf.toMap.asJava) diff --git a/core/src/test/resources/spark.keystore b/core/src/test/resources/spark.keystore new file mode 100644 index 0000000000000000000000000000000000000000..f30716b57b30298e78e85011d7595bb42d6a85d7 Binary files /dev/null and b/core/src/test/resources/spark.keystore differ diff --git 
a/core/src/test/scala/org/apache/spark/SecurityManagerSuite.scala b/core/src/test/scala/org/apache/spark/SecurityManagerSuite.scala index e0226803bb1cfe4cdec14f7aa96b2d8925530a6a..7603cef7736912c00f9357f75db02db82d51f498 100644 --- a/core/src/test/scala/org/apache/spark/SecurityManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/SecurityManagerSuite.scala @@ -181,9 +181,10 @@ class SecurityManagerSuite extends SparkFunSuite with ResetSystemProperties { "SSL_DHE_RSA_WITH_AES_128_CBC_SHA256") val securityManager = new SecurityManager(conf) + val akkaSSLOptions = securityManager.getSSLOptions("akka") assert(securityManager.fileServerSSLOptions.enabled === true) - assert(securityManager.akkaSSLOptions.enabled === true) + assert(akkaSSLOptions.enabled === true) assert(securityManager.sslSocketFactory.isDefined === true) assert(securityManager.hostnameVerifier.isDefined === true) @@ -198,15 +199,15 @@ class SecurityManagerSuite extends SparkFunSuite with ResetSystemProperties { assert(securityManager.fileServerSSLOptions.protocol === Some("TLSv1.2")) assert(securityManager.fileServerSSLOptions.enabledAlgorithms === expectedAlgorithms) - assert(securityManager.akkaSSLOptions.trustStore.isDefined === true) - assert(securityManager.akkaSSLOptions.trustStore.get.getName === "truststore") - assert(securityManager.akkaSSLOptions.keyStore.isDefined === true) - assert(securityManager.akkaSSLOptions.keyStore.get.getName === "keystore") - assert(securityManager.akkaSSLOptions.trustStorePassword === Some("password")) - assert(securityManager.akkaSSLOptions.keyStorePassword === Some("password")) - assert(securityManager.akkaSSLOptions.keyPassword === Some("password")) - assert(securityManager.akkaSSLOptions.protocol === Some("TLSv1.2")) - assert(securityManager.akkaSSLOptions.enabledAlgorithms === expectedAlgorithms) + assert(akkaSSLOptions.trustStore.isDefined === true) + assert(akkaSSLOptions.trustStore.get.getName === "truststore") + assert(akkaSSLOptions.keyStore.isDefined === true) + assert(akkaSSLOptions.keyStore.get.getName === "keystore") + assert(akkaSSLOptions.trustStorePassword === Some("password")) + assert(akkaSSLOptions.keyStorePassword === Some("password")) + assert(akkaSSLOptions.keyPassword === Some("password")) + assert(akkaSSLOptions.protocol === Some("TLSv1.2")) + assert(akkaSSLOptions.enabledAlgorithms === expectedAlgorithms) } test("ssl off setup") { @@ -218,7 +219,6 @@ class SecurityManagerSuite extends SparkFunSuite with ResetSystemProperties { val securityManager = new SecurityManager(conf) assert(securityManager.fileServerSSLOptions.enabled === false) - assert(securityManager.akkaSSLOptions.enabled === false) assert(securityManager.sslSocketFactory.isDefined === false) assert(securityManager.hostnameVerifier.isDefined === false) } diff --git a/core/src/test/scala/org/apache/spark/deploy/DeployTestUtils.scala b/core/src/test/scala/org/apache/spark/deploy/DeployTestUtils.scala index 86455a13d0fe7b9a0fdf5bef77312225e901ae39..190e4dd7285b3a913b50111d62e2e74ddfc5aff8 100644 --- a/core/src/test/scala/org/apache/spark/deploy/DeployTestUtils.scala +++ b/core/src/test/scala/org/apache/spark/deploy/DeployTestUtils.scala @@ -50,7 +50,7 @@ private[deploy] object DeployTestUtils { createDriverDesc(), new Date()) def createWorkerInfo(): WorkerInfo = { - val workerInfo = new WorkerInfo("id", "host", 8080, 4, 1234, null, 80, "publicAddress") + val workerInfo = new WorkerInfo("id", "host", 8080, 4, 1234, null, "http://publicAddress:80") workerInfo.lastHeartbeat = 
JsonConstants.currTimeInMillis workerInfo } diff --git a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala index 10e33a32ba4c3af2d2255f7210ae96f57709898d..ce00807ea46b906f1ee227297ae03d5c057b9013 100644 --- a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala @@ -90,8 +90,7 @@ class MasterSuite extends SparkFunSuite with Matchers with Eventually with Priva cores = 0, memory = 0, endpoint = null, - webUiPort = 0, - publicAddress = "" + webUiAddress = "http://localhost:80" ) val (rpcEnv, _, _) = @@ -376,7 +375,7 @@ class MasterSuite extends SparkFunSuite with Matchers with Eventually with Priva private def makeWorkerInfo(memoryMb: Int, cores: Int): WorkerInfo = { val workerId = System.currentTimeMillis.toString - new WorkerInfo(workerId, "host", 100, cores, memoryMb, null, 101, "address") + new WorkerInfo(workerId, "host", 100, cores, memoryMb, null, "http://localhost:80") } private def scheduleExecutorsOnWorkers( diff --git a/core/src/test/scala/org/apache/spark/deploy/master/PersistenceEngineSuite.scala b/core/src/test/scala/org/apache/spark/deploy/master/PersistenceEngineSuite.scala index b4deed7f877e840fe20768e8c9c010b168711bd2..62fe0eaedfd27bccdfdfa8d445bd10255d57964c 100644 --- a/core/src/test/scala/org/apache/spark/deploy/master/PersistenceEngineSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/master/PersistenceEngineSuite.scala @@ -88,9 +88,7 @@ class PersistenceEngineSuite extends SparkFunSuite { cores = 0, memory = 0, endpoint = workerEndpoint, - webUiPort = 0, - publicAddress = "" - ) + webUiAddress = "http://localhost:80") persistenceEngine.addWorker(workerToPersist) @@ -109,8 +107,7 @@ class PersistenceEngineSuite extends SparkFunSuite { assert(workerToPersist.cores === recoveryWorkerInfo.cores) assert(workerToPersist.memory === recoveryWorkerInfo.memory) assert(workerToPersist.endpoint === recoveryWorkerInfo.endpoint) - assert(workerToPersist.webUiPort === recoveryWorkerInfo.webUiPort) - assert(workerToPersist.publicAddress === recoveryWorkerInfo.publicAddress) + assert(workerToPersist.webUiAddress === recoveryWorkerInfo.webUiAddress) } finally { testRpcEnv.shutdown() testRpcEnv.awaitTermination() diff --git a/core/src/test/scala/org/apache/spark/ui/UISuite.scala b/core/src/test/scala/org/apache/spark/ui/UISuite.scala index 2d28b67ef23fa77628e3574120ff7bf3e2c9c326..69c46058f1c1ae8b4d8f27653aaa6c67fc7cbf3c 100644 --- a/core/src/test/scala/org/apache/spark/ui/UISuite.scala +++ b/core/src/test/scala/org/apache/spark/ui/UISuite.scala @@ -17,16 +17,16 @@ package org.apache.spark.ui -import java.net.ServerSocket +import java.net.{BindException, ServerSocket} import scala.io.Source -import scala.util.{Failure, Success, Try} +import org.eclipse.jetty.server.Server import org.eclipse.jetty.servlet.ServletContextHandler import org.scalatest.concurrent.Eventually._ import org.scalatest.time.SpanSugar._ -import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite} +import org.apache.spark._ import org.apache.spark.LocalSparkContext._ class UISuite extends SparkFunSuite { @@ -45,6 +45,20 @@ class UISuite extends SparkFunSuite { sc } + private def sslDisabledConf(): (SparkConf, SSLOptions) = { + val conf = new SparkConf + (conf, new SecurityManager(conf).getSSLOptions("ui")) + } + + private def sslEnabledConf(): (SparkConf, SSLOptions) = { + val conf = new SparkConf() + 
.set("spark.ssl.ui.enabled", "true") + .set("spark.ssl.ui.keyStore", "./src/test/resources/spark.keystore") + .set("spark.ssl.ui.keyStorePassword", "123456") + .set("spark.ssl.ui.keyPassword", "123456") + (conf, new SecurityManager(conf).getSSLOptions("ui")) + } + ignore("basic ui visibility") { withSpark(newSparkContext()) { sc => // test if the ui is visible, and all the expected tabs are visible @@ -70,33 +84,92 @@ class UISuite extends SparkFunSuite { } test("jetty selects different port under contention") { - val server = new ServerSocket(0) - val startPort = server.getLocalPort - val serverInfo1 = JettyUtils.startJettyServer( - "0.0.0.0", startPort, Seq[ServletContextHandler](), new SparkConf) - val serverInfo2 = JettyUtils.startJettyServer( - "0.0.0.0", startPort, Seq[ServletContextHandler](), new SparkConf) - // Allow some wiggle room in case ports on the machine are under contention - val boundPort1 = serverInfo1.boundPort - val boundPort2 = serverInfo2.boundPort - assert(boundPort1 != startPort) - assert(boundPort2 != startPort) - assert(boundPort1 != boundPort2) - serverInfo1.server.stop() - serverInfo2.server.stop() - server.close() + var server: ServerSocket = null + var serverInfo1: ServerInfo = null + var serverInfo2: ServerInfo = null + val (conf, sslOptions) = sslDisabledConf() + try { + server = new ServerSocket(0) + val startPort = server.getLocalPort + serverInfo1 = JettyUtils.startJettyServer( + "0.0.0.0", startPort, sslOptions, Seq[ServletContextHandler](), conf) + serverInfo2 = JettyUtils.startJettyServer( + "0.0.0.0", startPort, sslOptions, Seq[ServletContextHandler](), conf) + // Allow some wiggle room in case ports on the machine are under contention + val boundPort1 = serverInfo1.boundPort + val boundPort2 = serverInfo2.boundPort + assert(boundPort1 != startPort) + assert(boundPort2 != startPort) + assert(boundPort1 != boundPort2) + } finally { + stopServer(serverInfo1) + stopServer(serverInfo2) + closeSocket(server) + } + } + + test("jetty with https selects different port under contention") { + var server: ServerSocket = null + var serverInfo1: ServerInfo = null + var serverInfo2: ServerInfo = null + try { + server = new ServerSocket(0) + val startPort = server.getLocalPort + val (conf, sslOptions) = sslEnabledConf() + serverInfo1 = JettyUtils.startJettyServer( + "0.0.0.0", startPort, sslOptions, Seq[ServletContextHandler](), conf, "server1") + serverInfo2 = JettyUtils.startJettyServer( + "0.0.0.0", startPort, sslOptions, Seq[ServletContextHandler](), conf, "server2") + // Allow some wiggle room in case ports on the machine are under contention + val boundPort1 = serverInfo1.boundPort + val boundPort2 = serverInfo2.boundPort + assert(boundPort1 != startPort) + assert(boundPort2 != startPort) + assert(boundPort1 != boundPort2) + } finally { + stopServer(serverInfo1) + stopServer(serverInfo2) + closeSocket(server) + } } test("jetty binds to port 0 correctly") { - val serverInfo = JettyUtils.startJettyServer( - "0.0.0.0", 0, Seq[ServletContextHandler](), new SparkConf) - val server = serverInfo.server - val boundPort = serverInfo.boundPort - assert(server.getState === "STARTED") - assert(boundPort != 0) - Try { new ServerSocket(boundPort) } match { - case Success(s) => fail("Port %s doesn't seem used by jetty server".format(boundPort)) - case Failure(e) => + var socket: ServerSocket = null + var serverInfo: ServerInfo = null + val (conf, sslOptions) = sslDisabledConf() + try { + serverInfo = JettyUtils.startJettyServer( + "0.0.0.0", 0, sslOptions, 
Seq[ServletContextHandler](), conf) + val server = serverInfo.server + val boundPort = serverInfo.boundPort + assert(server.getState === "STARTED") + assert(boundPort != 0) + intercept[BindException] { + socket = new ServerSocket(boundPort) + } + } finally { + stopServer(serverInfo) + closeSocket(socket) + } + } + + test("jetty with https binds to port 0 correctly") { + var socket: ServerSocket = null + var serverInfo: ServerInfo = null + try { + val (conf, sslOptions) = sslEnabledConf() + serverInfo = JettyUtils.startJettyServer( + "0.0.0.0", 0, sslOptions, Seq[ServletContextHandler](), conf) + val server = serverInfo.server + val boundPort = serverInfo.boundPort + assert(server.getState === "STARTED") + assert(boundPort != 0) + intercept[BindException] { + socket = new ServerSocket(boundPort) + } + } finally { + stopServer(serverInfo) + closeSocket(socket) } } @@ -117,4 +190,12 @@ class UISuite extends SparkFunSuite { assert(splitUIAddress(2).toInt == boundPort) } } + + def stopServer(info: ServerInfo): Unit = { + if (info != null && info.server != null) info.server.stop + } + + def closeSocket(socket: ServerSocket): Unit = { + if (socket != null) socket.close + } } diff --git a/docs/configuration.md b/docs/configuration.md index 08392c39187b919c7d814666767e965d42c5e244..12ac60129633dbb0cea7bbe4ba99e6621451e9f1 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -1430,6 +1430,7 @@ Apart from these, the following properties are also available, and may be useful The reference list of protocols one can find on <a href="https://blogs.oracle.com/java-platform-group/entry/diagnosing_tls_ssl_and_https">this</a> page. + Note: If not set, it will use the default cipher suites of JVM. </td> </tr> <tr> @@ -1454,6 +1455,13 @@ Apart from these, the following properties are also available, and may be useful A password to the key-store. </td> </tr> + <tr> + <td><code>spark.ssl.keyStoreType</code></td> + <td>JKS</td> + <td> + The type of the key-store. + </td> + </tr> <tr> <td><code>spark.ssl.protocol</code></td> <td>None</td> @@ -1463,6 +1471,13 @@ Apart from these, the following properties are also available, and may be useful page. </td> </tr> + <tr> + <td><code>spark.ssl.needClientAuth</code></td> + <td>false</td> + <td> + Set true if SSL needs client authentication. + </td> + </tr> <tr> <td><code>spark.ssl.trustStore</code></td> <td>None</td> @@ -1478,6 +1493,13 @@ Apart from these, the following properties are also available, and may be useful A password to the trust-store. </td> </tr> + <tr> + <td><code>spark.ssl.trustStoreType</code></td> + <td>JKS</td> + <td> + The type of the trust-store. + </td> + </tr> </table> diff --git a/docs/security.md b/docs/security.md index 1b7741d4dd93c16330261d815c64d11bfeda057e..a4cc0f42b24826cad5b2e2887d8238c778ef8683 100644 --- a/docs/security.md +++ b/docs/security.md @@ -6,15 +6,19 @@ title: Security Spark currently supports authentication via a shared secret. Authentication can be configured to be on via the `spark.authenticate` configuration parameter. This parameter controls whether the Spark communication protocols do authentication using the shared secret. This authentication is a basic handshake to make sure both sides have the same shared secret and are allowed to communicate. If the shared secret is not identical they will not be allowed to communicate. 
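As a concrete illustration of the non-YARN case described in the list that follows, a minimal sketch (not part of this patch; the application name and secret value are placeholders) might enable the shared-secret handshake through `SparkConf`:

```scala
import org.apache.spark.SparkConf

// Hypothetical illustration: enable the shared-secret handshake and supply the
// secret explicitly, as required outside YARN. The secret is a placeholder and
// must be identical on every node of the deployment.
val authConf = new SparkConf()
  .setAppName("authenticated-app")                        // placeholder name
  .set("spark.authenticate", "true")
  .set("spark.authenticate.secret", "not-a-real-secret")  // placeholder secret
```

The same two properties can also be set in `spark-defaults.conf` on each node; the `SparkConf` form is used here only to keep the sketch self-contained.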
The shared secret is created as follows:
-* For Spark on [YARN](running-on-yarn.html) deployments, configuring `spark.authenticate` to `true` will automatically handle generating and distributing the shared secret. Each application will use a unique shared secret.
+* For Spark on [YARN](running-on-yarn.html) deployments, configuring `spark.authenticate` to `true` will automatically handle generating and distributing the shared secret. Each application will use a unique shared secret.
* For other types of Spark deployments, the Spark parameter `spark.authenticate.secret` should be configured on each of the nodes. This secret will be used by all the Master/Workers and applications.

## Web UI

-The Spark UI can also be secured by using [javax servlet filters](http://docs.oracle.com/javaee/6/api/javax/servlet/Filter.html) via the `spark.ui.filters` setting. A user may want to secure the UI if it has data that other users should not be allowed to see. The javax servlet filter specified by the user can authenticate the user and then once the user is logged in, Spark can compare that user versus the view ACLs to make sure they are authorized to view the UI. The configs `spark.acls.enable` and `spark.ui.view.acls` control the behavior of the ACLs. Note that the user who started the application always has view access to the UI. On YARN, the Spark UI uses the standard YARN web application proxy mechanism and will authenticate via any installed Hadoop filters.
+The Spark UI can be secured by using [javax servlet filters](http://docs.oracle.com/javaee/6/api/javax/servlet/Filter.html) via the `spark.ui.filters` setting
+and by using [https/SSL](http://en.wikipedia.org/wiki/HTTPS) via the `spark.ssl.ui.enabled` setting and the other `spark.ssl.ui.*` options.

-Spark also supports modify ACLs to control who has access to modify a running Spark application. This includes things like killing the application or a task. This is controlled by the configs `spark.acls.enable` and `spark.modify.acls`. Note that if you are authenticating the web UI, in order to use the kill button on the web UI it might be necessary to add the users in the modify acls to the view acls also. On YARN, the modify acls are passed in and control who has modify access via YARN interfaces.
+### Authentication
+A user may want to secure the UI if it has data that other users should not be allowed to see. The javax servlet filter specified by the user can authenticate the user and then once the user is logged in, Spark can compare that user versus the view ACLs to make sure they are authorized to view the UI. The configs `spark.acls.enable` and `spark.ui.view.acls` control the behavior of the ACLs. Note that the user who started the application always has view access to the UI. On YARN, the Spark UI uses the standard YARN web application proxy mechanism and will authenticate via any installed Hadoop filters.
+
+Spark also supports modify ACLs to control who has access to modify a running Spark application. This includes things like killing the application or a task. This is controlled by the configs `spark.acls.enable` and `spark.modify.acls`. Note that if you are authenticating the web UI, in order to use the kill button on the web UI it might be necessary to add the users in the modify acls to the view acls also. On YARN, the modify acls are passed in and control who has modify access via YARN interfaces.
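A hedged sketch of the ACL settings named above (the user names are placeholders, not recommendations):

```scala
import org.apache.spark.SparkConf

// Hypothetical illustration of the view/modify ACLs discussed in this section.
// "alice", "bob" and "ops" are placeholder user names.
val aclConf = new SparkConf()
  .set("spark.acls.enable", "true")
  .set("spark.ui.view.acls", "alice,bob")  // users allowed to view the UI
  .set("spark.modify.acls", "alice")       // users allowed to modify (e.g. kill) the app
  .set("spark.admin.acls", "ops")          // administrators, described in the next paragraph
```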
Spark allows for a set of administrators to be specified in the acls who always have view and modify permissions to all the applications. This is controlled by the config `spark.admin.acls`. This is useful on a shared cluster where you might have administrators or support staff who help users debug applications.

## Event Logging

@@ -23,8 +27,8 @@ If your applications are using event logging, the directory where the event logs

## Encryption

-Spark supports SSL for Akka and HTTP (for file server) protocols. SASL encryption is
-supported for the block transfer service. Encryption is not yet supported for the WebUI.
+Spark supports SSL for Akka and HTTP protocols. SASL encryption is supported for the block transfer
+service.

Encryption is not yet supported for data stored by Spark in temporary local storage, such as shuffle
files, cached data, and other application files. If encrypting this data is desired, a workaround is
@@ -32,8 +36,41 @@ to configure your cluster manager to store application data on encrypted disks.

### SSL Configuration

-Configuration for SSL is organized hierarchically. The user can configure the default SSL settings which will be used for all the supported communication protocols unless they are overwritten by protocol-specific settings. This way the user can easily provide the common settings for all the protocols without disabling the ability to configure each one individually. The common SSL settings are at `spark.ssl` namespace in Spark configuration, while Akka SSL configuration is at `spark.ssl.akka` and HTTP for file server SSL configuration is at `spark.ssl.fs`. The full breakdown can be found on the [configuration page](configuration.html).
+Configuration for SSL is organized hierarchically. The user can configure the default SSL settings
+which will be used for all the supported communication protocols unless they are overwritten by
+protocol-specific settings. This way the user can easily provide the common settings for all the
+protocols without disabling the ability to configure each one individually. The common SSL settings
+are in the `spark.ssl` namespace in Spark configuration. The following table describes the
+component-specific configuration namespaces used to override the default settings:
+
+<table class="table">
+  <tr>
+    <th>Config Namespace</th>
+    <th>Component</th>
+  </tr>
+  <tr>
+    <td><code>spark.ssl.akka</code></td>
+    <td>Akka communication channels</td>
+  </tr>
+  <tr>
+    <td><code>spark.ssl.fs</code></td>
+    <td>HTTP file server and broadcast server</td>
+  </tr>
+  <tr>
+    <td><code>spark.ssl.ui</code></td>
+    <td>Spark application Web UI</td>
+  </tr>
+  <tr>
+    <td><code>spark.ssl.standalone</code></td>
+    <td>Standalone Master / Worker Web UI</td>
+  </tr>
+  <tr>
+    <td><code>spark.ssl.historyServer</code></td>
+    <td>History Server Web UI</td>
+  </tr>
+</table>
+
+The full breakdown of available SSL options can be found on the [configuration page](configuration.html). SSL must be configured on each node and for each component involved in communication using the particular protocol, as in the sketch below.

### YARN mode
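To make the hierarchy described in the SSL Configuration section above concrete, here is a hedged sketch of a default `spark.ssl.*` block plus a web-UI-specific override; the key-store path, passwords, and cipher names are placeholders that would have to match an actual deployment:

```scala
import org.apache.spark.SparkConf

// Hypothetical illustration of hierarchical SSL settings. Everything under
// spark.ssl.* applies to all components; spark.ssl.ui.* overrides it for the
// Spark application web UI only. Paths and passwords are placeholders.
val sslConf = new SparkConf()
  .set("spark.ssl.enabled", "true")
  .set("spark.ssl.keyStore", "/path/to/keystore")  // placeholder path
  .set("spark.ssl.keyStorePassword", "change-me")  // placeholder password
  .set("spark.ssl.keyPassword", "change-me")       // placeholder password
  .set("spark.ssl.protocol", "TLSv1.2")
  // Override only the cipher list for the web UI; unset fields fall back to spark.ssl.*
  .set("spark.ssl.ui.enabledAlgorithms",
    "TLS_RSA_WITH_AES_128_CBC_SHA256,TLS_RSA_WITH_AES_256_CBC_SHA256")

// Inside Spark, the merged per-component options are resolved with the
// SecurityManager.getSSLOptions helper added by this patch, e.g.
// new SecurityManager(sslConf).getSSLOptions("ui") for the application UI.
```

The same properties can equally be placed in `spark-defaults.conf`; the `SparkConf` form is used here only so the sketch compiles on its own.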