diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
index d27e0e1f15c654e924b79ef6353a3701d0323a53..d09136de49807d51081dd455f0da5e7b0bace641 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
@@ -23,9 +23,10 @@ import akka.actor.ActorRef
 import com.google.common.base.Charsets
 import com.google.common.io.Files
 
-import org.apache.spark.Logging
+import org.apache.spark.{SparkConf, Logging}
 import org.apache.spark.deploy.{ApplicationDescription, Command, ExecutorState}
 import org.apache.spark.deploy.DeployMessages.ExecutorStateChanged
+import org.apache.spark.util.logging.FileAppender
 
 /**
  * Manages the execution of one executor process.
@@ -42,12 +43,15 @@ private[spark] class ExecutorRunner(
     val sparkHome: File,
     val workDir: File,
     val workerUrl: String,
+    val conf: SparkConf,
     var state: ExecutorState.Value)
   extends Logging {
 
   val fullId = appId + "/" + execId
   var workerThread: Thread = null
   var process: Process = null
+  var stdoutAppender: FileAppender = null
+  var stderrAppender: FileAppender = null
 
   // NOTE: This is now redundant with the automated shut-down enforced by the Executor. It might
   // make sense to remove this in the future.
@@ -76,6 +80,13 @@ private[spark] class ExecutorRunner(
     if (process != null) {
       logInfo("Killing process!")
       process.destroy()
+      process.waitFor()
+      if (stdoutAppender != null) {
+        stdoutAppender.stop()
+      }
+      if (stderrAppender != null) {
+        stderrAppender.stop()
+      }
       val exitCode = process.waitFor()
       worker ! ExecutorStateChanged(appId, execId, state, message, Some(exitCode))
     }
@@ -137,11 +148,11 @@ private[spark] class ExecutorRunner(
 
       // Redirect its stdout and stderr to files
       val stdout = new File(executorDir, "stdout")
-      CommandUtils.redirectStream(process.getInputStream, stdout)
+      stdoutAppender = FileAppender(process.getInputStream, stdout, conf)
 
       val stderr = new File(executorDir, "stderr")
       Files.write(header, stderr, Charsets.UTF_8)
-      CommandUtils.redirectStream(process.getErrorStream, stderr)
+      stderrAppender = FileAppender(process.getErrorStream, stderr, conf)
 
       // Wait for it to exit; this is actually a bad thing if it happens, because we expect to run
       // long-lived processes only. However, in the future, we might restart the executor a few
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index 100de26170a50fab0427444687b34341b2e2d9a0..a0ecaf709f8e2b804c7318da69d0819543256193 100755
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -235,7 +235,7 @@ private[spark] class Worker(
           val manager = new ExecutorRunner(appId, execId, appDesc, cores_, memory_,
             self, workerId, host,
             appDesc.sparkHome.map(userSparkHome => new File(userSparkHome)).getOrElse(sparkHome),
-            workDir, akkaUrl, ExecutorState.RUNNING)
+            workDir, akkaUrl, conf, ExecutorState.RUNNING)
           executors(appId + "/" + execId) = manager
           manager.start()
           coresUsed += cores_
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ui/LogPage.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ui/LogPage.scala
index 8381f59672ea3531f499b7098c589c3e1cc70966..6a5ffb1b71bfb4f98263d649019933c470a968e2 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/ui/LogPage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/ui/LogPage.scala
@@ -24,8 +24,10 @@ import scala.xml.Node
 
 import org.apache.spark.ui.{WebUIPage, UIUtils}
 import org.apache.spark.util.Utils
+import org.apache.spark.Logging
+import org.apache.spark.util.logging.{FileAppender, RollingFileAppender}
 
-private[spark] class LogPage(parent: WorkerWebUI) extends WebUIPage("logPage") {
+private[spark] class LogPage(parent: WorkerWebUI) extends WebUIPage("logPage") with Logging {
   private val worker = parent.worker
   private val workDir = parent.workDir
 
@@ -39,21 +41,18 @@ private[spark] class LogPage(parent: WorkerWebUI) extends WebUIPage("logPage") {
     val offset = Option(request.getParameter("offset")).map(_.toLong)
     val byteLength = Option(request.getParameter("byteLength")).map(_.toInt).getOrElse(defaultBytes)
 
-    val path = (appId, executorId, driverId) match {
+    val logDir = (appId, executorId, driverId) match {
       case (Some(a), Some(e), None) =>
-        s"${workDir.getPath}/$appId/$executorId/$logType"
+        s"${workDir.getPath}/$appId/$executorId/"
       case (None, None, Some(d)) =>
-        s"${workDir.getPath}/$driverId/$logType"
+        s"${workDir.getPath}/$driverId/"
       case _ =>
         throw new Exception("Request must specify either application or driver identifiers")
     }
 
-    val (startByte, endByte) = getByteRange(path, offset, byteLength)
-    val file = new File(path)
-    val logLength = file.length
-
-    val pre = s"==== Bytes $startByte-$endByte of $logLength of $path ====\n"
-    pre + Utils.offsetBytes(path, startByte, endByte)
+    val (logText, startByte, endByte, logLength) = getLog(logDir, logType, offset, byteLength)
+    val pre = s"==== Bytes $startByte-$endByte of $logLength of $logDir$logType ====\n"
+    pre + logText
   }
 
   def render(request: HttpServletRequest): Seq[Node] = {
@@ -65,19 +64,16 @@ private[spark] class LogPage(parent: WorkerWebUI) extends WebUIPage("logPage") {
     val offset = Option(request.getParameter("offset")).map(_.toLong)
     val byteLength = Option(request.getParameter("byteLength")).map(_.toInt).getOrElse(defaultBytes)
 
-    val (path, params) = (appId, executorId, driverId) match {
+    val (logDir, params) = (appId, executorId, driverId) match {
       case (Some(a), Some(e), None) =>
-        (s"${workDir.getPath}/$a/$e/$logType", s"appId=$a&executorId=$e")
+        (s"${workDir.getPath}/$a/$e/", s"appId=$a&executorId=$e")
       case (None, None, Some(d)) =>
-        (s"${workDir.getPath}/$d/$logType", s"driverId=$d")
+        (s"${workDir.getPath}/$d/", s"driverId=$d")
       case _ =>
         throw new Exception("Request must specify either application or driver identifiers")
     }
 
-    val (startByte, endByte) = getByteRange(path, offset, byteLength)
-    val file = new File(path)
-    val logLength = file.length
-    val logText = <node>{Utils.offsetBytes(path, startByte, endByte)}</node>
+    val (logText, startByte, endByte, logLength) = getLog(logDir, logType, offset, byteLength)
     val linkToMaster = <p><a href={worker.activeMasterWebUiUrl}>Back to Master</a></p>
     val range = <span>Bytes {startByte.toString} - {endByte.toString} of {logLength}</span>
 
@@ -127,23 +123,37 @@ private[spark] class LogPage(parent: WorkerWebUI) extends WebUIPage("logPage") {
     UIUtils.basicSparkPage(content, logType + " log page for " + appId)
   }
 
-  /** Determine the byte range for a log or log page. */
-  private def getByteRange(path: String, offset: Option[Long], byteLength: Int): (Long, Long) = {
-    val defaultBytes = 100 * 1024
-    val maxBytes = 1024 * 1024
-    val file = new File(path)
-    val logLength = file.length()
-    val getOffset = offset.getOrElse(logLength - defaultBytes)
-    val startByte =
-      if (getOffset < 0) {
-        0L
-      } else if (getOffset > logLength) {
-        logLength
-      } else {
-        getOffset
+  /** Get the part of the log files given the offset and desired length of bytes */
+  private def getLog(
+      logDirectory: String,
+      logType: String,
+      offsetOption: Option[Long],
+      byteLength: Int
+    ): (String, Long, Long, Long) = {
+    try {
+      val files = RollingFileAppender.getSortedRolledOverFiles(logDirectory, logType)
+      logDebug(s"Sorted log files of type $logType in $logDirectory:\n${files.mkString("\n")}")
+
+      val totalLength = files.map { _.length }.sum
+      val offset = offsetOption.getOrElse(totalLength - byteLength)
+      val startIndex = {
+        if (offset < 0) {
+          0L
+        } else if (offset > totalLength) {
+          totalLength
+        } else {
+          offset
+        }
       }
-    val logPageLength = math.min(byteLength, maxBytes)
-    val endByte = math.min(startByte + logPageLength, logLength)
-    (startByte, endByte)
+      val endIndex = math.min(startIndex + totalLength, totalLength)
+      logDebug(s"Getting log from $startIndex to $endIndex")
+      val logText = Utils.offsetBytes(files, startIndex, endIndex)
+      logDebug(s"Got log of length ${logText.length} bytes")
+      (logText, startIndex, endIndex, totalLength)
+    } catch {
+      case e: Exception =>
+        logError(s"Error getting $logType logs from directory $logDirectory", e)
+        ("Error getting logs due to exception: " + e.getMessage, 0, 0, 0)
+    }
   }
 }
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 3b1b6df089b8efe5807503c2fb4f573f8fe3f2f9..4ce28bb0cf059b04a52c65197d4ac1e60b0fe1eb 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -862,6 +862,59 @@ private[spark] object Utils extends Logging {
     Source.fromBytes(buff).mkString
   }
 
+  /**
+   * Return a string containing data across a set of files. The `startIndex`
+   * and `endIndex` is based on the cumulative size of all the files take in
+   * the given order. See figure below for more details.
+   */
+  def offsetBytes(files: Seq[File], start: Long, end: Long): String = {
+    val fileLengths = files.map { _.length }
+    val startIndex = math.max(start, 0)
+    val endIndex = math.min(end, fileLengths.sum)
+    val fileToLength = files.zip(fileLengths).toMap
+    logDebug("Log files: \n" + fileToLength.mkString("\n"))
+
+    val stringBuffer = new StringBuffer((endIndex - startIndex).toInt)
+    var sum = 0L
+    for (file <- files) {
+      val startIndexOfFile = sum
+      val endIndexOfFile = sum + fileToLength(file)
+      logDebug(s"Processing file $file, " +
+        s"with start index = $startIndexOfFile, end index = $endIndex")
+
+      /*
+                                      ____________
+       range 1:                      |            |
+                                     |   case A   |
+
+       files:   |==== file 1 ====|====== file 2 ======|===== file 3 =====|
+
+                     |   case B  .       case C       .    case D    |
+       range 2:      |___________.____________________.______________|
+       */
+
+      if (startIndex <= startIndexOfFile  && endIndex >= endIndexOfFile) {
+        // Case C: read the whole file
+        stringBuffer.append(offsetBytes(file.getAbsolutePath, 0, fileToLength(file)))
+      } else if (startIndex > startIndexOfFile && startIndex < endIndexOfFile) {
+        // Case A and B: read from [start of required range] to [end of file / end of range]
+        val effectiveStartIndex = startIndex - startIndexOfFile
+        val effectiveEndIndex = math.min(endIndex - startIndexOfFile, fileToLength(file))
+        stringBuffer.append(Utils.offsetBytes(
+          file.getAbsolutePath, effectiveStartIndex, effectiveEndIndex))
+      } else if (endIndex > startIndexOfFile && endIndex < endIndexOfFile) {
+        // Case D: read from [start of file] to [end of require range]
+        val effectiveStartIndex = math.max(startIndex - startIndexOfFile, 0)
+        val effectiveEndIndex = endIndex - startIndexOfFile
+        stringBuffer.append(Utils.offsetBytes(
+          file.getAbsolutePath, effectiveStartIndex, effectiveEndIndex))
+      }
+      sum += fileToLength(file)
+      logDebug(s"After processing file $file, string built is ${stringBuffer.toString}}")
+    }
+    stringBuffer.toString
+  }
+
   /**
    * Clone an object using a Spark serializer.
    */
diff --git a/core/src/main/scala/org/apache/spark/util/logging/FileAppender.scala b/core/src/main/scala/org/apache/spark/util/logging/FileAppender.scala
new file mode 100644
index 0000000000000000000000000000000000000000..8e9c3036d09c27964fd8297e2c88ef22a2ee4927
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/util/logging/FileAppender.scala
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.logging
+
+import java.io.{File, FileOutputStream, InputStream}
+
+import org.apache.spark.{Logging, SparkConf}
+import org.apache.spark.util.{IntParam, Utils}
+
+/**
+ * Continuously appends the data from an input stream into the given file.
+ */
+private[spark] class FileAppender(inputStream: InputStream, file: File, bufferSize: Int = 8192)
+  extends Logging {
+  @volatile private var outputStream: FileOutputStream = null
+  @volatile private var markedForStop = false     // has the appender been asked to stopped
+  @volatile private var stopped = false           // has the appender stopped
+
+  // Thread that reads the input stream and writes to file
+  private val writingThread = new Thread("File appending thread for " + file) {
+    setDaemon(true)
+    override def run() {
+      Utils.logUncaughtExceptions {
+        appendStreamToFile()
+      }
+    }
+  }
+  writingThread.start()
+
+  /**
+   * Wait for the appender to stop appending, either because input stream is closed
+   * or because of any error in appending
+   */
+  def awaitTermination() {
+    synchronized {
+      if (!stopped) {
+        wait()
+      }
+    }
+  }
+
+  /** Stop the appender */
+  def stop() {
+    markedForStop = true
+  }
+
+  /** Continuously read chunks from the input stream and append to the file */
+  protected def appendStreamToFile() {
+    try {
+      logDebug("Started appending thread")
+      openFile()
+      val buf = new Array[Byte](bufferSize)
+      var n = 0
+      while (!markedForStop && n != -1) {
+        n = inputStream.read(buf)
+        if (n != -1) {
+          appendToFile(buf, n)
+        }
+      }
+    } catch {
+      case e: Exception =>
+        logError(s"Error writing stream to file $file", e)
+    } finally {
+      closeFile()
+      synchronized {
+        stopped = true
+        notifyAll()
+      }
+    }
+  }
+
+  /** Append bytes to the file output stream */
+  protected def appendToFile(bytes: Array[Byte], len: Int) {
+    if (outputStream == null) {
+      openFile()
+    }
+    outputStream.write(bytes, 0, len)
+  }
+
+  /** Open the file output stream */
+  protected def openFile() {
+    outputStream = new FileOutputStream(file, false)
+    logDebug(s"Opened file $file")
+  }
+
+  /** Close the file output stream */
+  protected def closeFile() {
+    outputStream.flush()
+    outputStream.close()
+    logDebug(s"Closed file $file")
+  }
+}
+
+/**
+ * Companion object to [[org.apache.spark.util.logging.FileAppender]] which has helper
+ * functions to choose the correct type of FileAppender based on SparkConf configuration.
+ */
+private[spark] object FileAppender extends Logging {
+
+  /** Create the right appender based on Spark configuration */
+  def apply(inputStream: InputStream, file: File, conf: SparkConf): FileAppender = {
+
+    import RollingFileAppender._
+
+    val rollingStrategy = conf.get(STRATEGY_PROPERTY, STRATEGY_DEFAULT)
+    val rollingSizeBytes = conf.get(SIZE_PROPERTY, STRATEGY_DEFAULT)
+    val rollingInterval = conf.get(INTERVAL_PROPERTY, INTERVAL_DEFAULT)
+
+    def createTimeBasedAppender() = {
+      val validatedParams: Option[(Long, String)] = rollingInterval match {
+        case "daily" =>
+          logInfo(s"Rolling executor logs enabled for $file with daily rolling")
+          Some(24 * 60 * 60 * 1000L, "--YYYY-MM-dd")
+        case "hourly" =>
+          logInfo(s"Rolling executor logs enabled for $file with hourly rolling")
+          Some(60 * 60 * 1000L, "--YYYY-MM-dd--HH")
+        case "minutely" =>
+          logInfo(s"Rolling executor logs enabled for $file with rolling every minute")
+          Some(60 * 1000L, "--YYYY-MM-dd--HH-mm")
+        case IntParam(seconds) =>
+          logInfo(s"Rolling executor logs enabled for $file with rolling $seconds seconds")
+          Some(seconds * 1000L, "--YYYY-MM-dd--HH-mm-ss")
+        case _ =>
+          logWarning(s"Illegal interval for rolling executor logs [$rollingInterval], " +
+              s"rolling logs not enabled")
+          None
+      }
+      validatedParams.map {
+        case (interval, pattern) =>
+          new RollingFileAppender(
+            inputStream, file, new TimeBasedRollingPolicy(interval, pattern), conf)
+      }.getOrElse {
+        new FileAppender(inputStream, file)
+      }
+    }
+
+    def createSizeBasedAppender() = {
+      rollingSizeBytes match {
+        case IntParam(bytes) =>
+          logInfo(s"Rolling executor logs enabled for $file with rolling every $bytes bytes")
+          new RollingFileAppender(inputStream, file, new SizeBasedRollingPolicy(bytes), conf)
+        case _ =>
+          logWarning(
+            s"Illegal size [$rollingSizeBytes] for rolling executor logs, rolling logs not enabled")
+          new FileAppender(inputStream, file)
+      }
+    }
+
+    rollingStrategy match {
+      case "" =>
+        new FileAppender(inputStream, file)
+      case "time" =>
+        createTimeBasedAppender()
+      case "size" =>
+        createSizeBasedAppender()
+      case _ =>
+        logWarning(
+          s"Illegal strategy [$rollingStrategy] for rolling executor logs, " +
+            s"rolling logs not enabled")
+        new FileAppender(inputStream, file)
+    }
+  }
+}
+
+
diff --git a/core/src/main/scala/org/apache/spark/util/logging/RollingFileAppender.scala b/core/src/main/scala/org/apache/spark/util/logging/RollingFileAppender.scala
new file mode 100644
index 0000000000000000000000000000000000000000..1bbbd20cf076f3de87e7cd36bc7e133ed0fc07e6
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/util/logging/RollingFileAppender.scala
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.logging
+
+import java.io.{File, FileFilter, InputStream}
+
+import org.apache.commons.io.FileUtils
+import org.apache.spark.SparkConf
+import RollingFileAppender._
+
+/**
+ * Continuously appends data from input stream into the given file, and rolls
+ * over the file after the given interval. The rolled over files are named
+ * based on the given pattern.
+ *
+ * @param inputStream             Input stream to read data from
+ * @param activeFile              File to write data to
+ * @param rollingPolicy           Policy based on which files will be rolled over.
+ * @param conf                    SparkConf that is used to pass on extra configurations
+ * @param bufferSize              Optional buffer size. Used mainly for testing.
+ */
+private[spark] class RollingFileAppender(
+    inputStream: InputStream,
+    activeFile: File,
+    val rollingPolicy: RollingPolicy,
+    conf: SparkConf,
+    bufferSize: Int = DEFAULT_BUFFER_SIZE
+  ) extends FileAppender(inputStream, activeFile, bufferSize) {
+
+  private val maxRetainedFiles = conf.getInt(RETAINED_FILES_PROPERTY, -1)
+
+  /** Stop the appender */
+  override def stop() {
+    super.stop()
+  }
+
+  /** Append bytes to file after rolling over is necessary */
+  override protected def appendToFile(bytes: Array[Byte], len: Int) {
+    if (rollingPolicy.shouldRollover(len)) {
+      rollover()
+      rollingPolicy.rolledOver()
+    }
+    super.appendToFile(bytes, len)
+    rollingPolicy.bytesWritten(len)
+  }
+
+  /** Rollover the file, by closing the output stream and moving it over */
+  private def rollover() {
+    try {
+      closeFile()
+      moveFile()
+      openFile()
+      if (maxRetainedFiles > 0) {
+        deleteOldFiles()
+      }
+    } catch {
+      case e: Exception =>
+        logError(s"Error rolling over $activeFile", e)
+    }
+  }
+
+  /** Move the active log file to a new rollover file */
+  private def moveFile() {
+    val rolloverSuffix = rollingPolicy.generateRolledOverFileSuffix()
+    val rolloverFile = new File(
+      activeFile.getParentFile, activeFile.getName + rolloverSuffix).getAbsoluteFile
+    try {
+      logDebug(s"Attempting to rollover file $activeFile to file $rolloverFile")
+      if (activeFile.exists) {
+        if (!rolloverFile.exists) {
+          FileUtils.moveFile(activeFile, rolloverFile)
+          logInfo(s"Rolled over $activeFile to $rolloverFile")
+        } else {
+          // In case the rollover file name clashes, make a unique file name.
+          // The resultant file names are long and ugly, so this is used only
+          // if there is a name collision. This can be avoided by the using
+          // the right pattern such that name collisions do not occur.
+          var i = 0
+          var altRolloverFile: File = null
+          do {
+            altRolloverFile = new File(activeFile.getParent,
+              s"${activeFile.getName}$rolloverSuffix--$i").getAbsoluteFile
+            i += 1
+          } while (i < 10000 && altRolloverFile.exists)
+
+          logWarning(s"Rollover file $rolloverFile already exists, " +
+            s"rolled over $activeFile to file $altRolloverFile")
+          FileUtils.moveFile(activeFile, altRolloverFile)
+        }
+      } else {
+        logWarning(s"File $activeFile does not exist")
+      }
+    }
+  }
+
+  /** Retain only last few files */
+  private[util] def deleteOldFiles() {
+    try {
+      val rolledoverFiles = activeFile.getParentFile.listFiles(new FileFilter {
+        def accept(f: File): Boolean = {
+          f.getName.startsWith(activeFile.getName) && f != activeFile
+        }
+      }).sorted
+      val filesToBeDeleted = rolledoverFiles.take(
+        math.max(0, rolledoverFiles.size - maxRetainedFiles))
+      filesToBeDeleted.foreach { file =>
+        logInfo(s"Deleting file executor log file ${file.getAbsolutePath}")
+        file.delete()
+      }
+    } catch {
+      case e: Exception =>
+        logError("Error cleaning logs in directory " + activeFile.getParentFile.getAbsolutePath, e)
+    }
+  }
+}
+
+/**
+ * Companion object to [[org.apache.spark.util.logging.RollingFileAppender]]. Defines
+ * names of configurations that configure rolling file appenders.
+ */
+private[spark] object RollingFileAppender {
+  val STRATEGY_PROPERTY = "spark.executor.logs.rolling.strategy"
+  val STRATEGY_DEFAULT = ""
+  val INTERVAL_PROPERTY = "spark.executor.logs.rolling.time.interval"
+  val INTERVAL_DEFAULT = "daily"
+  val SIZE_PROPERTY = "spark.executor.logs.rolling.size.maxBytes"
+  val SIZE_DEFAULT = (1024 * 1024).toString
+  val RETAINED_FILES_PROPERTY = "spark.executor.logs.rolling.maxRetainedFiles"
+  val DEFAULT_BUFFER_SIZE = 8192
+
+  /**
+   * Get the sorted list of rolled over files. This assumes that the all the rolled
+   * over file names are prefixed with the `activeFileName`, and the active file
+   * name has the latest logs. So it sorts all the rolled over logs (that are
+   * prefixed with `activeFileName`) and appends the active file
+   */
+  def getSortedRolledOverFiles(directory: String, activeFileName: String): Seq[File] = {
+    val rolledOverFiles = new File(directory).getAbsoluteFile.listFiles.filter { file =>
+      val fileName = file.getName
+      fileName.startsWith(activeFileName) && fileName != activeFileName
+    }.sorted
+    val activeFile = {
+      val file = new File(directory, activeFileName).getAbsoluteFile
+      if (file.exists) Some(file) else None
+    }
+    rolledOverFiles ++ activeFile
+  }
+}
diff --git a/core/src/main/scala/org/apache/spark/util/logging/RollingPolicy.scala b/core/src/main/scala/org/apache/spark/util/logging/RollingPolicy.scala
new file mode 100644
index 0000000000000000000000000000000000000000..84e5c3c917dcb1edb6e7245dbf4c626d6c446329
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/util/logging/RollingPolicy.scala
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.logging
+
+import java.text.SimpleDateFormat
+import java.util.Calendar
+
+import org.apache.spark.Logging
+
+/**
+ * Defines the policy based on which [[org.apache.spark.util.logging.RollingFileAppender]] will
+ * generate rolling files.
+ */
+private[spark] trait RollingPolicy {
+
+  /** Whether rollover should be initiated at this moment */
+  def shouldRollover(bytesToBeWritten: Long): Boolean
+
+  /** Notify that rollover has occurred */
+  def rolledOver()
+
+  /** Notify that bytes have been written */
+  def bytesWritten(bytes: Long)
+
+  /** Get the desired name of the rollover file */
+  def generateRolledOverFileSuffix(): String
+}
+
+/**
+ * Defines a [[org.apache.spark.util.logging.RollingPolicy]] by which files will be rolled
+ * over at a fixed interval.
+ */
+private[spark] class TimeBasedRollingPolicy(
+    var rolloverIntervalMillis: Long,
+    rollingFileSuffixPattern: String,
+    checkIntervalConstraint: Boolean = true   // set to false while testing
+  ) extends RollingPolicy with Logging {
+
+  import TimeBasedRollingPolicy._
+  if (checkIntervalConstraint && rolloverIntervalMillis < MINIMUM_INTERVAL_SECONDS * 1000L) {
+    logWarning(s"Rolling interval [${rolloverIntervalMillis/1000L} seconds] is too small. " +
+      s"Setting the interval to the acceptable minimum of $MINIMUM_INTERVAL_SECONDS seconds.")
+    rolloverIntervalMillis = MINIMUM_INTERVAL_SECONDS * 1000L
+  }
+
+  @volatile private var nextRolloverTime = calculateNextRolloverTime()
+  private val formatter = new SimpleDateFormat(rollingFileSuffixPattern)
+
+  /** Should rollover if current time has exceeded next rollover time */
+  def shouldRollover(bytesToBeWritten: Long): Boolean = {
+    System.currentTimeMillis > nextRolloverTime
+  }
+
+  /** Rollover has occurred, so find the next time to rollover */
+  def rolledOver() {
+    nextRolloverTime = calculateNextRolloverTime()
+    logDebug(s"Current time: ${System.currentTimeMillis}, next rollover time: " + nextRolloverTime)
+  }
+
+  def bytesWritten(bytes: Long) { }  // nothing to do
+
+  private def calculateNextRolloverTime(): Long = {
+    val now = System.currentTimeMillis()
+    val targetTime = (
+      math.ceil(now.toDouble / rolloverIntervalMillis) * rolloverIntervalMillis
+    ).toLong
+    logDebug(s"Next rollover time is $targetTime")
+    targetTime
+  }
+
+  def generateRolledOverFileSuffix(): String = {
+    formatter.format(Calendar.getInstance.getTime)
+  }
+}
+
+private[spark] object TimeBasedRollingPolicy {
+  val MINIMUM_INTERVAL_SECONDS = 60L  // 1 minute
+}
+
+/**
+ * Defines a [[org.apache.spark.util.logging.RollingPolicy]] by which files will be rolled
+ * over after reaching a particular size.
+ */
+private[spark] class SizeBasedRollingPolicy(
+    var rolloverSizeBytes: Long,
+    checkSizeConstraint: Boolean = true     // set to false while testing
+  ) extends RollingPolicy with Logging {
+
+  import SizeBasedRollingPolicy._
+  if (checkSizeConstraint && rolloverSizeBytes < MINIMUM_SIZE_BYTES) {
+    logWarning(s"Rolling size [$rolloverSizeBytes bytes] is too small. " +
+      s"Setting the size to the acceptable minimum of $MINIMUM_SIZE_BYTES bytes.")
+    rolloverSizeBytes = MINIMUM_SIZE_BYTES
+  }
+
+  @volatile private var bytesWrittenSinceRollover = 0L
+  val formatter = new SimpleDateFormat("--YYYY-MM-dd--HH-mm-ss--SSSS")
+
+  /** Should rollover if the next set of bytes is going to exceed the size limit */
+  def shouldRollover(bytesToBeWritten: Long): Boolean = {
+    logInfo(s"$bytesToBeWritten + $bytesWrittenSinceRollover > $rolloverSizeBytes")
+    bytesToBeWritten + bytesWrittenSinceRollover > rolloverSizeBytes
+  }
+
+  /** Rollover has occurred, so reset the counter */
+  def rolledOver() {
+    bytesWrittenSinceRollover = 0
+  }
+
+  /** Increment the bytes that have been written in the current file */
+  def bytesWritten(bytes: Long) {
+    bytesWrittenSinceRollover += bytes
+  }
+
+  /** Get the desired name of the rollover file */
+  def generateRolledOverFileSuffix(): String = {
+    formatter.format(Calendar.getInstance.getTime)
+  }
+}
+
+private[spark] object SizeBasedRollingPolicy {
+  val MINIMUM_SIZE_BYTES = RollingFileAppender.DEFAULT_BUFFER_SIZE * 10
+}
+
diff --git a/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
index bfae32dae0dc54ec2ed3f117e588ded0a5868506..01ab2d549325cf170ce7cfb0c4ace530bf6a6bf0 100644
--- a/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
@@ -28,6 +28,7 @@ import org.scalatest.FunSuite
 import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, WorkerStateResponse}
 import org.apache.spark.deploy.master.{ApplicationInfo, DriverInfo, RecoveryState, WorkerInfo}
 import org.apache.spark.deploy.worker.{DriverRunner, ExecutorRunner}
+import org.apache.spark.SparkConf
 
 class JsonProtocolSuite extends FunSuite {
 
@@ -116,7 +117,8 @@ class JsonProtocolSuite extends FunSuite {
   }
   def createExecutorRunner(): ExecutorRunner = {
     new ExecutorRunner("appId", 123, createAppDesc(), 4, 1234, null, "workerId", "host",
-      new File("sparkHome"), new File("workDir"), "akka://worker", ExecutorState.RUNNING)
+      new File("sparkHome"), new File("workDir"), "akka://worker",
+      new SparkConf, ExecutorState.RUNNING)
   }
   def createDriverRunner(): DriverRunner = {
     new DriverRunner("driverId", new File("workDir"), new File("sparkHome"), createDriverDesc(),
diff --git a/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala b/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala
index 8ae387fa0be6febaaa8a1266d289a40e64fb3d71..e5f748d55500d0de3c3374d8b5774140a44453e3 100644
--- a/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala
@@ -22,6 +22,7 @@ import java.io.File
 import org.scalatest.FunSuite
 
 import org.apache.spark.deploy.{ApplicationDescription, Command, ExecutorState}
+import org.apache.spark.SparkConf
 
 class ExecutorRunnerTest extends FunSuite {
   test("command includes appId") {
@@ -32,7 +33,7 @@ class ExecutorRunnerTest extends FunSuite {
       sparkHome, "appUiUrl")
     val appId = "12345-worker321-9876"
     val er = new ExecutorRunner(appId, 1, appDesc, 8, 500, null, "blah", "worker321", f(sparkHome.getOrElse(".")),
-      f("ooga"), "blah", ExecutorState.RUNNING)
+      f("ooga"), "blah", new SparkConf, ExecutorState.RUNNING)
 
     assert(er.getCommandSeq.last === appId)
   }
diff --git a/core/src/test/scala/org/apache/spark/util/FileAppenderSuite.scala b/core/src/test/scala/org/apache/spark/util/FileAppenderSuite.scala
new file mode 100644
index 0000000000000000000000000000000000000000..53d7f5c6072e64c36d8330e87f0e3156d63e2c6c
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/util/FileAppenderSuite.scala
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util
+
+import java.io._
+
+import scala.collection.mutable.HashSet
+import scala.reflect._
+
+import org.apache.commons.io.{FileUtils, IOUtils}
+import org.apache.spark.{Logging, SparkConf}
+import org.scalatest.{BeforeAndAfter, FunSuite}
+import org.apache.spark.util.logging.{RollingFileAppender, SizeBasedRollingPolicy, TimeBasedRollingPolicy, FileAppender}
+
+class FileAppenderSuite extends FunSuite with BeforeAndAfter with Logging {
+
+  val testFile = new File("FileAppenderSuite-test-" + System.currentTimeMillis).getAbsoluteFile
+
+  before {
+    cleanup()
+  }
+
+  after {
+    cleanup()
+  }
+
+  test("basic file appender") {
+    val testString = (1 to 1000).mkString(", ")
+    val inputStream = IOUtils.toInputStream(testString)
+    val appender = new FileAppender(inputStream, testFile)
+    inputStream.close()
+    appender.awaitTermination()
+    assert(FileUtils.readFileToString(testFile) === testString)
+  }
+
+  test("rolling file appender - time-based rolling") {
+    // setup input stream and appender
+    val testOutputStream = new PipedOutputStream()
+    val testInputStream = new PipedInputStream(testOutputStream, 100 * 1000)
+    val rolloverIntervalMillis = 100
+    val durationMillis = 1000
+    val numRollovers = durationMillis / rolloverIntervalMillis
+    val textToAppend = (1 to numRollovers).map( _.toString * 10 )
+
+    val appender = new RollingFileAppender(testInputStream, testFile,
+      new TimeBasedRollingPolicy(rolloverIntervalMillis, s"--HH-mm-ss-SSSS", false),
+      new SparkConf(), 10)
+
+    testRolling(appender, testOutputStream, textToAppend, rolloverIntervalMillis)
+  }
+
+  test("rolling file appender - size-based rolling") {
+    // setup input stream and appender
+    val testOutputStream = new PipedOutputStream()
+    val testInputStream = new PipedInputStream(testOutputStream, 100 * 1000)
+    val rolloverSize = 1000
+    val textToAppend = (1 to 3).map( _.toString * 1000 )
+
+    val appender = new RollingFileAppender(testInputStream, testFile,
+      new SizeBasedRollingPolicy(rolloverSize, false), new SparkConf(), 99)
+
+    val files = testRolling(appender, testOutputStream, textToAppend, 0)
+    files.foreach { file =>
+      logInfo(file.toString + ": " + file.length + " bytes")
+      assert(file.length <= rolloverSize)
+    }
+  }
+
+  test("rolling file appender - cleaning") {
+    // setup input stream and appender
+    val testOutputStream = new PipedOutputStream()
+    val testInputStream = new PipedInputStream(testOutputStream, 100 * 1000)
+    val conf = new SparkConf().set(RollingFileAppender.RETAINED_FILES_PROPERTY, "10")
+    val appender = new RollingFileAppender(testInputStream, testFile,
+      new SizeBasedRollingPolicy(1000, false), conf, 10)
+
+    // send data to appender through the input stream, and wait for the data to be written
+    val allGeneratedFiles = new HashSet[String]()
+    val items = (1 to 10).map { _.toString * 10000 }
+    for (i <- 0 until items.size) {
+      testOutputStream.write(items(i).getBytes("UTF8"))
+      testOutputStream.flush()
+      allGeneratedFiles ++= RollingFileAppender.getSortedRolledOverFiles(
+        testFile.getParentFile.toString, testFile.getName).map(_.toString)
+
+      Thread.sleep(10)
+    }
+    testOutputStream.close()
+    appender.awaitTermination()
+    logInfo("Appender closed")
+
+    // verify whether the earliest file has been deleted
+    val rolledOverFiles = allGeneratedFiles.filter { _ != testFile.toString }.toArray.sorted
+    logInfo(s"All rolled over files generated:${rolledOverFiles.size}\n" + rolledOverFiles.mkString("\n"))
+    assert(rolledOverFiles.size > 2)
+    val earliestRolledOverFile = rolledOverFiles.head
+    val existingRolledOverFiles = RollingFileAppender.getSortedRolledOverFiles(
+      testFile.getParentFile.toString, testFile.getName).map(_.toString)
+    logInfo("Existing rolled over files:\n" + existingRolledOverFiles.mkString("\n"))
+    assert(!existingRolledOverFiles.toSet.contains(earliestRolledOverFile))
+  }
+
+  test("file appender selection") {
+    // Test whether FileAppender.apply() returns the right type of the FileAppender based
+    // on SparkConf settings.
+
+    def testAppenderSelection[ExpectedAppender: ClassTag, ExpectedRollingPolicy](
+        properties: Seq[(String, String)], expectedRollingPolicyParam: Long = -1): FileAppender = {
+
+      // Set spark conf properties
+      val conf = new SparkConf
+      properties.foreach { p =>
+        conf.set(p._1, p._2)
+      }
+
+      // Create and test file appender
+      val inputStream = new PipedInputStream(new PipedOutputStream())
+      val appender = FileAppender(inputStream, new File("stdout"), conf)
+      assert(appender.isInstanceOf[ExpectedAppender])
+      assert(appender.getClass.getSimpleName ===
+        classTag[ExpectedAppender].runtimeClass.getSimpleName)
+      if (appender.isInstanceOf[RollingFileAppender]) {
+        val rollingPolicy = appender.asInstanceOf[RollingFileAppender].rollingPolicy
+        rollingPolicy.isInstanceOf[ExpectedRollingPolicy]
+        val policyParam = if (rollingPolicy.isInstanceOf[TimeBasedRollingPolicy]) {
+          rollingPolicy.asInstanceOf[TimeBasedRollingPolicy].rolloverIntervalMillis
+        } else {
+          rollingPolicy.asInstanceOf[SizeBasedRollingPolicy].rolloverSizeBytes
+        }
+        assert(policyParam === expectedRollingPolicyParam)
+      }
+      appender
+    }
+
+    import RollingFileAppender._
+
+    def rollingStrategy(strategy: String) = Seq(STRATEGY_PROPERTY -> strategy)
+    def rollingSize(size: String) = Seq(SIZE_PROPERTY -> size)
+    def rollingInterval(interval: String) = Seq(INTERVAL_PROPERTY -> interval)
+
+    val msInDay = 24 * 60 * 60 * 1000L
+    val msInHour = 60 * 60 * 1000L
+    val msInMinute = 60 * 1000L
+
+    // test no strategy -> no rolling
+    testAppenderSelection[FileAppender, Any](Seq.empty)
+
+    // test time based rolling strategy
+    testAppenderSelection[RollingFileAppender, Any](rollingStrategy("time"), msInDay)
+    testAppenderSelection[RollingFileAppender, TimeBasedRollingPolicy](
+      rollingStrategy("time") ++ rollingInterval("daily"), msInDay)
+    testAppenderSelection[RollingFileAppender, TimeBasedRollingPolicy](
+      rollingStrategy("time") ++ rollingInterval("hourly"), msInHour)
+    testAppenderSelection[RollingFileAppender, TimeBasedRollingPolicy](
+      rollingStrategy("time") ++ rollingInterval("minutely"), msInMinute)
+    testAppenderSelection[RollingFileAppender, TimeBasedRollingPolicy](
+      rollingStrategy("time") ++ rollingInterval("123456789"), 123456789 * 1000L)
+    testAppenderSelection[FileAppender, Any](
+      rollingStrategy("time") ++ rollingInterval("xyz"))
+
+    // test size based rolling strategy
+    testAppenderSelection[RollingFileAppender, SizeBasedRollingPolicy](
+      rollingStrategy("size") ++ rollingSize("123456789"), 123456789)
+    testAppenderSelection[FileAppender, Any](rollingSize("xyz"))
+
+    // test illegal strategy
+    testAppenderSelection[FileAppender, Any](rollingStrategy("xyz"))
+  }
+
+  /**
+   * Run the rolling file appender with data and see whether all the data was written correctly
+   * across rolled over files.
+   */
+  def testRolling(
+      appender: FileAppender,
+      outputStream: OutputStream,
+      textToAppend: Seq[String],
+      sleepTimeBetweenTexts: Long
+    ): Seq[File] = {
+    // send data to appender through the input stream, and wait for the data to be written
+    val expectedText = textToAppend.mkString("")
+    for (i <- 0 until textToAppend.size) {
+      outputStream.write(textToAppend(i).getBytes("UTF8"))
+      outputStream.flush()
+      Thread.sleep(sleepTimeBetweenTexts)
+    }
+    logInfo("Data sent to appender")
+    outputStream.close()
+    appender.awaitTermination()
+    logInfo("Appender closed")
+
+    // verify whether all the data written to rolled over files is same as expected
+    val generatedFiles = RollingFileAppender.getSortedRolledOverFiles(
+      testFile.getParentFile.toString, testFile.getName)
+    logInfo("Filtered files: \n" + generatedFiles.mkString("\n"))
+    assert(generatedFiles.size > 1)
+    val allText = generatedFiles.map { file =>
+      FileUtils.readFileToString(file)
+    }.mkString("")
+    assert(allText === expectedText)
+    generatedFiles
+  }
+
+  /** Delete all the generated rolledover files */
+  def cleanup() {
+    testFile.getParentFile.listFiles.filter { file =>
+      file.getName.startsWith(testFile.getName)
+    }.foreach { _.delete() }
+  }
+}
diff --git a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
index 0aad882ed76a8df09487ecc54946c7a1d64ae0f3..1ee936bc78f4911a4a62efffeced24101e97ba35 100644
--- a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
@@ -140,6 +140,38 @@ class UtilsSuite extends FunSuite {
     Utils.deleteRecursively(tmpDir2)
   }
 
+  test("reading offset bytes across multiple files") {
+    val tmpDir = Files.createTempDir()
+    tmpDir.deleteOnExit()
+    val files = (1 to 3).map(i => new File(tmpDir, i.toString))
+    Files.write("0123456789", files(0), Charsets.UTF_8)
+    Files.write("abcdefghij", files(1), Charsets.UTF_8)
+    Files.write("ABCDEFGHIJ", files(2), Charsets.UTF_8)
+
+    // Read first few bytes in the 1st file
+    assert(Utils.offsetBytes(files, 0, 5) === "01234")
+
+    // Read bytes within the 1st file
+    assert(Utils.offsetBytes(files, 5, 8) === "567")
+
+    // Read bytes across 1st and 2nd file
+    assert(Utils.offsetBytes(files, 8, 18) === "89abcdefgh")
+
+    // Read bytes across 1st, 2nd and 3rd file
+    assert(Utils.offsetBytes(files, 5, 24) === "56789abcdefghijABCD")
+
+    // Read some nonexistent bytes in the beginning
+    assert(Utils.offsetBytes(files, -5, 18) === "0123456789abcdefgh")
+
+    // Read some nonexistent bytes at the end
+    assert(Utils.offsetBytes(files, 18, 35) === "ijABCDEFGHIJ")
+
+    // Read some nonexistent bytes on both ends
+    assert(Utils.offsetBytes(files, -5, 35) === "0123456789abcdefghijABCDEFGHIJ")
+
+    Utils.deleteRecursively(tmpDir)
+  }
+
   test("deserialize long value") {
     val testval : Long = 9730889947L
     val bbuf = ByteBuffer.allocate(8)
diff --git a/docs/configuration.md b/docs/configuration.md
index 71fafa573467f035cde8b82ae91237a41e2a7716..b84104cc7e6535b0ee5101984a59a5251939a475 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -784,6 +784,45 @@ Apart from these, the following properties are also available, and may be useful
     higher memory usage in Spark.
   </td>
 </tr>
+<tr>
+  <td><code>spark.executor.logs.rolling.strategy</code></td>
+  <td>(none)</td>
+  <td>
+    Set the strategy of rolling of executor logs. By default it is disabled. It can
+    be set to "time" (time-based rolling) or "size" (size-based rolling). For "time",
+    use <code>spark.executor.logs.rolling.time.interval</code> to set the rolling interval.
+    For "size", use <code>spark.executor.logs.rolling.size.maxBytes</code> to set
+    the maximum file size for rolling.
+  </td>
+</tr>
+<tr>
+  <td><code>spark.executor.logs.rolling.time.interval</code></td>
+  <td>daily</td>
+  <td>
+    Set the time interval by which the executor logs will be rolled over.
+    Rolling is disabled by default. Valid values are `daily`, `hourly`, `minutely` or
+    any interval in seconds. See <code>spark.executor.logs.rolling.maxRetainedFiles</code>
+    for automatic cleaning of old logs.
+  </td>
+</tr>
+<tr>
+  <td><code>spark.executor.logs.rolling.size.maxBytes</code></td>
+  <td>(none)</td>
+  <td>
+    Set the max size of the file by which the executor logs will be rolled over.
+    Rolling is disabled by default. Value is set in terms of bytes.
+    See <code>spark.executor.logs.rolling.maxRetainedFiles</code>
+    for automatic cleaning of old logs.
+  </td>
+</tr>
+<tr>
+  <td><code>spark.executor.logs.rolling.maxRetainedFiles</code></td>
+  <td>(none)</td>
+  <td>
+    Sets the number of latest rolling log files that are going to be retained by the system.
+    Older log files will be deleted. Disabled by default.
+  </td>
+</tr>
 </table>
 
 #### Cluster Managers