diff --git a/bin/spark-submit b/bin/spark-submit index c557311b4b20ec1f45be96b9251317fd9e077b8a..f92d90c3a66b08941c3da93cc9ea54bc3e9a02d0 100755 --- a/bin/spark-submit +++ b/bin/spark-submit @@ -22,6 +22,9 @@ export SPARK_HOME="$(cd "`dirname "$0"`"/..; pwd)" ORIG_ARGS=("$@") +# Set COLUMNS for progress bar +export COLUMNS=`tput cols` + while (($#)); do if [ "$1" = "--deploy-mode" ]; then SPARK_SUBMIT_DEPLOY_MODE=$2 diff --git a/core/src/main/java/org/apache/spark/SparkStageInfo.java b/core/src/main/java/org/apache/spark/SparkStageInfo.java index 04e2247210ecc8dd229ed80548daf29cadc65da3..fd74321093658666e32868bf33c33c39c4871322 100644 --- a/core/src/main/java/org/apache/spark/SparkStageInfo.java +++ b/core/src/main/java/org/apache/spark/SparkStageInfo.java @@ -26,6 +26,7 @@ package org.apache.spark; public interface SparkStageInfo { int stageId(); int currentAttemptId(); + long submissionTime(); String name(); int numTasks(); int numActiveTasks(); diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 7cccf74003431f8f9dc2522af6c27151336ff655..37013121c572a59e1ca35810898b541c2b1cbdbb 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -50,7 +50,7 @@ import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend, SparkD import org.apache.spark.scheduler.cluster.mesos.{CoarseMesosSchedulerBackend, MesosSchedulerBackend} import org.apache.spark.scheduler.local.LocalBackend import org.apache.spark.storage._ -import org.apache.spark.ui.SparkUI +import org.apache.spark.ui.{SparkUI, ConsoleProgressBar} import org.apache.spark.ui.jobs.JobProgressListener import org.apache.spark.util._ @@ -245,6 +245,13 @@ class SparkContext(config: SparkConf) extends Logging { val statusTracker = new SparkStatusTracker(this) + private[spark] val progressBar: Option[ConsoleProgressBar] = + if (conf.getBoolean("spark.ui.showConsoleProgress", true) && !log.isInfoEnabled) { + Some(new ConsoleProgressBar(this)) + } else { + None + } + // Initialize the Spark UI private[spark] val ui: Option[SparkUI] = if (conf.getBoolean("spark.ui.enabled", true)) { @@ -1274,6 +1281,7 @@ class SparkContext(config: SparkConf) extends Logging { logInfo("Starting job: " + callSite.shortForm) dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, allowLocal, resultHandler, localProperties.get) + progressBar.foreach(_.finishAll()) rdd.doCheckpoint() } diff --git a/core/src/main/scala/org/apache/spark/SparkStatusTracker.scala b/core/src/main/scala/org/apache/spark/SparkStatusTracker.scala index c18d763d7ff4d3e4b37ff5f2808f2a6a8a3e93c3..edbdda8a0bcb63fb588966efae692f8129e56937 100644 --- a/core/src/main/scala/org/apache/spark/SparkStatusTracker.scala +++ b/core/src/main/scala/org/apache/spark/SparkStatusTracker.scala @@ -96,6 +96,7 @@ class SparkStatusTracker private[spark] (sc: SparkContext) { new SparkStageInfoImpl( stageId, info.attemptId, + info.submissionTime.getOrElse(0), info.name, info.numTasks, data.numActiveTasks, diff --git a/core/src/main/scala/org/apache/spark/StatusAPIImpl.scala b/core/src/main/scala/org/apache/spark/StatusAPIImpl.scala index 90b47c847fbca3c242cc5b029756555d620bcf23..e5c7c8d0db5784f292c56c15cbddbed70cca95a0 100644 --- a/core/src/main/scala/org/apache/spark/StatusAPIImpl.scala +++ b/core/src/main/scala/org/apache/spark/StatusAPIImpl.scala @@ -26,6 +26,7 @@ private class SparkJobInfoImpl ( private class SparkStageInfoImpl( val stageId: Int, val currentAttemptId: Int, + val submissionTime: Long, val name: String, val numTasks: Int, val numActiveTasks: Int, diff --git a/core/src/main/scala/org/apache/spark/ui/ConsoleProgressBar.scala b/core/src/main/scala/org/apache/spark/ui/ConsoleProgressBar.scala new file mode 100644 index 0000000000000000000000000000000000000000..27ba9e18237b5e9104eae6982e611ab4e4573657 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/ui/ConsoleProgressBar.scala @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.ui + +import java.util.{Timer, TimerTask} + +import org.apache.spark._ + +/** + * ConsoleProgressBar shows the progress of stages in the next line of the console. It poll the + * status of active stages from `sc.statusTracker` periodically, the progress bar will be showed + * up after the stage has ran at least 500ms. If multiple stages run in the same time, the status + * of them will be combined together, showed in one line. + */ +private[spark] class ConsoleProgressBar(sc: SparkContext) extends Logging { + + // Carrige return + val CR = '\r' + // Update period of progress bar, in milliseconds + val UPDATE_PERIOD = 200L + // Delay to show up a progress bar, in milliseconds + val FIRST_DELAY = 500L + + // The width of terminal + val TerminalWidth = if (!sys.env.getOrElse("COLUMNS", "").isEmpty) { + sys.env.get("COLUMNS").get.toInt + } else { + 80 + } + + var lastFinishTime = 0L + var lastUpdateTime = 0L + var lastProgressBar = "" + + // Schedule a refresh thread to run periodically + private val timer = new Timer("refresh progress", true) + timer.schedule(new TimerTask{ + override def run() { + refresh() + } + }, FIRST_DELAY, UPDATE_PERIOD) + + /** + * Try to refresh the progress bar in every cycle + */ + private def refresh(): Unit = synchronized { + val now = System.currentTimeMillis() + if (now - lastFinishTime < FIRST_DELAY) { + return + } + val stageIds = sc.statusTracker.getActiveStageIds() + val stages = stageIds.map(sc.statusTracker.getStageInfo).flatten.filter(_.numTasks() > 1) + .filter(now - _.submissionTime() > FIRST_DELAY).sortBy(_.stageId()) + if (stages.size > 0) { + show(now, stages.take(3)) // display at most 3 stages in same time + } + } + + /** + * Show progress bar in console. The progress bar is displayed in the next line + * after your last output, keeps overwriting itself to hold in one line. The logging will follow + * the progress bar, then progress bar will be showed in next line without overwrite logs. + */ + private def show(now: Long, stages: Seq[SparkStageInfo]) { + val width = TerminalWidth / stages.size + val bar = stages.map { s => + val total = s.numTasks() + val header = s"[Stage ${s.stageId()}:" + val tailer = s"(${s.numCompletedTasks()} + ${s.numActiveTasks()}) / $total]" + val w = width - header.size - tailer.size + val bar = if (w > 0) { + val percent = w * s.numCompletedTasks() / total + (0 until w).map { i => + if (i < percent) "=" else if (i == percent) ">" else " " + }.mkString("") + } else { + "" + } + header + bar + tailer + }.mkString("") + + // only refresh if it's changed of after 1 minute (or the ssh connection will be closed + // after idle some time) + if (bar != lastProgressBar || now - lastUpdateTime > 60 * 1000L) { + System.err.print(CR + bar) + lastUpdateTime = now + } + lastProgressBar = bar + } + + /** + * Clear the progress bar if showed. + */ + private def clear() { + if (!lastProgressBar.isEmpty) { + System.err.printf(CR + " " * TerminalWidth + CR) + lastProgressBar = "" + } + } + + /** + * Mark all the stages as finished, clear the progress bar if showed, then the progress will not + * interweave with output of jobs. + */ + def finishAll(): Unit = synchronized { + clear() + lastFinishTime = System.currentTimeMillis() + } +} diff --git a/pom.xml b/pom.xml index cc7bce175778fd56073f8bf659b6699720ee8e38..94c4422743df3d9ad0dcfeda4d0969aaebdb1737 100644 --- a/pom.xml +++ b/pom.xml @@ -977,6 +977,7 @@ <spark.test.home>${session.executionRootDirectory}</spark.test.home> <spark.testing>1</spark.testing> <spark.ui.enabled>false</spark.ui.enabled> + <spark.ui.showConsoleProgress>false</spark.ui.showConsoleProgress> <spark.executor.extraClassPath>${test_classpath}</spark.executor.extraClassPath> <spark.driver.allowMultipleContexts>true</spark.driver.allowMultipleContexts> </systemProperties> diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index 1697b6d4f2d4399a3eca42c2a4a49a0acd9095e9..c1879ce4ba0ebcf85798ce92f6b7ce500258e9fc 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -377,6 +377,7 @@ object TestSettings { javaOptions in Test += "-Dspark.testing=1", javaOptions in Test += "-Dspark.port.maxRetries=100", javaOptions in Test += "-Dspark.ui.enabled=false", + javaOptions in Test += "-Dspark.ui.showConsoleProgress=false", javaOptions in Test += "-Dspark.driver.allowMultipleContexts=true", javaOptions in Test += "-Dsun.io.serialization.extendedDebugInfo=true", javaOptions in Test ++= System.getProperties.filter(_._1 startsWith "spark")