diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala index f40896457df95b766a9f09a3bc09895fcd28d5b2..68e57b7564ad16d734fc2f177860962706253d8d 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala @@ -125,10 +125,10 @@ private[ui] class ApplicationPage(parent: MasterWebUI) extends WebUIPage("app") <td>{executor.memory}</td> <td>{executor.state}</td> <td> - <a href={"%s/logPage?appId=%s&executorId=%s&logType=stdout" - .format(workerUrlRef, executor.application.id, executor.id)}>stdout</a> - <a href={"%s/logPage?appId=%s&executorId=%s&logType=stderr" - .format(workerUrlRef, executor.application.id, executor.id)}>stderr</a> + <a href={s"$workerUrlRef/logPage?appId=${executor.application.id}&executorId=${executor. + id}&logType=stdout"}>stdout</a> + <a href={s"$workerUrlRef/logPage?appId=${executor.application.id}&executorId=${executor. + id}&logType=stderr"}>stderr</a> </td> </tr> } diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala index f6d3876e3bbfac6245f12c29dbda5df24e037d98..29a810fe7abe0f06c8826aedbaad0726c9c8c96f 100755 --- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala @@ -155,6 +155,8 @@ private[deploy] class Worker( private val metricsSystem = MetricsSystem.createMetricsSystem("worker", conf, securityMgr) private val workerSource = new WorkerSource(this) + val reverseProxy = conf.getBoolean("spark.ui.reverseProxy", false) + private var registerMasterFutures: Array[JFuture[_]] = null private var registrationRetryTimer: Option[JScheduledFuture[_]] = None @@ -225,7 +227,7 @@ private[deploy] class Worker( masterAddressToConnect = Some(masterAddress) master = Some(masterRef) connected = true - if (conf.getBoolean("spark.ui.reverseProxy", false)) { + if (reverseProxy) { logInfo(s"WorkerWebUI is available at $activeMasterWebUiUrl/proxy/$workerId") } // Cancel any outstanding re-registration attempts because we found a new master diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerPage.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerPage.scala index ea39b0dce0a411a4cea6b11aa5fbee6e870bca3f..ce84bc4dae32cab9cba2abb7284d7ba910a43de3 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerPage.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerPage.scala @@ -51,9 +51,11 @@ private[ui] class WorkerPage(parent: WorkerWebUI) extends WebUIPage("") { val driverHeaders = Seq("DriverID", "Main Class", "State", "Cores", "Memory", "Logs", "Notes") val runningDrivers = workerState.drivers.sortBy(_.driverId).reverse - val runningDriverTable = UIUtils.listingTable(driverHeaders, driverRow, runningDrivers) + val runningDriverTable = UIUtils.listingTable[DriverRunner](driverHeaders, + driverRow(workerState.workerId, _), runningDrivers) val finishedDrivers = workerState.finishedDrivers.sortBy(_.driverId).reverse - val finishedDriverTable = UIUtils.listingTable(driverHeaders, driverRow, finishedDrivers) + val finishedDriverTable = UIUtils.listingTable[DriverRunner](driverHeaders, + driverRow(workerState.workerId, _), finishedDrivers) // For now we only show driver information if the user has submitted drivers to the cluster. // This is until we integrate the notion of drivers and applications in the UI. @@ -102,6 +104,11 @@ private[ui] class WorkerPage(parent: WorkerWebUI) extends WebUIPage("") { } def executorRow(executor: ExecutorRunner): Seq[Node] = { + val workerUrlRef = UIUtils.makeHref(parent.worker.reverseProxy, executor.workerId, + parent.webUrl) + val appUrlRef = UIUtils.makeHref(parent.worker.reverseProxy, executor.appId, + executor.appDesc.appUiUrl) + <tr> <td>{executor.execId}</td> <td>{executor.cores}</td> @@ -115,7 +122,7 @@ private[ui] class WorkerPage(parent: WorkerWebUI) extends WebUIPage("") { <li><strong>Name:</strong> { if ({executor.state == ExecutorState.RUNNING} && executor.appDesc.appUiUrl.nonEmpty) { - <a href={executor.appDesc.appUiUrl}> {executor.appDesc.name}</a> + <a href={appUrlRef}> {executor.appDesc.name}</a> } else { {executor.appDesc.name} } @@ -125,16 +132,17 @@ private[ui] class WorkerPage(parent: WorkerWebUI) extends WebUIPage("") { </ul> </td> <td> - <a href={"logPage?appId=%s&executorId=%s&logType=stdout" - .format(executor.appId, executor.execId)}>stdout</a> - <a href={"logPage?appId=%s&executorId=%s&logType=stderr" - .format(executor.appId, executor.execId)}>stderr</a> + <a href={s"$workerUrlRef/logPage?appId=${executor + .appId}&executorId=${executor.execId}&logType=stdout"}>stdout</a> + <a href={s"$workerUrlRef/logPage?appId=${executor + .appId}&executorId=${executor.execId}&logType=stderr"}>stderr</a> </td> </tr> } - def driverRow(driver: DriverRunner): Seq[Node] = { + def driverRow(workerId: String, driver: DriverRunner): Seq[Node] = { + val workerUrlRef = UIUtils.makeHref(parent.worker.reverseProxy, workerId, parent.webUrl) <tr> <td>{driver.driverId}</td> <td>{driver.driverDesc.command.arguments(2)}</td> @@ -146,8 +154,8 @@ private[ui] class WorkerPage(parent: WorkerWebUI) extends WebUIPage("") { {Utils.megabytesToString(driver.driverDesc.mem)} </td> <td> - <a href={s"logPage?driverId=${driver.driverId}&logType=stdout"}>stdout</a> - <a href={s"logPage?driverId=${driver.driverId}&logType=stderr"}>stderr</a> + <a href={s"$workerUrlRef/logPage?driverId=${driver.driverId}&logType=stdout"}>stdout</a> + <a href={s"$workerUrlRef/logPage?driverId=${driver.driverId}&logType=stderr"}>stderr</a> </td> <td> {driver.finalException.getOrElse("")} diff --git a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala index 880cf08d34016d2af33c6861e7ee87219763ae1e..52b7ab6347b47b86ed859841d5a3aed180b9697d 100644 --- a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala +++ b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala @@ -209,7 +209,8 @@ private[spark] object JettyUtils extends Logging { val id = prefix.drop(1) // Query master state for id's corresponding UI address - // If that address exists, turn it into a valid, target URI string or return null + // If that address exists, try to turn it into a valid, target URI string + // Otherwise, return null idToUiAddress(id) .map(createProxyURI(prefix, _, path, request.getQueryString)) .filter(uri => uri != null && validateDestination(uri.getHost, uri.getPort)) @@ -467,8 +468,10 @@ private[spark] object JettyUtils extends Logging { targetUri: URI): String = { val toReplace = targetUri.getScheme() + "://" + targetUri.getAuthority() if (headerValue.startsWith(toReplace)) { - clientRequest.getScheme() + "://" + clientRequest.getHeader("host") + - clientRequest.getPathInfo() + headerValue.substring(toReplace.length()) + val id = clientRequest.getPathInfo.substring("/proxy/".length).takeWhile(_ != '/') + val headerPath = headerValue.substring(toReplace.length) + + s"${clientRequest.getScheme}://${clientRequest.getHeader("host")}/proxy/$id$headerPath" } else { null } diff --git a/core/src/test/scala/org/apache/spark/ui/UISuite.scala b/core/src/test/scala/org/apache/spark/ui/UISuite.scala index 0428903bb4d8bd9c7480c9a614d67c11c1a0620b..36ea3799afdf2feb0ae65558222ba82c12895296 100644 --- a/core/src/test/scala/org/apache/spark/ui/UISuite.scala +++ b/core/src/test/scala/org/apache/spark/ui/UISuite.scala @@ -223,7 +223,7 @@ class UISuite extends SparkFunSuite { val targetUri = URI.create("http://localhost:4040") when(clientRequest.getScheme()).thenReturn("http") when(clientRequest.getHeader("host")).thenReturn("localhost:8080") - when(clientRequest.getPathInfo()).thenReturn("/proxy/worker-id") + when(clientRequest.getPathInfo()).thenReturn("/proxy/worker-id/jobs") var newHeader = JettyUtils.createProxyLocationHeader(headerValue, clientRequest, targetUri) assert(newHeader.toString() === "http://localhost:8080/proxy/worker-id/jobs") headerValue = "http://localhost:4041/jobs"