Skip to content
Snippets Groups Projects
Commit 0424da68 authored by GuoQiang Li's avatar GuoQiang Li Committed by Sean Owen
Browse files

[SPARK-6963][CORE]Flaky test: o.a.s.ContextCleanerSuite automatically cleanup checkpoint

cc andrewor14

Author: GuoQiang Li <witgo@qq.com>

Closes #5548 from witgo/SPARK-6963 and squashes the following commits:

964aea7 [GuoQiang Li] review commits
b08b3c9 [GuoQiang Li] Flaky test: o.a.s.ContextCleanerSuite automatically cleanup checkpoint
parent 8fbd45c7
No related branches found
No related tags found
No related merge requests found
......@@ -236,6 +236,7 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
try {
logDebug("Cleaning rdd checkpoint data " + rddId)
RDDCheckpointData.clearRDDCheckpointData(sc, rddId)
listeners.foreach(_.checkpointCleaned(rddId))
logInfo("Cleaned rdd checkpoint data " + rddId)
}
catch {
......@@ -260,4 +261,5 @@ private[spark] trait CleanerListener {
def shuffleCleaned(shuffleId: Int)
def broadcastCleaned(broadcastId: Long)
def accumCleaned(accId: Long)
def checkpointCleaned(rddId: Long)
}
......@@ -224,7 +224,7 @@ class ContextCleanerSuite extends ContextCleanerSuiteBase {
assert(fs.exists(path))
// the checkpoint is not cleaned by default (without the configuration set)
var postGCTester = new CleanerTester(sc, Seq(rddId), Nil, Nil)
var postGCTester = new CleanerTester(sc, Seq(rddId), Nil, Nil, Nil)
rdd = null // Make RDD out of scope
runGC()
postGCTester.assertCleanup()
......@@ -245,7 +245,7 @@ class ContextCleanerSuite extends ContextCleanerSuiteBase {
assert(fs.exists(RDDCheckpointData.rddCheckpointDataPath(sc, rddId).get))
// Test that GC causes checkpoint data cleanup after dereferencing the RDD
postGCTester = new CleanerTester(sc, Seq(rddId), Nil, Nil)
postGCTester = new CleanerTester(sc, Seq(rddId), Nil, Nil, Seq(rddId))
rdd = null // Make RDD out of scope
runGC()
postGCTester.assertCleanup()
......@@ -406,12 +406,14 @@ class CleanerTester(
sc: SparkContext,
rddIds: Seq[Int] = Seq.empty,
shuffleIds: Seq[Int] = Seq.empty,
broadcastIds: Seq[Long] = Seq.empty)
broadcastIds: Seq[Long] = Seq.empty,
checkpointIds: Seq[Long] = Seq.empty)
extends Logging {
val toBeCleanedRDDIds = new HashSet[Int] with SynchronizedSet[Int] ++= rddIds
val toBeCleanedShuffleIds = new HashSet[Int] with SynchronizedSet[Int] ++= shuffleIds
val toBeCleanedBroadcstIds = new HashSet[Long] with SynchronizedSet[Long] ++= broadcastIds
val toBeCheckpointIds = new HashSet[Long] with SynchronizedSet[Long] ++= checkpointIds
val isDistributed = !sc.isLocal
val cleanerListener = new CleanerListener {
......@@ -427,12 +429,17 @@ class CleanerTester(
def broadcastCleaned(broadcastId: Long): Unit = {
toBeCleanedBroadcstIds -= broadcastId
logInfo("Broadcast" + broadcastId + " cleaned")
logInfo("Broadcast " + broadcastId + " cleaned")
}
def accumCleaned(accId: Long): Unit = {
logInfo("Cleaned accId " + accId + " cleaned")
}
def checkpointCleaned(rddId: Long): Unit = {
toBeCheckpointIds -= rddId
logInfo("checkpoint " + rddId + " cleaned")
}
}
val MAX_VALIDATION_ATTEMPTS = 10
......@@ -456,7 +463,8 @@ class CleanerTester(
/** Verify that RDDs, shuffles, etc. occupy resources */
private def preCleanupValidate() {
assert(rddIds.nonEmpty || shuffleIds.nonEmpty || broadcastIds.nonEmpty, "Nothing to cleanup")
assert(rddIds.nonEmpty || shuffleIds.nonEmpty || broadcastIds.nonEmpty ||
checkpointIds.nonEmpty, "Nothing to cleanup")
// Verify the RDDs have been persisted and blocks are present
rddIds.foreach { rddId =>
......@@ -547,7 +555,8 @@ class CleanerTester(
private def isAllCleanedUp =
toBeCleanedRDDIds.isEmpty &&
toBeCleanedShuffleIds.isEmpty &&
toBeCleanedBroadcstIds.isEmpty
toBeCleanedBroadcstIds.isEmpty &&
toBeCheckpointIds.isEmpty
private def getRDDBlocks(rddId: Int): Seq[BlockId] = {
blockManager.master.getMatchingBlockIds( _ match {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment