Commit 8942b522 authored by xukun 00228947, committed by Andrew Or

[SPARK-3562] Periodic cleanup event logs

Author: xukun 00228947 <xukun.xu@huawei.com>

Closes #4214 from viper-kun/cleaneventlog and squashes the following commits:

7a5b9c5 [xukun 00228947] fix issue
31674ee [xukun 00228947] fix issue
6e3d06b [xukun 00228947] fix issue
373f3b9 [xukun 00228947] fix issue
71782b5 [xukun 00228947] fix issue
5b45035 [xukun 00228947] fix issue
70c28d6 [xukun 00228947] fix issues
adcfe86 [xukun 00228947] Periodic cleanup event logs
parent 10094a52
@@ -362,7 +362,13 @@ private[spark] object SparkConf extends Logging {
       DeprecatedConfig("spark.files.userClassPathFirst", "spark.executor.userClassPathFirst",
         "1.3"),
       DeprecatedConfig("spark.yarn.user.classpath.first", null, "1.3",
-        "Use spark.{driver,executor}.userClassPathFirst instead."))
+        "Use spark.{driver,executor}.userClassPathFirst instead."),
+      DeprecatedConfig("spark.history.fs.updateInterval",
+        "spark.history.fs.update.interval.seconds",
+        "1.3", "Use spark.history.fs.update.interval.seconds instead"),
+      DeprecatedConfig("spark.history.updateInterval",
+        "spark.history.fs.update.interval.seconds",
+        "1.3", "Use spark.history.fs.update.interval.seconds instead"))
     configs.map { x => (x.oldName, x) }.toMap
   }
......
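The deprecation map built above is what lets the old history-server keys keep working after the rename. A minimal standalone sketch of the pattern (the `DeprecatedConfig` case class and `translateConfKey` helper below are simplified stand-ins for illustration, not Spark's exact internals):

    // Simplified stand-in for SparkConf's deprecated-key table.
    case class DeprecatedConfig(oldName: String, newName: String,
        version: String, deprecationMessage: String = "")

    val configs = Seq(
      DeprecatedConfig("spark.history.fs.updateInterval",
        "spark.history.fs.update.interval.seconds",
        "1.3", "Use spark.history.fs.update.interval.seconds instead"))
    val deprecatedMap = configs.map { x => (x.oldName, x) }.toMap

    // Map a possibly-deprecated key to its current name (a null newName
    // would mean the key was removed outright).
    def translateConfKey(key: String): String =
      deprecatedMap.get(key).flatMap(c => Option(c.newName)).getOrElse(key)

    // translateConfKey("spark.history.fs.updateInterval")
    //   == "spark.history.fs.update.interval.seconds"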
@@ -17,9 +17,13 @@
 package org.apache.spark.deploy.history
 
-import java.io.{BufferedInputStream, FileNotFoundException, InputStream}
+import java.io.{IOException, BufferedInputStream, FileNotFoundException, InputStream}
+import java.util.concurrent.{Executors, TimeUnit}
 
 import scala.collection.mutable
+import scala.concurrent.duration.Duration
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder
 
 import org.apache.hadoop.fs.{FileStatus, Path}
 import org.apache.hadoop.fs.permission.AccessControlException
@@ -44,8 +48,15 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
   private val NOT_STARTED = "<Not Started>"
 
   // Interval between each check for event log updates
-  private val UPDATE_INTERVAL_MS = conf.getInt("spark.history.fs.updateInterval",
-    conf.getInt("spark.history.updateInterval", 10)) * 1000
+  private val UPDATE_INTERVAL_MS = conf.getOption("spark.history.fs.update.interval.seconds")
+    .orElse(conf.getOption(SparkConf.translateConfKey("spark.history.fs.updateInterval", true)))
+    .orElse(conf.getOption(SparkConf.translateConfKey("spark.history.updateInterval", true)))
+    .map(_.toInt)
+    .getOrElse(10) * 1000
+
+  // Interval between checks for event logs to delete
+  private val CLEAN_INTERVAL_MS = conf.getLong("spark.history.fs.cleaner.interval.seconds",
+    DEFAULT_SPARK_HISTORY_FS_CLEANER_INTERVAL_S) * 1000
 
   private val logDir = conf.getOption("spark.history.fs.logDirectory")
     .map { d => Utils.resolveURI(d).toString }
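The orElse chain above gives the new key precedence while still honoring both deprecated keys. A self-contained sketch of that resolution order (using a plain Map lookup in place of SparkConf and translateConfKey):

    // Resolution order: new key, then the two deprecated keys, then a 10-second default.
    def resolveUpdateIntervalMs(get: String => Option[String]): Long =
      get("spark.history.fs.update.interval.seconds")
        .orElse(get("spark.history.fs.updateInterval"))
        .orElse(get("spark.history.updateInterval"))
        .map(_.toInt)
        .getOrElse(10) * 1000L

    resolveUpdateIntervalMs(Map("spark.history.updateInterval" -> "30").get)  // 30000
    resolveUpdateIntervalMs(_ => None)                                        // 10000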
@@ -53,8 +64,11 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
 
   private val fs = Utils.getHadoopFileSystem(logDir, SparkHadoopUtil.get.newConfiguration(conf))
 
-  // A timestamp of when the disk was last accessed to check for log updates
-  private var lastLogCheckTimeMs = -1L
+  // Used by both the log-check task and the clean task. The scheduled thread pool
+  // must have size one; otherwise the two tasks could race on `fs` and `applications`.
+  private val pool = Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder()
+    .setNameFormat("spark-history-task-%d").setDaemon(true).build())
 
   // The modification time of the newest log detected during the last scan. This is used
   // to ignore logs that are older during subsequent scans, to avoid processing data that
@@ -73,25 +87,13 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
   private[history] val APPLICATION_COMPLETE = "APPLICATION_COMPLETE"
 
   /**
-   * A background thread that periodically checks for event log updates on disk.
-   *
-   * If a log check is invoked manually in the middle of a period, this thread re-adjusts the
-   * time at which it performs the next log check to maintain the same period as before.
-   *
-   * TODO: Add a mechanism to update manually.
+   * Return a runnable that performs the given operation on the event logs.
+   * This operation is expected to be executed periodically.
    */
-  private val logCheckingThread = new Thread("LogCheckingThread") {
-    override def run() = Utils.logUncaughtExceptions {
-      while (true) {
-        val now = getMonotonicTimeMs()
-        if (now - lastLogCheckTimeMs > UPDATE_INTERVAL_MS) {
-          Thread.sleep(UPDATE_INTERVAL_MS)
-        } else {
-          // If the user has manually checked for logs recently, wait until
-          // UPDATE_INTERVAL_MS after the last check time
-          Thread.sleep(lastLogCheckTimeMs + UPDATE_INTERVAL_MS - now)
-        }
-        checkForLogs()
+  private def getRunner(operateFun: () => Unit): Runnable = {
+    new Runnable() {
+      override def run() = Utils.logUncaughtExceptions {
+        operateFun()
       }
     }
   }
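Wrapping the body in Utils.logUncaughtExceptions matters here: a ScheduledExecutorService silently suppresses all future runs of a task once one execution throws. A minimal sketch of the same pattern, with a catch-all standing in for Spark's logging:

    import java.util.concurrent.{Executors, TimeUnit}

    // If a scheduled task throws, scheduleAtFixedRate cancels every later run.
    // Catching inside the Runnable keeps the periodic schedule alive.
    def getRunner(operateFun: () => Unit): Runnable = new Runnable() {
      override def run(): Unit =
        try operateFun()
        catch { case t: Throwable => System.err.println(s"periodic task failed: $t") }
    }

    val pool = Executors.newScheduledThreadPool(1)
    pool.scheduleAtFixedRate(getRunner(() => println("checking logs")),
      0, 10, TimeUnit.SECONDS)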
@@ -113,12 +115,17 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
         "Logging directory specified is not a directory: %s".format(logDir))
     }
 
-    checkForLogs()
-
     // Disable the background thread during tests.
     if (!conf.contains("spark.testing")) {
-      logCheckingThread.setDaemon(true)
-      logCheckingThread.start()
+      // A task that periodically checks for event log updates on disk.
+      pool.scheduleAtFixedRate(getRunner(checkForLogs), 0, UPDATE_INTERVAL_MS,
+        TimeUnit.MILLISECONDS)
+
+      if (conf.getBoolean("spark.history.fs.cleaner.enabled", false)) {
+        // A task that periodically cleans event logs on disk.
+        pool.scheduleAtFixedRate(getRunner(cleanLogs), 0, CLEAN_INTERVAL_MS,
+          TimeUnit.MILLISECONDS)
+      }
     }
   }
@@ -163,9 +170,6 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
    * applications that haven't been updated since last time the logs were checked.
    */
   private[history] def checkForLogs(): Unit = {
-    lastLogCheckTimeMs = getMonotonicTimeMs()
-    logDebug("Checking for logs. Time is now %d.".format(lastLogCheckTimeMs))
-
     try {
       var newLastModifiedTime = lastModifiedTime
       val statusList = Option(fs.listStatus(new Path(logDir))).map(_.toSeq)
@@ -230,6 +234,45 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
     }
   }
 
+  /**
+   * Delete event logs from the log directory according to the clean policy defined by the user.
+   */
+  private def cleanLogs(): Unit = {
+    try {
+      val statusList = Option(fs.listStatus(new Path(logDir))).map(_.toSeq)
+        .getOrElse(Seq[FileStatus]())
+      val maxAge = conf.getLong("spark.history.fs.cleaner.maxAge.seconds",
+        DEFAULT_SPARK_HISTORY_FS_MAXAGE_S) * 1000
+
+      val now = System.currentTimeMillis()
+      val appsToRetain = new mutable.LinkedHashMap[String, FsApplicationHistoryInfo]()
+
+      applications.values.foreach { info =>
+        if (now - info.lastUpdated <= maxAge) {
+          appsToRetain += (info.id -> info)
+        }
+      }
+
+      applications = appsToRetain
+
+      // Scan all logs from the log directory.
+      // Only directories older than the specified max age will be deleted.
+      statusList.foreach { dir =>
+        try {
+          if (now - dir.getModificationTime() > maxAge) {
+            // The second argument enables recursive deletion, so each log
+            // directory is removed along with its contents.
+            fs.delete(dir.getPath, true)
+          }
+        } catch {
+          case t: IOException => logError(s"IOException in cleaning logs of $dir", t)
+        }
+      }
+    } catch {
+      case t: Exception => logError("Exception in cleaning logs", t)
+    }
+  }
+
   /**
    * Comparison function that defines the sort order for the application listing.
    *
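The retention pass in cleanLogs above rebuilds the listing map rather than mutating it in place, so insertion (listing) order is preserved. A runnable sketch of that pass, using a simplified stand-in for FsApplicationHistoryInfo:

    import scala.collection.mutable

    // Simplified stand-in for FsApplicationHistoryInfo.
    case class AppInfo(id: String, lastUpdated: Long)

    val maxAge = 7L * 24 * 60 * 60 * 1000  // default max age: one week, in ms
    val now = System.currentTimeMillis()

    var applications = mutable.LinkedHashMap(
      "app-old" -> AppInfo("app-old", now - 8L * 24 * 60 * 60 * 1000),  // 8 days old
      "app-new" -> AppInfo("app-new", now - 60L * 1000))                // 1 minute old

    // Keep only entries updated within maxAge; LinkedHashMap preserves order.
    val appsToRetain = new mutable.LinkedHashMap[String, AppInfo]()
    applications.values.foreach { info =>
      if (now - info.lastUpdated <= maxAge) {
        appsToRetain += (info.id -> info)
      }
    }
    applications = appsToRetain  // "app-old" is dropped, "app-new" survives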
@@ -336,9 +379,6 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
     }
   }
 
-  /** Returns the system's monotonically increasing time. */
-  private def getMonotonicTimeMs(): Long = System.nanoTime() / (1000 * 1000)
-
   /**
    * Return true when the application has completed.
    */
@@ -354,6 +394,12 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
 
 private object FsHistoryProvider {
   val DEFAULT_LOG_DIR = "file:/tmp/spark-events"
+
+  // One day
+  val DEFAULT_SPARK_HISTORY_FS_CLEANER_INTERVAL_S = Duration(1, TimeUnit.DAYS).toSeconds
+
+  // One week
+  val DEFAULT_SPARK_HISTORY_FS_MAXAGE_S = Duration(7, TimeUnit.DAYS).toSeconds
 }
 
 private class FsApplicationHistoryInfo(
......
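For reference, the two Duration-based defaults above evaluate to plain second counts, matching the documented values in the table below:

    import java.util.concurrent.TimeUnit
    import scala.concurrent.duration.Duration

    Duration(1, TimeUnit.DAYS).toSeconds  // 86400  (cleaner check interval: one day)
    Duration(7, TimeUnit.DAYS).toSeconds  // 604800 (max event log age: one week)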
@@ -86,7 +86,7 @@ follows:
     </td>
   </tr>
   <tr>
-    <td>spark.history.fs.updateInterval</td>
+    <td>spark.history.fs.update.interval.seconds</td>
     <td>10</td>
     <td>
       The period, in seconds, at which information displayed by this history server is updated.
@@ -145,6 +145,29 @@ follows:
       If disabled, no access control checks are made.
     </td>
   </tr>
+  <tr>
+    <td>spark.history.fs.cleaner.enabled</td>
+    <td>false</td>
+    <td>
+      Specifies whether the History Server should periodically clean up event logs from storage.
+    </td>
+  </tr>
+  <tr>
+    <td>spark.history.fs.cleaner.interval.seconds</td>
+    <td>86400</td>
+    <td>
+      How often the job history cleaner checks for files to delete, in seconds. Defaults to
+      86400 (one day). Files are deleted only if they are older than
+      spark.history.fs.cleaner.maxAge.seconds.
+    </td>
+  </tr>
+  <tr>
+    <td>spark.history.fs.cleaner.maxAge.seconds</td>
+    <td>604800</td>
+    <td>
+      Job history files older than this many seconds will be deleted when the history cleaner
+      runs. Defaults to 604800, i.e. 3600 * 24 * 7 (one week).
+    </td>
+  </tr>
 </table>
 
 Note that in all of these UIs, the tables are sortable by clicking their headers,
......
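Putting the new settings together: a hedged example of enabling periodic cleanup with the defaults documented above (shown via SparkConf purely for illustration; a standalone history server would normally receive these as -D properties, e.g. through SPARK_HISTORY_OPTS):

    import org.apache.spark.SparkConf

    // Illustrative values only: check daily, delete logs older than one week.
    val conf = new SparkConf()
      .set("spark.history.fs.cleaner.enabled", "true")
      .set("spark.history.fs.cleaner.interval.seconds", "86400")
      .set("spark.history.fs.cleaner.maxAge.seconds", "604800")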