diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ui/LogPage.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ui/LogPage.scala
index dc2bee6f2bdcad0e84d509b78b4c08901c9da96c..53f8f9a46cf8deca4e73f6e51788194d8a2a989f 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/ui/LogPage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/ui/LogPage.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.deploy.worker.ui
 
+import java.io.File
+import java.net.URI
 import javax.servlet.http.HttpServletRequest
 
 import scala.xml.Node
@@ -135,6 +137,13 @@ private[ui] class LogPage(parent: WorkerWebUI) extends WebUIPage("logPage") with
       return ("Error: Log type must be one of " + supportedLogTypes.mkString(", "), 0, 0, 0)
     }
 
+    // Verify that the normalized path of the log directory is in the working directory
+    val normalizedUri = new URI(logDirectory).normalize()
+    val normalizedLogDir = new File(normalizedUri.getPath)
+    if (!Utils.isInDirectory(workDir, normalizedLogDir)) {
+      return ("Error: invalid log directory " + logDirectory, 0, 0, 0)
+    }
+
     try {
       val files = RollingFileAppender.getSortedRolledOverFiles(logDirectory, logType)
       logDebug(s"Sorted log files of type $logType in $logDirectory:\n${files.mkString("\n")}")
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 693e1a0a3d5f097f20e524e1f3f65f8a26729977..5f132410540fd394af360f5096eb76b0533ee159 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -2227,6 +2227,22 @@ private[spark] object Utils extends Logging {
     }
   }
 
+  /**
+   * Return whether the specified file is a parent directory of the child file.
+   */
+  def isInDirectory(parent: File, child: File): Boolean = {
+    if (child == null || parent == null) {
+      return false
+    }
+    if (!child.exists() || !parent.exists() || !parent.isDirectory()) {
+      return false
+    }
+    if (parent.equals(child)) {
+      return true
+    }
+    isInDirectory(parent, child.getParentFile)
+  }
+
 }
 
 private [util] class SparkShutdownHookManager {
diff --git a/core/src/test/scala/org/apache/spark/deploy/worker/ui/LogPageSuite.scala b/core/src/test/scala/org/apache/spark/deploy/worker/ui/LogPageSuite.scala
index 572360ddb95d4b4f5b0fd66a643338320c7b7a35..72eaffb41698141d2907e78e4e9a10fbe5ed5bfe 100644
--- a/core/src/test/scala/org/apache/spark/deploy/worker/ui/LogPageSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/worker/ui/LogPageSuite.scala
@@ -19,7 +19,7 @@ package org.apache.spark.deploy.worker.ui
 
 import java.io.{File, FileWriter}
 
-import org.mockito.Mockito.mock
+import org.mockito.Mockito.{mock, when}
 import org.scalatest.PrivateMethodTester
 
 import org.apache.spark.SparkFunSuite
@@ -28,33 +28,47 @@ class LogPageSuite extends SparkFunSuite with PrivateMethodTester {
 
   test("get logs simple") {
     val webui = mock(classOf[WorkerWebUI])
+    val tmpDir = new File(sys.props("java.io.tmpdir"))
+    val workDir = new File(tmpDir, "work-dir")
+    workDir.mkdir()
+    when(webui.workDir).thenReturn(workDir)
     val logPage = new LogPage(webui)
 
     // Prepare some fake log files to read later
     val out = "some stdout here"
     val err = "some stderr here"
-    val tmpDir = new File(sys.props("java.io.tmpdir"))
-    val tmpOut = new File(tmpDir, "stdout")
-    val tmpErr = new File(tmpDir, "stderr")
-    val tmpRand = new File(tmpDir, "random")
+    val tmpOut = new File(workDir, "stdout")
+    val tmpErr = new File(workDir, "stderr")
+    val tmpErrBad = new File(tmpDir, "stderr") // outside the working directory
+    val tmpOutBad = new File(tmpDir, "stdout")
+    val tmpRand = new File(workDir, "random")
     write(tmpOut, out)
     write(tmpErr, err)
+    write(tmpOutBad, out)
+    write(tmpErrBad, err)
     write(tmpRand, "1 6 4 5 2 7 8")
 
     // Get the logs. All log types other than "stderr" or "stdout" will be rejected
     val getLog = PrivateMethod[(String, Long, Long, Long)]('getLog)
     val (stdout, _, _, _) =
-      logPage invokePrivate getLog(tmpDir.getAbsolutePath, "stdout", None, 100)
+      logPage invokePrivate getLog(workDir.getAbsolutePath, "stdout", None, 100)
     val (stderr, _, _, _) =
-      logPage invokePrivate getLog(tmpDir.getAbsolutePath, "stderr", None, 100)
+      logPage invokePrivate getLog(workDir.getAbsolutePath, "stderr", None, 100)
     val (error1, _, _, _) =
-      logPage invokePrivate getLog(tmpDir.getAbsolutePath, "random", None, 100)
+      logPage invokePrivate getLog(workDir.getAbsolutePath, "random", None, 100)
     val (error2, _, _, _) =
-      logPage invokePrivate getLog(tmpDir.getAbsolutePath, "does-not-exist.txt", None, 100)
+      logPage invokePrivate getLog(workDir.getAbsolutePath, "does-not-exist.txt", None, 100)
+    // These files exist, but live outside the working directory
+    val (error3, _, _, _) =
+      logPage invokePrivate getLog(tmpDir.getAbsolutePath, "stderr", None, 100)
+    val (error4, _, _, _) =
+      logPage invokePrivate getLog(tmpDir.getAbsolutePath, "stdout", None, 100)
     assert(stdout === out)
     assert(stderr === err)
-    assert(error1.startsWith("Error"))
-    assert(error2.startsWith("Error"))
+    assert(error1.startsWith("Error: Log type must be one of "))
+    assert(error2.startsWith("Error: Log type must be one of "))
+    assert(error3.startsWith("Error: invalid log directory"))
+    assert(error4.startsWith("Error: invalid log directory"))
   }
 
   /** Write the specified string to the file. */
diff --git a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
index a867cf83dc3f179407fb4bb2a8459b60eb474c8f..a61ea3918f46ad2e98daf25254f9f596dfb96594 100644
--- a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
@@ -608,4 +608,69 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging {
     manager.runAll()
     assert(output.toList === List(4, 3, 2))
   }
+
+  test("isInDirectory") {
+    val tmpDir = new File(sys.props("java.io.tmpdir"))
+    val parentDir = new File(tmpDir, "parent-dir")
+    val childDir1 = new File(parentDir, "child-dir-1")
+    val childDir1b = new File(parentDir, "child-dir-1b")
+    val childFile1 = new File(parentDir, "child-file-1.txt")
+    val childDir2 = new File(childDir1, "child-dir-2")
+    val childDir2b = new File(childDir1, "child-dir-2b")
+    val childFile2 = new File(childDir1, "child-file-2.txt")
+    val childFile3 = new File(childDir2, "child-file-3.txt")
+    val nullFile: File = null
+    parentDir.mkdir()
+    childDir1.mkdir()
+    childDir1b.mkdir()
+    childDir2.mkdir()
+    childDir2b.mkdir()
+    childFile1.createNewFile()
+    childFile2.createNewFile()
+    childFile3.createNewFile()
+
+    // Identity
+    assert(Utils.isInDirectory(parentDir, parentDir))
+    assert(Utils.isInDirectory(childDir1, childDir1))
+    assert(Utils.isInDirectory(childDir2, childDir2))
+
+    // Valid ancestor-descendant pairs
+    assert(Utils.isInDirectory(parentDir, childDir1))
+    assert(Utils.isInDirectory(parentDir, childFile1))
+    assert(Utils.isInDirectory(parentDir, childDir2))
+    assert(Utils.isInDirectory(parentDir, childFile2))
+    assert(Utils.isInDirectory(parentDir, childFile3))
+    assert(Utils.isInDirectory(childDir1, childDir2))
+    assert(Utils.isInDirectory(childDir1, childFile2))
+    assert(Utils.isInDirectory(childDir1, childFile3))
+    assert(Utils.isInDirectory(childDir2, childFile3))
+
+    // Inverted ancestor-descendant pairs should fail
+    assert(!Utils.isInDirectory(childDir1, parentDir))
+    assert(!Utils.isInDirectory(childDir2, parentDir))
+    assert(!Utils.isInDirectory(childDir2, childDir1))
+    assert(!Utils.isInDirectory(childFile1, parentDir))
+    assert(!Utils.isInDirectory(childFile2, parentDir))
+    assert(!Utils.isInDirectory(childFile3, parentDir))
+    assert(!Utils.isInDirectory(childFile2, childDir1))
+    assert(!Utils.isInDirectory(childFile3, childDir1))
+    assert(!Utils.isInDirectory(childFile3, childDir2))
+
+    // Non-existent files or directories should fail
+    assert(!Utils.isInDirectory(parentDir, new File(parentDir, "one.txt")))
+    assert(!Utils.isInDirectory(parentDir, new File(parentDir, "one/two.txt")))
+    assert(!Utils.isInDirectory(parentDir, new File(parentDir, "one/two/three.txt")))
+
+    // Siblings should fail
+    assert(!Utils.isInDirectory(childDir1, childDir1b))
+    assert(!Utils.isInDirectory(childDir1, childFile1))
+    assert(!Utils.isInDirectory(childDir2, childDir2b))
+    assert(!Utils.isInDirectory(childDir2, childFile2))
+
+    // Null files should fail without throwing NPE
+    assert(!Utils.isInDirectory(parentDir, nullFile))
+    assert(!Utils.isInDirectory(childFile3, nullFile))
+    assert(!Utils.isInDirectory(nullFile, parentDir))
+    assert(!Utils.isInDirectory(nullFile, childFile3))
+  }
 }