diff --git a/core/src/main/scala/spark/Utils.scala b/core/src/main/scala/spark/Utils.scala index d2bf151cbf6bc34c2e2da09bd22c17e3101b6f38..c6a3f97872b57a7e96cf3c29ddbb62a61b680eca 100644 --- a/core/src/main/scala/spark/Utils.scala +++ b/core/src/main/scala/spark/Utils.scala @@ -627,15 +627,16 @@ private object Utils extends Logging { callSiteInfo.firstUserLine) } - /** Return a string containing the last `n` bytes of a file. */ - def lastNBytes(path: String, n: Int): String = { + /** Return a string containing part of a file from byte 'start' to 'end'. */ + def offsetBytes(path: String, start: Long, end: Long): String = { val file = new File(path) val length = file.length() - val buff = new Array[Byte](math.min(n, length.toInt)) - val skip = math.max(0, length - n) + val effectiveEnd = math.min(length, end) + val effectiveStart = math.max(0, start) + val buff = new Array[Byte]((effectiveEnd-effectiveStart).toInt) val stream = new FileInputStream(file) - stream.skip(skip) + stream.skip(effectiveStart) stream.read(buff) stream.close() Source.fromBytes(buff).mkString diff --git a/core/src/main/scala/spark/deploy/master/Master.scala b/core/src/main/scala/spark/deploy/master/Master.scala index dbdc8e1057a172f7a1661ce89097dd8c3428d322..4dd6c448a97523ad2e3c712d7440893248af0ad8 100644 --- a/core/src/main/scala/spark/deploy/master/Master.scala +++ b/core/src/main/scala/spark/deploy/master/Master.scala @@ -71,7 +71,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act } else { addWorker(id, host, workerPort, cores, memory, worker_webUiPort, publicAddress) context.watch(sender) // This doesn't work with remote actors but helps for testing - sender ! RegisteredWorker("http://" + masterPublicAddress + ":" + webUi.boundPort) + sender ! RegisteredWorker("http://" + masterPublicAddress + ":" + webUi.boundPort.get) schedule() } } diff --git a/core/src/main/scala/spark/deploy/master/ui/ApplicationPage.scala b/core/src/main/scala/spark/deploy/master/ui/ApplicationPage.scala index 33a16b5d849962853578717bb26019b3f80e2dcd..8553377d8f5bb76398e814df8612fd4f25d8b572 100644 --- a/core/src/main/scala/spark/deploy/master/ui/ApplicationPage.scala +++ b/core/src/main/scala/spark/deploy/master/ui/ApplicationPage.scala @@ -90,9 +90,9 @@ private[spark] class ApplicationPage(parent: MasterWebUI) { <td>{executor.memory}</td> <td>{executor.state}</td> <td> - <a href={"%s/log?appId=%s&executorId=%s&logType=stdout" + <a href={"%s/logPage?appId=%s&executorId=%s&logType=stdout" .format(executor.worker.webUiAddress, executor.application.id, executor.id)}>stdout</a> - <a href={"%s/log?appId=%s&executorId=%s&logType=stderr" + <a href={"%s/logPage?appId=%s&executorId=%s&logType=stderr" .format(executor.worker.webUiAddress, executor.application.id, executor.id)}>stderr</a> </td> </tr> diff --git a/core/src/main/scala/spark/deploy/worker/Worker.scala b/core/src/main/scala/spark/deploy/worker/Worker.scala index 6ae1cef94023a16ba4270e13f4443e85f2478002..f20ea42d7ffa612bfa261530619e7df38bf26384 100644 --- a/core/src/main/scala/spark/deploy/worker/Worker.scala +++ b/core/src/main/scala/spark/deploy/worker/Worker.scala @@ -77,7 +77,7 @@ private[spark] class Worker( sparkHome = new File(Option(System.getenv("SPARK_HOME")).getOrElse(".")) logInfo("Spark home: " + sparkHome) createWorkDir() - webUi = new WorkerWebUI(self, workDir, Some(webUiPort)) + webUi = new WorkerWebUI(this, workDir, Some(webUiPort)) webUi.start() connectToMaster() } diff --git a/core/src/main/scala/spark/deploy/worker/ui/IndexPage.scala b/core/src/main/scala/spark/deploy/worker/ui/IndexPage.scala index e466129c1aea513a2bb4db8c9d90e8b5ea9ef44c..c515f2e238213c1af7e799380dbd7313a7e3e0f7 100644 --- a/core/src/main/scala/spark/deploy/worker/ui/IndexPage.scala +++ b/core/src/main/scala/spark/deploy/worker/ui/IndexPage.scala @@ -16,17 +16,18 @@ import spark.Utils import spark.ui.UIUtils private[spark] class IndexPage(parent: WorkerWebUI) { + val workerActor = parent.worker.self val worker = parent.worker val timeout = parent.timeout def renderJson(request: HttpServletRequest): JValue = { - val stateFuture = (worker ? RequestWorkerState)(timeout).mapTo[WorkerState] + val stateFuture = (workerActor ? RequestWorkerState)(timeout).mapTo[WorkerState] val workerState = Await.result(stateFuture, 30 seconds) JsonProtocol.writeWorkerState(workerState) } def render(request: HttpServletRequest): Seq[Node] = { - val stateFuture = (worker ? RequestWorkerState)(timeout).mapTo[WorkerState] + val stateFuture = (workerActor ? RequestWorkerState)(timeout).mapTo[WorkerState] val workerState = Await.result(stateFuture, 30 seconds) val executorHeaders = Seq("ExecutorID", "Cores", "Memory", "Job Details", "Logs") @@ -88,11 +89,11 @@ private[spark] class IndexPage(parent: WorkerWebUI) { </ul> </td> <td> - <a href={"log?appId=%s&executorId=%s&logType=stdout" + <a href={"logPage?appId=%s&executorId=%s&logType=stdout" .format(executor.appId, executor.execId)}>stdout</a> - <a href={"log?appId=%s&executorId=%s&logType=stderr" + <a href={"logPage?appId=%s&executorId=%s&logType=stderr" .format(executor.appId, executor.execId)}>stderr</a> - </td> + </td> </tr> } diff --git a/core/src/main/scala/spark/deploy/worker/ui/WorkerWebUI.scala b/core/src/main/scala/spark/deploy/worker/ui/WorkerWebUI.scala index 16564d56197c63d074c17b4f37c90ba64f8a4c90..ccd55c1ce46d192b61519ae7b4aff5befab72c6d 100644 --- a/core/src/main/scala/spark/deploy/worker/ui/WorkerWebUI.scala +++ b/core/src/main/scala/spark/deploy/worker/ui/WorkerWebUI.scala @@ -9,15 +9,17 @@ import javax.servlet.http.HttpServletRequest import org.eclipse.jetty.server.{Handler, Server} +import spark.deploy.worker.Worker import spark.{Utils, Logging} import spark.ui.JettyUtils import spark.ui.JettyUtils._ +import spark.ui.UIUtils /** * Web UI server for the standalone worker. */ private[spark] -class WorkerWebUI(val worker: ActorRef, val workDir: File, requestedPort: Option[Int] = None) +class WorkerWebUI(val worker: Worker, val workDir: File, requestedPort: Option[Int] = None) extends Logging { implicit val timeout = Timeout( Duration.create(System.getProperty("spark.akka.askTimeout", "10").toLong, "seconds")) @@ -33,6 +35,7 @@ class WorkerWebUI(val worker: ActorRef, val workDir: File, requestedPort: Option val handlers = Array[(String, Handler)]( ("/static", createStaticHandler(WorkerWebUI.STATIC_RESOURCE_DIR)), ("/log", (request: HttpServletRequest) => log(request)), + ("/logPage", (request: HttpServletRequest) => logPage(request)), ("/json", (request: HttpServletRequest) => indexPage.renderJson(request)), ("*", (request: HttpServletRequest) => indexPage.render(request)) ) @@ -51,18 +54,104 @@ class WorkerWebUI(val worker: ActorRef, val workDir: File, requestedPort: Option } def log(request: HttpServletRequest): String = { + val defaultBytes = 100 * 1024 val appId = request.getParameter("appId") val executorId = request.getParameter("executorId") val logType = request.getParameter("logType") + val offset = Option(request.getParameter("offset")).map(_.toLong) + val byteLength = Option(request.getParameter("byteLength")).map(_.toInt).getOrElse(defaultBytes) + val path = "%s/%s/%s/%s".format(workDir.getPath, appId, executorId, logType) - val maxBytes = 1024 * 1024 // Guard against OOM - val defaultBytes = 100 * 1024 - val numBytes = Option(request.getParameter("numBytes")) - .flatMap(s => Some(s.toInt)).getOrElse(defaultBytes) + val (startByte, endByte) = getByteRange(path, offset, byteLength) + val file = new File(path) + val logLength = file.length + val pre = "==== Bytes %s-%s of %s of %s/%s/%s ====\n" + .format(startByte, endByte, logLength, appId, executorId, logType) + pre + Utils.offsetBytes(path, startByte, endByte) + } + + def logPage(request: HttpServletRequest): Seq[scala.xml.Node] = { + val defaultBytes = 100 * 1024 + val appId = request.getParameter("appId") + val executorId = request.getParameter("executorId") + val logType = request.getParameter("logType") + val offset = Option(request.getParameter("offset")).map(_.toLong) + val byteLength = Option(request.getParameter("byteLength")).map(_.toInt).getOrElse(defaultBytes) val path = "%s/%s/%s/%s".format(workDir.getPath, appId, executorId, logType) - val pre = "==== Last %s bytes of %s/%s/%s ====\n".format(numBytes, appId, executorId, logType) - pre + Utils.lastNBytes(path, math.min(numBytes, maxBytes)) + + val (startByte, endByte) = getByteRange(path, offset, byteLength) + val file = new File(path) + val logLength = file.length + + val logText = <node>{Utils.offsetBytes(path, startByte, endByte)}</node> + + val linkToMaster = <p><a href={worker.masterWebUiUrl}>Back to Master</a></p> + + val range = <span>Bytes {startByte.toString} - {endByte.toString} of {logLength}</span> + + val backButton = + if (startByte > 0) { + <a href={"?appId=%s&executorId=%s&logType=%s&offset=%s&byteLength=%s" + .format(appId, executorId, logType, math.max(startByte-byteLength, 0), + byteLength)}> + <button>Previous {Utils.memoryBytesToString(math.min(byteLength, startByte))}</button> + </a> + } + else { + <button disabled="disabled">Previous 0 B</button> + } + + val nextButton = + if (endByte < logLength) { + <a href={"?appId=%s&executorId=%s&logType=%s&offset=%s&byteLength=%s". + format(appId, executorId, logType, endByte, byteLength)}> + <button>Next {Utils.memoryBytesToString(math.min(byteLength, logLength-endByte))}</button> + </a> + } + else { + <button disabled="disabled">Next 0 B</button> + } + + val content = + <html> + <body> + {linkToMaster} + <hr /> + <div> + <div style="float:left;width:40%">{backButton}</div> + <div style="float:left;">{range}</div> + <div style="float:right;">{nextButton}</div> + </div> + <br /> + <div style="height:500px;overflow:auto;padding:5px;"> + <pre>{logText}</pre> + </div> + </body> + </html> + UIUtils.basicSparkPage(content, logType + " log page for " + appId) + } + + /** Determine the byte range for a log or log page. */ + def getByteRange(path: String, offset: Option[Long], byteLength: Int) + : (Long, Long) = { + val defaultBytes = 100 * 1024 + val maxBytes = 1024 * 1024 + + val file = new File(path) + val logLength = file.length() + val getOffset = offset.getOrElse(logLength-defaultBytes) + + val startByte = + if (getOffset < 0) 0L + else if (getOffset > logLength) logLength + else getOffset + + val logPageLength = math.min(byteLength, maxBytes) + + val endByte = math.min(startByte+logPageLength, logLength) + + (startByte, endByte) } def stop() { diff --git a/core/src/test/scala/spark/UtilsSuite.scala b/core/src/test/scala/spark/UtilsSuite.scala index 4a113e16bf5a58d72b724760b403bba0f97a92e6..1e1260f6060c7af03d1b6d5aca9fd7553123bb9f 100644 --- a/core/src/test/scala/spark/UtilsSuite.scala +++ b/core/src/test/scala/spark/UtilsSuite.scala @@ -1,7 +1,10 @@ package spark +import com.google.common.base.Charsets +import com.google.common.io.Files +import java.io.{ByteArrayOutputStream, ByteArrayInputStream, FileOutputStream, File} import org.scalatest.FunSuite -import java.io.{ByteArrayOutputStream, ByteArrayInputStream} +import org.apache.commons.io.FileUtils import scala.util.Random class UtilsSuite extends FunSuite { @@ -71,5 +74,49 @@ class UtilsSuite extends FunSuite { assert(Utils.splitCommandString("''") === Seq("")) assert(Utils.splitCommandString("\"\"") === Seq("")) } + + test("string formatting of time durations") { + val second = 1000 + val minute = second * 60 + val hour = minute * 60 + def str = Utils.msDurationToString(_) + + assert(str(123) === "123 ms") + assert(str(second) === "1.0 s") + assert(str(second + 462) === "1.5 s") + assert(str(hour) === "1.00 h") + assert(str(minute) === "1.0 m") + assert(str(minute + 4 * second + 34) === "1.1 m") + assert(str(10 * hour + minute + 4 * second) === "10.02 h") + assert(str(10 * hour + 59 * minute + 59 * second + 999) === "11.00 h") + } + + test("reading offset bytes of a file") { + val tmpDir2 = Files.createTempDir() + val f1Path = tmpDir2 + "/f1" + val f1 = new FileOutputStream(f1Path) + f1.write("1\n2\n3\n4\n5\n6\n7\n8\n9\n".getBytes(Charsets.UTF_8)) + f1.close() + + // Read first few bytes + assert(Utils.offsetBytes(f1Path, 0, 5) === "1\n2\n3") + + // Read some middle bytes + assert(Utils.offsetBytes(f1Path, 4, 11) === "3\n4\n5\n6") + + // Read last few bytes + assert(Utils.offsetBytes(f1Path, 12, 18) === "7\n8\n9\n") + + // Read some nonexistent bytes in the beginning + assert(Utils.offsetBytes(f1Path, -5, 5) === "1\n2\n3") + + // Read some nonexistent bytes at the end + assert(Utils.offsetBytes(f1Path, 12, 22) === "7\n8\n9\n") + + // Read some nonexistent bytes on both ends + assert(Utils.offsetBytes(f1Path, -3, 25) === "1\n2\n3\n4\n5\n6\n7\n8\n9\n") + + FileUtils.deleteDirectory(tmpDir2) + } } diff --git a/core/src/test/scala/spark/ui/UISuite.scala b/core/src/test/scala/spark/ui/UISuite.scala index e4bb3abc339ea25db7190406d208d41fb0c9cd69..fc0c1607202c0cd832d3ffe648f186d3081da684 100644 --- a/core/src/test/scala/spark/ui/UISuite.scala +++ b/core/src/test/scala/spark/ui/UISuite.scala @@ -1,14 +1,9 @@ package spark.ui +import scala.util.{Failure, Success, Try} +import java.net.ServerSocket import org.scalatest.FunSuite import org.eclipse.jetty.server.Server -import java.net.ServerSocket -import scala.util.{Failure, Success, Try} -import spark.Utils -import com.google.common.io.Files -import org.apache.commons.io.FileUtils -import java.io.{FileOutputStream, File} -import com.google.common.base.Charsets class UISuite extends FunSuite { test("jetty port increases under contention") { @@ -31,47 +26,4 @@ class UISuite extends FunSuite { case Failure (e) => } } - - test("string formatting of time durations") { - val second = 1000 - val minute = second * 60 - val hour = minute * 60 - def str = Utils.msDurationToString(_) - - assert(str(123) === "123 ms") - assert(str(second) === "1.0 s") - assert(str(second + 462) === "1.5 s") - assert(str(hour) === "1.00 h") - assert(str(minute) === "1.0 m") - assert(str(minute + 4 * second + 34) === "1.1 m") - assert(str(10 * hour + minute + 4 * second) === "10.02 h") - assert(str(10 * hour + 59 * minute + 59 * second + 999) === "11.00 h") - } - - test("reading last n bytes of a file") { - val tmpDir = Files.createTempDir() - - // File smaller than limit - val f1Path = tmpDir + "/f1" - val f1 = new FileOutputStream(f1Path) - f1.write("a\nb\nc\nd".getBytes(Charsets.UTF_8)) - f1.close() - assert(Utils.lastNBytes(f1Path, 1024) === "a\nb\nc\nd") - - // File larger than limit - val f2Path = tmpDir + "/f2" - val f2 = new FileOutputStream(f2Path) - f2.write("1\n2\n3\n4\n5\n6\n7\n8\n".getBytes(Charsets.UTF_8)) - f2.close() - assert(Utils.lastNBytes(f2Path, 8) === "5\n6\n7\n8\n") - - // Request limit too - val f3Path = tmpDir + "/f2" - val f3 = new FileOutputStream(f3Path) - f3.write("1\n2\n3\n4\n5\n6\n7\n8\n".getBytes(Charsets.UTF_8)) - f3.close() - assert(Utils.lastNBytes(f3Path, 8) === "5\n6\n7\n8\n") - - FileUtils.deleteDirectory(tmpDir) - } }