Skip to content
Snippets Groups Projects
Commit 19913373 authored by Marcelo Vanzin's avatar Marcelo Vanzin Committed by Andrew Or
Browse files

[SPARK-5933] [core] Move config deprecation warnings to SparkConf.

I didn't find many deprecated configs after a grep-based search,
but the ones I could find were moved to the centralized location
in SparkConf.

While there, I deprecated a couple more HS configs that mentioned
time units.

Author: Marcelo Vanzin <vanzin@cloudera.com>

Closes #5562 from vanzin/SPARK-5933 and squashes the following commits:

dcb617e7 [Marcelo Vanzin] [SPARK-5933] [core] Move config deprecation warnings to SparkConf.
parent 6fbeb82e
No related branches found
No related tags found
No related merge requests found
......@@ -403,6 +403,9 @@ private[spark] object SparkConf extends Logging {
*/
private val deprecatedConfigs: Map[String, DeprecatedConfig] = {
val configs = Seq(
DeprecatedConfig("spark.cache.class", "0.8",
"The spark.cache.class property is no longer being used! Specify storage levels using " +
"the RDD.persist() method instead."),
DeprecatedConfig("spark.yarn.user.classpath.first", "1.3",
"Please use spark.{driver,executor}.userClassPathFirst instead."))
Map(configs.map { cfg => (cfg.key -> cfg) }:_*)
......@@ -420,7 +423,15 @@ private[spark] object SparkConf extends Logging {
"spark.history.fs.update.interval" -> Seq(
AlternateConfig("spark.history.fs.update.interval.seconds", "1.4"),
AlternateConfig("spark.history.fs.updateInterval", "1.3"),
AlternateConfig("spark.history.updateInterval", "1.3"))
AlternateConfig("spark.history.updateInterval", "1.3")),
"spark.history.fs.cleaner.interval" -> Seq(
AlternateConfig("spark.history.fs.cleaner.interval.seconds", "1.4")),
"spark.history.fs.cleaner.maxAge" -> Seq(
AlternateConfig("spark.history.fs.cleaner.maxAge.seconds", "1.4")),
"spark.yarn.am.waitTime" -> Seq(
AlternateConfig("spark.yarn.applicationMaster.waitTries", "1.3",
// Translate old value to a duration, with 10s wait time per try.
translation = s => s"${s.toLong * 10}s"))
)
/**
......@@ -470,7 +481,7 @@ private[spark] object SparkConf extends Logging {
configsWithAlternatives.get(key).flatMap { alts =>
alts.collectFirst { case alt if conf.contains(alt.key) =>
val value = conf.get(alt.key)
alt.translation.map(_(value)).getOrElse(value)
if (alt.translation != null) alt.translation(value) else value
}
}
}
......@@ -514,6 +525,6 @@ private[spark] object SparkConf extends Logging {
private case class AlternateConfig(
key: String,
version: String,
translation: Option[String => String] = None)
translation: String => String = null)
}
......@@ -103,7 +103,7 @@ class SparkEnv (
// actorSystem.awaitTermination()
// Note that blockTransferService is stopped by BlockManager since it is started by it.
// If we only stop sc, but the driver process still runs as a service, then we need to delete
// the tmp dir; if not, it will create too many tmp dirs.
// We only need to delete the tmp dir created by the driver, because sparkFilesDir points to the
......@@ -375,12 +375,6 @@ object SparkEnv extends Logging {
"."
}
// Warn about deprecated spark.cache.class property
if (conf.contains("spark.cache.class")) {
logWarning("The spark.cache.class property is no longer being used! Specify storage " +
"levels using the RDD.persist() method instead.")
}
val outputCommitCoordinator = mockOutputCommitCoordinator.getOrElse {
new OutputCommitCoordinator(conf)
}
......@@ -406,7 +400,7 @@ object SparkEnv extends Logging {
shuffleMemoryManager,
outputCommitCoordinator,
conf)
// Add a reference to the tmp dir created by the driver; we will delete this tmp dir when stop() is
// called, and we only need to do it for the driver. Because the driver may run as a service, if we
// don't delete this tmp dir when sc is stopped, it will create too many tmp dirs.
......
......@@ -52,8 +52,7 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
private val UPDATE_INTERVAL_S = conf.getTimeAsSeconds("spark.history.fs.update.interval", "10s")
// Interval between each of the cleaner's checks for event logs to delete
private val CLEAN_INTERVAL_MS = conf.getLong("spark.history.fs.cleaner.interval.seconds",
DEFAULT_SPARK_HISTORY_FS_CLEANER_INTERVAL_S) * 1000
private val CLEAN_INTERVAL_S = conf.getTimeAsSeconds("spark.history.fs.cleaner.interval", "1d")
private val logDir = conf.getOption("spark.history.fs.logDirectory")
.map { d => Utils.resolveURI(d).toString }
......@@ -130,8 +129,7 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
if (conf.getBoolean("spark.history.fs.cleaner.enabled", false)) {
// A task that periodically cleans event logs on disk.
pool.scheduleAtFixedRate(getRunner(cleanLogs), 0, CLEAN_INTERVAL_MS,
TimeUnit.MILLISECONDS)
pool.scheduleAtFixedRate(getRunner(cleanLogs), 0, CLEAN_INTERVAL_S, TimeUnit.SECONDS)
}
}
}
......@@ -270,8 +268,7 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
try {
val statusList = Option(fs.listStatus(new Path(logDir))).map(_.toSeq)
.getOrElse(Seq[FileStatus]())
val maxAge = conf.getLong("spark.history.fs.cleaner.maxAge.seconds",
DEFAULT_SPARK_HISTORY_FS_MAXAGE_S) * 1000
val maxAge = conf.getTimeAsSeconds("spark.history.fs.cleaner.maxAge", "7d") * 1000
val now = System.currentTimeMillis()
val appsToRetain = new mutable.LinkedHashMap[String, FsApplicationHistoryInfo]()
......@@ -417,12 +414,6 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
private object FsHistoryProvider {
val DEFAULT_LOG_DIR = "file:/tmp/spark-events"
// One day
val DEFAULT_SPARK_HISTORY_FS_CLEANER_INTERVAL_S = Duration(1, TimeUnit.DAYS).toSeconds
// One week
val DEFAULT_SPARK_HISTORY_FS_MAXAGE_S = Duration(7, TimeUnit.DAYS).toSeconds
}
private class FsApplicationHistoryInfo(
......
......@@ -217,6 +217,9 @@ class SparkConfSuite extends FunSuite with LocalSparkContext with ResetSystemPro
val count = conf.getAll.filter { case (k, v) => k.startsWith("spark.history.") }.size
assert(count === 4)
conf.set("spark.yarn.applicationMaster.waitTries", "42")
assert(conf.getTimeAsSeconds("spark.yarn.am.waitTime") === 420)
}
}
......
......@@ -153,19 +153,18 @@ follows:
</td>
</tr>
<tr>
<td>spark.history.fs.cleaner.interval.seconds</td>
<td>86400</td>
<td>spark.history.fs.cleaner.interval</td>
<td>1d</td>
<td>
How often the job history cleaner checks for files to delete, in seconds. Defaults to 86400 (one day).
Files are only deleted if they are older than spark.history.fs.cleaner.maxAge.seconds.
How often the job history cleaner checks for files to delete.
Files are only deleted if they are older than spark.history.fs.cleaner.maxAge.
</td>
</tr>
<tr>
<td>spark.history.fs.cleaner.maxAge.seconds</td>
<td>3600 * 24 * 7</td>
<td>spark.history.fs.cleaner.maxAge</td>
<td>7d</td>
<td>
Job history files older than this many seconds will be deleted when the history cleaner runs.
Defaults to 3600 * 24 * 7 (1 week).
Job history files older than this will be deleted when the history cleaner runs.
</td>
</tr>
</table>
......
......@@ -373,14 +373,7 @@ private[spark] class ApplicationMaster(
private def waitForSparkContextInitialized(): SparkContext = {
logInfo("Waiting for spark context initialization")
sparkContextRef.synchronized {
val waitTries = sparkConf.getOption("spark.yarn.applicationMaster.waitTries")
.map(_.toLong * 10000L)
if (waitTries.isDefined) {
logWarning(
"spark.yarn.applicationMaster.waitTries is deprecated, use spark.yarn.am.waitTime")
}
val totalWaitTime = sparkConf.getTimeAsMs("spark.yarn.am.waitTime",
s"${waitTries.getOrElse(100000L)}ms")
val totalWaitTime = sparkConf.getTimeAsMs("spark.yarn.am.waitTime", "100s")
val deadline = System.currentTimeMillis() + totalWaitTime
while (sparkContextRef.get() == null && System.currentTimeMillis < deadline && !finished) {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment