Skip to content
Snippets Groups Projects
Commit 90cbc82f authored by jerryshao's avatar jerryshao Committed by Reynold Xin
Browse files

[SPARK-14725][CORE] Remove HttpServer class

## What changes were proposed in this pull request?

This pull request removes the `HttpServer` class. Now that internal file/jar/class transmission has moved to the RPC layer, no code uses `HttpServer` any more, so this change proposes removing it.

## How was this patch tested?

Unit tests were run and verified locally.

Author: jerryshao <sshao@hortonworks.com>

Closes #12526 from jerryshao/SPARK-14725.
parent b4e76a9a
No related branches found
No related tags found
No related merge requests found
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark
import java.io.File
import org.eclipse.jetty.security.{ConstraintMapping, ConstraintSecurityHandler, HashLoginService}
import org.eclipse.jetty.security.authentication.DigestAuthenticator
import org.eclipse.jetty.server.Server
import org.eclipse.jetty.server.bio.SocketConnector
import org.eclipse.jetty.server.ssl.SslSocketConnector
import org.eclipse.jetty.servlet.{DefaultServlet, ServletContextHandler, ServletHolder}
import org.eclipse.jetty.util.component.LifeCycle
import org.eclipse.jetty.util.security.{Constraint, Password}
import org.eclipse.jetty.util.thread.QueuedThreadPool
import org.apache.spark.internal.Logging
import org.apache.spark.util.Utils
/**
* Exception type thrown by HttpServer when it is in the wrong state for an operation,
* e.g. starting a server that is already started, or stopping or querying the URI of a
* server that is not running.
*
* @param message description of the invalid state transition
*/
private[spark] class ServerStateException(message: String) extends Exception(message)
/**
 * An HTTP server for static content used to allow worker nodes to access JARs added to
 * SparkContext as well as classes created by the interpreter when the user types in code.
 * This is just a wrapper around a Jetty server.
 *
 * @param conf Spark configuration, forwarded to `Utils.startServiceOnPort`
 * @param resourceBase directory whose contents are served under the root context path
 * @param securityManager supplies the file server SSL options and, when authentication is
 *                        enabled, the user name and shared secret for digest authentication
 * @param requestedPort port to try binding to first
 * @param serverName service name forwarded to `Utils.startServiceOnPort`
 */
private[spark] class HttpServer(
    conf: SparkConf,
    resourceBase: File,
    securityManager: SecurityManager,
    requestedPort: Int = 0,
    serverName: String = "HTTP server")
  extends Logging {

  // The running Jetty server, or null while this HttpServer is stopped.
  private var server: Server = null

  // The port actually bound once started; reset to -1 by stop().
  private var port: Int = requestedPort

  // Single servlet context (rooted at "/") that every served directory is registered on.
  private val servlets = {
    val handler = new ServletContextHandler()
    handler.setContextPath("/")
    handler
  }

  /**
   * Start the server and record the port it was actually bound to.
   *
   * @throws ServerStateException if the server is already started
   */
  def start(): Unit = {
    if (server != null) {
      throw new ServerStateException("Server is already started")
    }
    logInfo("Starting HTTP Server")
    val (actualServer, actualPort) =
      Utils.startServiceOnPort[Server](requestedPort, doStart, conf, serverName)
    server = actualServer
    port = actualPort
  }

  /**
   * Serve the files under `resourceBase` at `contextPath`.
   *
   * @param contextPath URL path prefix; a trailing "/" is ignored
   * @param resourceBase file system location handed to Jetty's DefaultServlet
   */
  def addDirectory(contextPath: String, resourceBase: String): Unit = {
    val holder = new ServletHolder()
    holder.setInitParameter("resourceBase", resourceBase)
    holder.setInitParameter("pathInfoOnly", "true")
    holder.setServlet(new DefaultServlet())
    servlets.addServlet(holder, contextPath.stripSuffix("/") + "/*")
  }

  /**
   * Actually start the HTTP server on the given port.
   *
   * Note that this is only best effort in the sense that we may end up binding to a nearby port
   * in the event of port collision. Return the bound server and the actual port used.
   *
   * If Jetty fails to start, the half-initialized server and its thread pool are stopped before
   * the original exception is rethrown, so a failed attempt does not leak them.
   */
  private def doStart(startPort: Int): (Server, Int) = {
    val server = new Server()

    val connector = securityManager.fileServerSSLOptions.createJettySslContextFactory()
      .map(new SslSocketConnector(_)).getOrElse(new SocketConnector)
    connector.setMaxIdleTime(60 * 1000)
    connector.setSoLingerTime(-1)
    connector.setPort(startPort)
    server.addConnector(connector)

    val threadPool = new QueuedThreadPool
    threadPool.setDaemon(true)
    server.setThreadPool(threadPool)

    // NOTE(review): if startServiceOnPort invokes doStart more than once, the root directory
    // servlet is registered again on the shared handler each time; confirm this is harmless.
    addDirectory("/", resourceBase.getAbsolutePath)

    if (securityManager.isAuthenticationEnabled()) {
      logDebug("HttpServer is using security")
      val sh = setupSecurityHandler(securityManager)
      // make sure we go through security handler to get resources
      sh.setHandler(servlets)
      server.setHandler(sh)
    } else {
      logDebug("HttpServer is not using security")
      server.setHandler(servlets)
    }

    try {
      server.start()
    } catch {
      case e: Exception =>
        // Release whatever was started; the original failure is what the caller must see.
        stopQuietly(server)
        stopQuietly(threadPool)
        throw e
    }
    val actualPort = server.getConnectors()(0).getLocalPort
    (server, actualPort)
  }

  /** Stop a Jetty component, logging (rather than propagating) any non-fatal failure. */
  private def stopQuietly(component: LifeCycle): Unit = {
    try {
      component.stop()
    } catch {
      case scala.util.control.NonFatal(e) =>
        logWarning(s"Failed to stop $component while cleaning up", e)
    }
  }

  /**
   * Setup Jetty to the HashLoginService using a single user with our
   * shared secret. Configure it to use DIGEST-MD5 authentication so that the password
   * isn't passed in plaintext.
   *
   * @throws Exception if authentication is enabled but no secret key is available
   */
  private def setupSecurityHandler(securityMgr: SecurityManager): ConstraintSecurityHandler = {
    val constraint = new Constraint()
    // use DIGEST-MD5 as the authentication mechanism
    constraint.setName(Constraint.__DIGEST_AUTH)
    constraint.setRoles(Array("user"))
    constraint.setAuthenticate(true)
    constraint.setDataConstraint(Constraint.DC_NONE)

    val cm = new ConstraintMapping()
    cm.setConstraint(constraint)
    cm.setPathSpec("/*")

    val sh = new ConstraintSecurityHandler()

    // the hashLoginService lets us do a single user and
    // secret right now. This could be changed to use the
    // JAASLoginService for other options.
    val hashLogin = new HashLoginService()

    // Validate the secret itself: a freshly constructed Password is never null, so checking
    // the wrapper (as was done previously) could not detect a missing key.
    val secret = securityMgr.getSecretKey()
    if (secret == null) {
      throw new Exception("Error: secret key is null with authentication on")
    }
    val userCred = new Password(secret)

    hashLogin.putUser(securityMgr.getHttpUser(), userCred, Array("user"))
    sh.setLoginService(hashLogin)
    sh.setAuthenticator(new DigestAuthenticator())
    sh.setConstraintMappings(Array(cm))
    sh
  }

  /**
   * Stop the server and its thread pool.
   *
   * The thread pool is stopped and the internal state is reset even when stopping the server
   * itself throws; the exception still propagates to the caller.
   *
   * @throws ServerStateException if the server is not running
   */
  def stop(): Unit = {
    if (server == null) {
      throw new ServerStateException("Server is already stopped")
    }
    val stopping = server
    try {
      stopping.stop()
    } finally {
      try {
        // Stop the ThreadPool if it supports stop() method (through LifeCycle).
        // It is needed because stopping the Server won't stop the ThreadPool it uses.
        stopping.getThreadPool match {
          case lifeCycle: LifeCycle => lifeCycle.stop()
          case _ => // null or not a LifeCycle: nothing to stop
        }
      } finally {
        port = -1
        server = null
      }
    }
  }

  /**
   * Get the URI of this HTTP server (http://host:port or https://host:port)
   *
   * @throws ServerStateException if the server has not been started
   */
  def uri: String = {
    if (server == null) {
      throw new ServerStateException("Server is not started")
    } else {
      val scheme = if (securityManager.fileServerSSLOptions.enabled) "https" else "http"
      s"$scheme://${Utils.localHostNameForURI()}:$port"
    }
  }
}
......@@ -57,7 +57,6 @@ class ExecutorClassLoaderSuite
var tempDir2: File = _
var url1: String = _
var urls2: Array[URL] = _
var classServer: HttpServer = _
override def beforeAll() {
super.beforeAll()
......@@ -74,9 +73,6 @@ class ExecutorClassLoaderSuite
override def afterAll() {
try {
if (classServer != null) {
classServer.stop()
}
Utils.deleteRecursively(tempDir1)
Utils.deleteRecursively(tempDir2)
SparkEnv.set(null)
......@@ -137,55 +133,6 @@ class ExecutorClassLoaderSuite
assert(fileReader.readLine().contains("resource"), "File doesn't contain 'resource'")
}
test("failing to fetch classes from HTTP server should not leak resources (SPARK-6209)") {
// This is a regression test for SPARK-6209, a bug where each failed attempt to load a class
// from the driver's class server would leak a HTTP connection, causing the class server's
// thread / connection pool to be exhausted.
val conf = new SparkConf()
val securityManager = new SecurityManager(conf)
// Serve the contents of tempDir1 over HTTP; the suite-level classServer field lets afterAll
// stop the server if this test does not.
classServer = new HttpServer(conf, tempDir1, securityManager)
classServer.start()
// ExecutorClassLoader uses SparkEnv's SecurityManager, so we need to mock this
val mockEnv = mock[SparkEnv]
when(mockEnv.securityManager).thenReturn(securityManager)
SparkEnv.set(mockEnv)
// Create an ExecutorClassLoader that's configured to load classes from the HTTP server
val parentLoader = new URLClassLoader(Array.empty, null)
val classLoader = new ExecutorClassLoader(conf, null, classServer.uri, parentLoader, false)
// NOTE(review): presumably shortens the HTTP connection timeout (in milliseconds) so that
// leaked or stuck connections surface quickly -- confirm against ExecutorClassLoader.
classLoader.httpUrlConnectionTimeoutMillis = 500
// Check that this class loader can actually load classes that exist
val fakeClass = classLoader.loadClass("ReplFakeClass2").newInstance()
val fakeClassVersion = fakeClass.toString
assert(fakeClassVersion === "1")
// Try to perform a full GC now, since GC during the test might mask resource leaks
System.gc()
// When the original bug occurs, the test thread becomes blocked in a classloading call
// and does not respond to interrupts. Therefore, use a custom ScalaTest interruptor to
// shut down the HTTP server when the test times out
val interruptor: Interruptor = new Interruptor {
override def apply(thread: Thread): Unit = {
// Stop the server first, then clear the field so afterAll does not stop it a second time.
classServer.stop()
classServer = null
thread.interrupt()
}
}
def tryAndFailToLoadABunchOfClasses(): Unit = {
// The number of trials here should be much larger than Jetty's thread / connection limit
// in order to expose thread or connection leaks
for (i <- 1 to 1000) {
// Bail out promptly once the interruptor has interrupted this thread.
if (Thread.currentThread().isInterrupted) {
throw new InterruptedException()
}
// Incorporate the iteration number into the class name in order to avoid any response
// caching that might be added in the future
intercept[ClassNotFoundException] {
classLoader.loadClass(s"ReplFakeClassDoesNotExist$i").newInstance()
}
}
}
// Fail the test if the failed lookups take longer than 10 seconds in total; on timeout the
// custom interruptor above is invoked with the test thread.
failAfter(10 seconds)(tryAndFailToLoadABunchOfClasses())(interruptor)
}
test("fetch classes using Spark's RpcEnv") {
val env = mock[SparkEnv]
val rpcEnv = mock[RpcEnv]
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment