Skip to content
Snippets Groups Projects
Commit 031d7d41 authored by jerryshao's avatar jerryshao Committed by Tathagata Das
Browse files

[SPARK-6304] [STREAMING] Fix checkpointing doesn't retain driver port issue.

Author: jerryshao <saisai.shao@intel.com>
Author: Saisai Shao <saisai.shao@intel.com>

Closes #5060 from jerryshao/SPARK-6304 and squashes the following commits:

89b01f5 [jerryshao] Update the unit test to add more cases
275d252 [jerryshao] Address the comments
7cc146d [jerryshao] Address the comments
2624723 [jerryshao] Fix rebase conflict
45befaa [Saisai Shao] Update the unit test
bbc1c9c [Saisai Shao] Fix checkpointing doesn't retain driver port issue
parent fec10f0c
No related branches found
No related tags found
No related merge requests found
......@@ -48,6 +48,8 @@ class Checkpoint(@transient ssc: StreamingContext, val checkpointTime: Time)
// Reload properties for the checkpoint application since user wants to set a reload property
// or spark had changed its value and user wants to set it back.
val propertiesToReload = List(
"spark.driver.host",
"spark.driver.port",
"spark.master",
"spark.yarn.keytab",
"spark.yarn.principal")
......
......@@ -191,8 +191,51 @@ class CheckpointSuite extends TestSuiteBase {
}
}
// This tests if "spark.driver.host" and "spark.driver.port" is set by user, can be recovered
// with correct value.
test("get correct spark.driver.[host|port] from checkpoint") {
val conf = Map("spark.driver.host" -> "localhost", "spark.driver.port" -> "9999")
conf.foreach(kv => System.setProperty(kv._1, kv._2))
ssc = new StreamingContext(master, framework, batchDuration)
val originalConf = ssc.conf
assert(originalConf.get("spark.driver.host") === "localhost")
assert(originalConf.get("spark.driver.port") === "9999")
val cp = new Checkpoint(ssc, Time(1000))
ssc.stop()
// Serialize/deserialize to simulate write to storage and reading it back
val newCp = Utils.deserialize[Checkpoint](Utils.serialize(cp))
val newCpConf = newCp.createSparkConf()
assert(newCpConf.contains("spark.driver.host"))
assert(newCpConf.contains("spark.driver.port"))
assert(newCpConf.get("spark.driver.host") === "localhost")
assert(newCpConf.get("spark.driver.port") === "9999")
// Check if all the parameters have been restored
ssc = new StreamingContext(null, newCp, null)
val restoredConf = ssc.conf
assert(restoredConf.get("spark.driver.host") === "localhost")
assert(restoredConf.get("spark.driver.port") === "9999")
ssc.stop()
// If spark.driver.host and spark.driver.host is not set in system property, these two
// parameters should not be presented in the newly recovered conf.
conf.foreach(kv => System.clearProperty(kv._1))
val newCpConf1 = newCp.createSparkConf()
assert(!newCpConf1.contains("spark.driver.host"))
assert(!newCpConf1.contains("spark.driver.port"))
// Spark itself will dispatch a random, not-used port for spark.driver.port if it is not set
// explicitly.
ssc = new StreamingContext(null, newCp, null)
val restoredConf1 = ssc.conf
assert(restoredConf1.get("spark.driver.host") === "localhost")
assert(restoredConf1.get("spark.driver.port") !== "9999")
}
// This tests whether the systm can recover from a master failure with simple
// This tests whether the system can recover from a master failure with simple
// non-stateful operations. This assumes as reliable, replayable input
// source - TestInputDStream.
test("recovery with map and reduceByKey operations") {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment