Skip to content
Snippets Groups Projects
Commit 4661d30b authored by Marcelo Vanzin's avatar Marcelo Vanzin
Browse files

[SPARK-19554][UI,YARN] Allow SHS URL to be used for tracking in YARN RM.

Allow an application to use the History Server URL as the tracking
URL in the YARN RM, so there's still a link to the web UI somewhere
in YARN even if the driver's UI is disabled. This is useful, for
example, if an admin wants to disable the driver UI by default for
applications, since it's harder to secure it (since it involves non
trivial ssl certificate and auth management that admins may not want
to expose to user apps).

This needs to be opt-in, because of the way the YARN proxy works, so
a new configuration was added to enable the option.

The YARN RM will proxy requests to live AMs instead of redirecting
the client, so pages in the SHS UI will not render correctly since
they'll reference invalid paths in the RM UI. The proxy base support
in the SHS cannot be used since that would prevent direct access to
the SHS.

So, to solve this problem, for the feature to work end-to-end, a new
YARN-specific filter was added that detects whether the requests come
from the proxy and redirects the client appropriatly. The SHS admin has
to add this filter manually if they want the feature to work.

Tested with new unit test, and by running with the documented configuration
set in a test cluster. Also verified the driver UI is used when it's
enabled.

Author: Marcelo Vanzin <vanzin@cloudera.com>

Closes #16946 from vanzin/SPARK-19554.
parent 37112fcf
No related branches found
No related tags found
No related merge requests found
...@@ -604,3 +604,18 @@ spark.yarn.am.extraJavaOptions -Dsun.security.krb5.debug=true -Dsun.security.spn ...@@ -604,3 +604,18 @@ spark.yarn.am.extraJavaOptions -Dsun.security.krb5.debug=true -Dsun.security.spn
Finally, if the log level for `org.apache.spark.deploy.yarn.Client` is set to `DEBUG`, the log Finally, if the log level for `org.apache.spark.deploy.yarn.Client` is set to `DEBUG`, the log
will include a list of all tokens obtained, and their expiry details will include a list of all tokens obtained, and their expiry details
## Using the Spark History Server to replace the Spark Web UI
It is possible to use the Spark History Server application page as the tracking URL for running
applications when the application UI is disabled. This may be desirable on secure clusters, or to
reduce the memory usage of the Spark driver. To set up tracking through the Spark History Server,
do the following:
- On the application side, set <code>spark.yarn.historyServer.allowTracking=true</code> in Spark's
configuration. This will tell Spark to use the history server's URL as the tracking URL if
the application's UI is disabled.
- On the Spark History Server, add <code>org.apache.spark.deploy.yarn.YarnProxyRedirectFilter</code>
to the list of filters in the <code>spark.ui.filters</code> configuration.
Be aware that the history server information may not be up-to-date with the application's state.
...@@ -332,7 +332,7 @@ private[spark] class ApplicationMaster( ...@@ -332,7 +332,7 @@ private[spark] class ApplicationMaster(
_sparkConf: SparkConf, _sparkConf: SparkConf,
_rpcEnv: RpcEnv, _rpcEnv: RpcEnv,
driverRef: RpcEndpointRef, driverRef: RpcEndpointRef,
uiAddress: String, uiAddress: Option[String],
securityMgr: SecurityManager) = { securityMgr: SecurityManager) = {
val appId = client.getAttemptId().getApplicationId().toString() val appId = client.getAttemptId().getApplicationId().toString()
val attemptId = client.getAttemptId().getAttemptId().toString() val attemptId = client.getAttemptId().getAttemptId().toString()
...@@ -408,8 +408,7 @@ private[spark] class ApplicationMaster( ...@@ -408,8 +408,7 @@ private[spark] class ApplicationMaster(
sc.getConf.get("spark.driver.host"), sc.getConf.get("spark.driver.host"),
sc.getConf.get("spark.driver.port"), sc.getConf.get("spark.driver.port"),
isClusterMode = true) isClusterMode = true)
registerAM(sc.getConf, rpcEnv, driverRef, sc.ui.map(_.webUrl).getOrElse(""), registerAM(sc.getConf, rpcEnv, driverRef, sc.ui.map(_.webUrl), securityMgr)
securityMgr)
} else { } else {
// Sanity check; should never happen in normal operation, since sc should only be null // Sanity check; should never happen in normal operation, since sc should only be null
// if the user app did not create a SparkContext. // if the user app did not create a SparkContext.
...@@ -435,7 +434,7 @@ private[spark] class ApplicationMaster( ...@@ -435,7 +434,7 @@ private[spark] class ApplicationMaster(
clientMode = true) clientMode = true)
val driverRef = waitForSparkDriver() val driverRef = waitForSparkDriver()
addAmIpFilter() addAmIpFilter()
registerAM(sparkConf, rpcEnv, driverRef, sparkConf.get("spark.driver.appUIAddress", ""), registerAM(sparkConf, rpcEnv, driverRef, sparkConf.getOption("spark.driver.appUIAddress"),
securityMgr) securityMgr)
// In client mode the actor will stop the reporter thread. // In client mode the actor will stop the reporter thread.
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.yarn
import javax.servlet._
import javax.servlet.http.{HttpServletRequest, HttpServletResponse}
import org.apache.spark.internal.Logging
/**
* A filter to be used in the Spark History Server for redirecting YARN proxy requests to the
* main SHS address. This is useful for applications that are using the history server as the
* tracking URL, since the SHS-generated pages cannot be rendered in that case without extra
* configuration to set up a proxy base URI (meaning the SHS cannot be ever used directly).
*/
class YarnProxyRedirectFilter extends Filter with Logging {
import YarnProxyRedirectFilter._
override def destroy(): Unit = { }
override def init(config: FilterConfig): Unit = { }
override def doFilter(req: ServletRequest, res: ServletResponse, chain: FilterChain): Unit = {
val hreq = req.asInstanceOf[HttpServletRequest]
// The YARN proxy will send a request with the "proxy-user" cookie set to the YARN's client
// user name. We don't expect any other clients to set this cookie, since the SHS does not
// use cookies for anything.
Option(hreq.getCookies()).flatMap(_.find(_.getName() == COOKIE_NAME)) match {
case Some(_) =>
doRedirect(hreq, res.asInstanceOf[HttpServletResponse])
case _ =>
chain.doFilter(req, res)
}
}
private def doRedirect(req: HttpServletRequest, res: HttpServletResponse): Unit = {
val redirect = req.getRequestURL().toString()
// Need a client-side redirect instead of an HTTP one, otherwise the YARN proxy itself
// will handle the redirect and get into an infinite loop.
val content = s"""
|<html xmlns="http://www.w3.org/1999/xhtml">
|<head>
| <title>Spark History Server Redirect</title>
| <meta http-equiv="refresh" content="0;URL='$redirect'" />
|</head>
|<body>
| <p>The requested page can be found at: <a href="$redirect">$redirect</a>.</p>
|</body>
|</html>
""".stripMargin
logDebug(s"Redirecting YARN proxy request to $redirect.")
res.setStatus(HttpServletResponse.SC_OK)
res.setContentType("text/html")
res.getWriter().write(content)
}
}
private[spark] object YarnProxyRedirectFilter {
val COOKIE_NAME = "proxy-user"
}
...@@ -55,7 +55,7 @@ private[spark] class YarnRMClient extends Logging { ...@@ -55,7 +55,7 @@ private[spark] class YarnRMClient extends Logging {
driverRef: RpcEndpointRef, driverRef: RpcEndpointRef,
conf: YarnConfiguration, conf: YarnConfiguration,
sparkConf: SparkConf, sparkConf: SparkConf,
uiAddress: String, uiAddress: Option[String],
uiHistoryAddress: String, uiHistoryAddress: String,
securityMgr: SecurityManager, securityMgr: SecurityManager,
localResources: Map[String, LocalResource] localResources: Map[String, LocalResource]
...@@ -65,9 +65,13 @@ private[spark] class YarnRMClient extends Logging { ...@@ -65,9 +65,13 @@ private[spark] class YarnRMClient extends Logging {
amClient.start() amClient.start()
this.uiHistoryAddress = uiHistoryAddress this.uiHistoryAddress = uiHistoryAddress
val trackingUrl = uiAddress.getOrElse {
if (sparkConf.get(ALLOW_HISTORY_SERVER_TRACKING_URL)) uiHistoryAddress else ""
}
logInfo("Registering the ApplicationMaster") logInfo("Registering the ApplicationMaster")
synchronized { synchronized {
amClient.registerApplicationMaster(Utils.localHostName(), 0, uiAddress) amClient.registerApplicationMaster(Utils.localHostName(), 0, trackingUrl)
registered = true registered = true
} }
new YarnAllocator(driverUrl, driverRef, conf, sparkConf, amClient, getAttemptId(), securityMgr, new YarnAllocator(driverUrl, driverRef, conf, sparkConf, amClient, getAttemptId(), securityMgr,
......
...@@ -82,6 +82,13 @@ package object config { ...@@ -82,6 +82,13 @@ package object config {
.stringConf .stringConf
.createOptional .createOptional
private[spark] val ALLOW_HISTORY_SERVER_TRACKING_URL =
ConfigBuilder("spark.yarn.historyServer.allowTracking")
.doc("Allow using the History Server URL for the application as the tracking URL for the " +
"application when the Web UI is not enabled.")
.booleanConf
.createWithDefault(false)
/* File distribution. */ /* File distribution. */
private[spark] val SPARK_ARCHIVE = ConfigBuilder("spark.yarn.archive") private[spark] val SPARK_ARCHIVE = ConfigBuilder("spark.yarn.archive")
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.yarn
import java.io.{PrintWriter, StringWriter}
import javax.servlet.FilterChain
import javax.servlet.http.{Cookie, HttpServletRequest, HttpServletResponse}
import org.mockito.Mockito._
import org.apache.spark.SparkFunSuite
class YarnProxyRedirectFilterSuite extends SparkFunSuite {
test("redirect proxied requests, pass-through others") {
val requestURL = "http://example.com:1234/foo?"
val filter = new YarnProxyRedirectFilter()
val cookies = Array(new Cookie(YarnProxyRedirectFilter.COOKIE_NAME, "dr.who"))
val req = mock(classOf[HttpServletRequest])
// First request mocks a YARN proxy request (with the cookie set), second one has no cookies.
when(req.getCookies()).thenReturn(cookies, null)
when(req.getRequestURL()).thenReturn(new StringBuffer(requestURL))
val res = mock(classOf[HttpServletResponse])
when(res.getWriter()).thenReturn(new PrintWriter(new StringWriter()))
val chain = mock(classOf[FilterChain])
// First request is proxied.
filter.doFilter(req, res, chain)
verify(chain, never()).doFilter(req, res)
// Second request is not, so should invoke the filter chain.
filter.doFilter(req, res, chain)
verify(chain, times(1)).doFilter(req, res)
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment