diff --git a/core/src/main/scala/spark/Utils.scala b/core/src/main/scala/spark/Utils.scala
index af926b1e36f4dcaa5e07deba6521f75a67517319..c1616de6419baf0f2cc0766b786de3b20600d758 100644
--- a/core/src/main/scala/spark/Utils.scala
+++ b/core/src/main/scala/spark/Utils.scala
@@ -603,6 +603,44 @@ private object Utils extends Logging {
     portBound
   }
 
+  /**
+   * Return a string containing the last `n` bytes of a file.
+   *
+   * If the file holds fewer than `n` bytes its whole content is returned, and a non-positive
+   * `n` yields an empty string. The bytes are decoded with the implicit default codec of
+   * `scala.io.Source`; the cut is made on a byte boundary, so a multi-byte character that
+   * straddles it may be decoded incorrectly.
+   *
+   * @param path path of the file to read
+   * @param n maximum number of trailing bytes to return
+   * @throws java.io.FileNotFoundException if the file does not exist or cannot be opened
+   */
+  def lastNBytes(path: String, n: Int): String = {
+    val file = new File(path)
+    val length = file.length()
+    // Clamp as Longs: `length.toInt` wraps around for files larger than 2 GB, and a negative
+    // `n` must not be turned into a negative array size.
+    val numBytes = math.max(0L, math.min(n.toLong, length)).toInt
+    val buff = new Array[Byte](numBytes)
+    // RandomAccessFile.seek positions exactly, unlike InputStream.skip which may skip less.
+    val input = new java.io.RandomAccessFile(file, "r")
+    try {
+      input.seek(length - numBytes)
+      // A single read() may return fewer bytes than requested, so keep reading until the
+      // buffer is full or the file ends (it may have been truncated since `length` was taken).
+      var filled = 0
+      var eof = false
+      while (filled < numBytes && !eof) {
+        val count = input.read(buff, filled, numBytes - filled)
+        if (count < 0) eof = true else filled += count
+      }
+      Source.fromBytes(buff.take(filled)).mkString
+    } finally {
+      // Release the file handle even when reading fails.
+      input.close()
+    }
+  }
+
   /**
    * Clone an object using a Spark serializer.
    */
diff --git a/core/src/main/scala/spark/deploy/worker/ui/WorkerWebUI.scala b/core/src/main/scala/spark/deploy/worker/ui/WorkerWebUI.scala
index 2e6279566c50717ab2150dd8f2df77624bb55824..16564d56197c63d074c17b4f37c90ba64f8a4c90 100644
--- a/core/src/main/scala/spark/deploy/worker/ui/WorkerWebUI.scala
+++ b/core/src/main/scala/spark/deploy/worker/ui/WorkerWebUI.scala
@@ -3,14 +3,12 @@ package spark.deploy.worker.ui
 import akka.actor.ActorRef
 import akka.util.{Duration, Timeout}
 
-import java.io.File
+import java.io.{FileInputStream, File}
 
 import javax.servlet.http.HttpServletRequest
 
 import org.eclipse.jetty.server.{Handler, Server}
 
-import scala.io.Source
-
 import spark.{Utils, Logging}
 import spark.ui.JettyUtils
 import spark.ui.JettyUtils._
@@ -56,11 +54,20 @@ class WorkerWebUI(val worker: ActorRef, val workDir: File, requestedPort: Option
     val appId = request.getParameter("appId")
     val executorId = request.getParameter("executorId")
     val logType = request.getParameter("logType")
+
+    val maxBytes = 1024 * 1024 // Guard against OOM
+    val defaultBytes = 100 * 1024
+    // Clamp the requested size to [0, maxBytes] up front so that the header below reports the
+    // limit that is actually applied rather than the raw request parameter.
+    val numBytes = Option(request.getParameter("numBytes"))
+      .map(s => math.max(0, math.min(s.toInt, maxBytes)))
+      .getOrElse(defaultBytes)
+
+    // NOTE(review): appId, executorId and logType are taken from the request unvalidated, so
+    // `..` segments could point outside workDir; consider validating them.
     val path = "%s/%s/%s/%s".format(workDir.getPath, appId, executorId, logType)
-    val source = Source.fromFile(path)
-    val lines = source.mkString
-    source.close()
-    lines
+    val pre = "==== Last %s bytes of %s/%s/%s ====\n".format(numBytes, appId, executorId, logType)
+    pre + Utils.lastNBytes(path, numBytes)
   }
 
   def stop() {
diff --git a/core/src/test/scala/spark/ui/UISuite.scala b/core/src/test/scala/spark/ui/UISuite.scala
index aa191f628b9bdeb750f7d60b5965d9d9b7ef79f0..e4bb3abc339ea25db7190406d208d41fb0c9cd69 100644
--- a/core/src/test/scala/spark/ui/UISuite.scala
+++ b/core/src/test/scala/spark/ui/UISuite.scala
@@ -5,6 +5,10 @@ import org.eclipse.jetty.server.Server
 import java.net.ServerSocket
 import scala.util.{Failure, Success, Try}
 import spark.Utils
+import com.google.common.io.Files
+import org.apache.commons.io.FileUtils
+import java.io.{FileOutputStream, File}
+import com.google.common.base.Charsets
 
 class UISuite extends FunSuite {
   test("jetty port increases under contention") {
@@ -43,4 +47,40 @@ class UISuite extends FunSuite {
     assert(str(10 * hour + minute + 4 * second) === "10.02 h")
     assert(str(10 * hour + 59 * minute + 59 * second + 999) === "11.00 h")
   }
+
+  test("reading last n bytes of a file") {
+    val tmpDir = Files.createTempDir()
+
+    // Write `contents` to a new file named `name` inside tmpDir and return the file's path.
+    def writeFile(name: String, contents: String): String = {
+      val file = new File(tmpDir, name)
+      val out = new FileOutputStream(file)
+      try {
+        out.write(contents.getBytes(Charsets.UTF_8))
+      } finally {
+        out.close()
+      }
+      file.getPath
+    }
+
+    try {
+      // File smaller than limit
+      val f1Path = writeFile("f1", "a\nb\nc\nd")
+      assert(Utils.lastNBytes(f1Path, 1024) === "a\nb\nc\nd")
+
+      // File larger than limit
+      val f2Path = writeFile("f2", "1\n2\n3\n4\n5\n6\n7\n8\n")
+      assert(Utils.lastNBytes(f2Path, 8) === "5\n6\n7\n8\n")
+
+      // Limit exactly equal to the file size
+      val f3Path = writeFile("f3", "1\n2\n3\n4\n5\n6\n7\n8\n")
+      assert(Utils.lastNBytes(f3Path, 16) === "1\n2\n3\n4\n5\n6\n7\n8\n")
+
+      // Zero or negative limit yields an empty string
+      assert(Utils.lastNBytes(f3Path, 0) === "")
+      assert(Utils.lastNBytes(f3Path, -1) === "")
+    } finally {
+      FileUtils.deleteDirectory(tmpDir)
+    }
+  }
 }