Skip to content
Snippets Groups Projects
Commit 50cf8c8b authored by Charles Reiss's avatar Charles Reiss
Browse files

Add fault tolerance test that uses replicated RDDs.

parent c8a78869
No related branches found
No related tags found
No related merge requests found
......@@ -217,6 +217,27 @@ class DistributedSuite extends FunSuite with ShouldMatchers with BeforeAndAfter
assert(grouped.collect.size === 1)
}
}
test("recover from node failures with replication") {
import DistributedSuite.{markNodeIfIdentity, failOnMarkedIdentity}
DistributedSuite.amMaster = true
// Using more than two nodes so we don't have a symmetric communication pattern and might
// cache a partially correct list of peers.
sc = new SparkContext("local-cluster[3,1,512]", "test")
for (i <- 1 to 3) {
val data = sc.parallelize(Seq(true, false, false, false), 4)
data.persist(StorageLevel.MEMORY_ONLY_2)
assert(data.count === 4)
assert(data.map(markNodeIfIdentity).collect.size === 4)
assert(data.map(failOnMarkedIdentity).collect.size === 4)
// Create a new replicated RDD to make sure that cached peer information doesn't cause
// problems.
val data2 = sc.parallelize(Seq(true, true), 2).persist(StorageLevel.MEMORY_ONLY_2)
assert(data2.count === 2)
}
}
}
object DistributedSuite {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment