Skip to content
Snippets Groups Projects
Commit 8ca1cc17 authored by Patrick Wendell's avatar Patrick Wendell
Browse files

Adding truncation for log files

parent 9a42d04e
No related branches found
No related tags found
No related merge requests found
...@@ -603,6 +603,20 @@ private object Utils extends Logging { ...@@ -603,6 +603,20 @@ private object Utils extends Logging {
portBound portBound
} }
/** Return a string containing the last `n` bytes of a file. */
def lastNBytes(path: String, n: Int): String = {
val file = new File(path)
val length = file.length()
val buff = new Array[Byte](math.min(n, length.toInt))
val skip = math.max(0, length - n)
val stream = new FileInputStream(file)
stream.skip(skip)
stream.read(buff)
stream.close()
Source.fromBytes(buff).mkString
}
/** /**
* Clone an object using a Spark serializer. * Clone an object using a Spark serializer.
*/ */
......
...@@ -3,14 +3,12 @@ package spark.deploy.worker.ui ...@@ -3,14 +3,12 @@ package spark.deploy.worker.ui
import akka.actor.ActorRef import akka.actor.ActorRef
import akka.util.{Duration, Timeout} import akka.util.{Duration, Timeout}
import java.io.File import java.io.{FileInputStream, File}
import javax.servlet.http.HttpServletRequest import javax.servlet.http.HttpServletRequest
import org.eclipse.jetty.server.{Handler, Server} import org.eclipse.jetty.server.{Handler, Server}
import scala.io.Source
import spark.{Utils, Logging} import spark.{Utils, Logging}
import spark.ui.JettyUtils import spark.ui.JettyUtils
import spark.ui.JettyUtils._ import spark.ui.JettyUtils._
...@@ -56,11 +54,15 @@ class WorkerWebUI(val worker: ActorRef, val workDir: File, requestedPort: Option ...@@ -56,11 +54,15 @@ class WorkerWebUI(val worker: ActorRef, val workDir: File, requestedPort: Option
val appId = request.getParameter("appId") val appId = request.getParameter("appId")
val executorId = request.getParameter("executorId") val executorId = request.getParameter("executorId")
val logType = request.getParameter("logType") val logType = request.getParameter("logType")
val maxBytes = 1024 * 1024 // Guard against OOM
val defaultBytes = 100 * 1024
val numBytes = Option(request.getParameter("numBytes"))
.flatMap(s => Some(s.toInt)).getOrElse(defaultBytes)
val path = "%s/%s/%s/%s".format(workDir.getPath, appId, executorId, logType) val path = "%s/%s/%s/%s".format(workDir.getPath, appId, executorId, logType)
val source = Source.fromFile(path) val pre = "==== Last %s bytes of %s/%s/%s ====\n".format(numBytes, appId, executorId, logType)
val lines = source.mkString pre + Utils.lastNBytes(path, math.min(numBytes, maxBytes))
source.close()
lines
} }
def stop() { def stop() {
......
...@@ -5,6 +5,10 @@ import org.eclipse.jetty.server.Server ...@@ -5,6 +5,10 @@ import org.eclipse.jetty.server.Server
import java.net.ServerSocket import java.net.ServerSocket
import scala.util.{Failure, Success, Try} import scala.util.{Failure, Success, Try}
import spark.Utils import spark.Utils
import com.google.common.io.Files
import org.apache.commons.io.FileUtils
import java.io.{FileOutputStream, File}
import com.google.common.base.Charsets
class UISuite extends FunSuite { class UISuite extends FunSuite {
test("jetty port increases under contention") { test("jetty port increases under contention") {
...@@ -43,4 +47,31 @@ class UISuite extends FunSuite { ...@@ -43,4 +47,31 @@ class UISuite extends FunSuite {
assert(str(10 * hour + minute + 4 * second) === "10.02 h") assert(str(10 * hour + minute + 4 * second) === "10.02 h")
assert(str(10 * hour + 59 * minute + 59 * second + 999) === "11.00 h") assert(str(10 * hour + 59 * minute + 59 * second + 999) === "11.00 h")
} }
test("reading last n bytes of a file") {
val tmpDir = Files.createTempDir()
// File smaller than limit
val f1Path = tmpDir + "/f1"
val f1 = new FileOutputStream(f1Path)
f1.write("a\nb\nc\nd".getBytes(Charsets.UTF_8))
f1.close()
assert(Utils.lastNBytes(f1Path, 1024) === "a\nb\nc\nd")
// File larger than limit
val f2Path = tmpDir + "/f2"
val f2 = new FileOutputStream(f2Path)
f2.write("1\n2\n3\n4\n5\n6\n7\n8\n".getBytes(Charsets.UTF_8))
f2.close()
assert(Utils.lastNBytes(f2Path, 8) === "5\n6\n7\n8\n")
// Request limit too
val f3Path = tmpDir + "/f2"
val f3 = new FileOutputStream(f3Path)
f3.write("1\n2\n3\n4\n5\n6\n7\n8\n".getBytes(Charsets.UTF_8))
f3.close()
assert(Utils.lastNBytes(f3Path, 8) === "5\n6\n7\n8\n")
FileUtils.deleteDirectory(tmpDir)
}
} }
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment