Skip to content
Snippets Groups Projects
Commit e076fb05 authored by Tejas Patil's avatar Tejas Patil Committed by Sean Owen
Browse files

[SPARK-16919] Configurable update interval for console progress bar

## What changes were proposed in this pull request?

Currently the update interval for the console progress bar is hardcoded. This PR makes it configurable for users.

## How was this patch tested?

Ran a long-running job with a high value for the update interval and confirmed that the progress bar was updated less frequently.

Author: Tejas Patil <tejasp@fb.com>

Closes #14507 from tejasapatil/SPARK-16919.
parent a16983c9
No related branches found
No related tags found
No related merge requests found
......@@ -30,22 +30,23 @@ import org.apache.spark.internal.Logging
*/
private[spark] class ConsoleProgressBar(sc: SparkContext) extends Logging {
// Carriage return
val CR = '\r'
private val CR = '\r'
// Update period of progress bar, in milliseconds
val UPDATE_PERIOD = 200L
private val updatePeriodMSec =
sc.getConf.getTimeAsMs("spark.ui.consoleProgress.update.interval", "200")
// Delay before a progress bar is shown, in milliseconds
val FIRST_DELAY = 500L
private val firstDelayMSec = 500L
// The width of terminal
val TerminalWidth = if (!sys.env.getOrElse("COLUMNS", "").isEmpty) {
private val TerminalWidth = if (!sys.env.getOrElse("COLUMNS", "").isEmpty) {
sys.env.get("COLUMNS").get.toInt
} else {
80
}
var lastFinishTime = 0L
var lastUpdateTime = 0L
var lastProgressBar = ""
private var lastFinishTime = 0L
private var lastUpdateTime = 0L
private var lastProgressBar = ""
// Schedule a refresh thread to run periodically
private val timer = new Timer("refresh progress", true)
......@@ -53,19 +54,19 @@ private[spark] class ConsoleProgressBar(sc: SparkContext) extends Logging {
override def run() {
refresh()
}
}, FIRST_DELAY, UPDATE_PERIOD)
}, firstDelayMSec, updatePeriodMSec)
/**
* Try to refresh the progress bar in every cycle
*/
private def refresh(): Unit = synchronized {
val now = System.currentTimeMillis()
if (now - lastFinishTime < FIRST_DELAY) {
if (now - lastFinishTime < firstDelayMSec) {
return
}
val stageIds = sc.statusTracker.getActiveStageIds()
val stages = stageIds.flatMap(sc.statusTracker.getStageInfo).filter(_.numTasks() > 1)
.filter(now - _.submissionTime() > FIRST_DELAY).sortBy(_.stageId())
.filter(now - _.submissionTime() > firstDelayMSec).sortBy(_.stageId())
if (stages.length > 0) {
show(now, stages.take(3)) // display at most 3 stages in same time
}
......@@ -94,7 +95,7 @@ private[spark] class ConsoleProgressBar(sc: SparkContext) extends Logging {
header + bar + tailer
}.mkString("")
// only refresh if it's changed of after 1 minute (or the ssh connection will be closed
// only refresh if it's changed OR after 1 minute (or the ssh connection will be closed
// after being idle for some time)
if (bar != lastProgressBar || now - lastUpdateTime > 60 * 1000L) {
System.err.print(CR + bar)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment