Skip to content
Snippets Groups Projects
Commit 162bdb91 authored by Shixiong Zhu's avatar Shixiong Zhu Committed by Tathagata Das
Browse files

[SPARK-18031][TESTS] Fix flaky test ExecutorAllocationManagerSuite.basic functionality


## What changes were proposed in this pull request?

The failure is because in `test("basic functionality")`, it doesn't block until `ExecutorAllocationManager.manageAllocation` is called. This PR just adds StreamManualClock to allow the tests to block on expected wait time to make the test deterministic.

## How was this patch tested?

Jenkins

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #16321 from zsxwing/SPARK-18031.

(cherry picked from commit ccfe60a8)
Signed-off-by: default avatarTathagata Das <tathagata.das1565@gmail.com>
parent 3c8861d9
No related branches found
No related tags found
No related merge requests found
......@@ -36,11 +36,11 @@ class ExecutorAllocationManagerSuite extends SparkFunSuite
private val batchDurationMillis = 1000L
private var allocationClient: ExecutorAllocationClient = null
private var clock: ManualClock = null
private var clock: StreamManualClock = null
before {
allocationClient = mock[ExecutorAllocationClient]
clock = new ManualClock()
clock = new StreamManualClock()
}
test("basic functionality") {
......@@ -57,10 +57,14 @@ class ExecutorAllocationManagerSuite extends SparkFunSuite
reset(allocationClient)
when(allocationClient.getExecutorIds()).thenReturn(Seq("1", "2"))
addBatchProcTime(allocationManager, batchProcTimeMs.toLong)
clock.advance(SCALING_INTERVAL_DEFAULT_SECS * 1000 + 1)
val advancedTime = SCALING_INTERVAL_DEFAULT_SECS * 1000 + 1
val expectedWaitTime = clock.getTimeMillis() + advancedTime
clock.advance(advancedTime)
// Make sure ExecutorAllocationManager.manageAllocation is called
eventually(timeout(10 seconds)) {
body
assert(clock.isStreamWaitingAt(expectedWaitTime))
}
body
}
/** Verify that the expected number of total executor were requested */
......@@ -394,3 +398,27 @@ class ExecutorAllocationManagerSuite extends SparkFunSuite
}
}
}
/**
* A special manual clock that provide `isStreamWaitingAt` to allow the user to check if the clock
* is blocking.
*/
class StreamManualClock(time: Long = 0L) extends ManualClock(time) with Serializable {
private var waitStartTime: Option[Long] = None
override def waitTillTime(targetTime: Long): Long = synchronized {
try {
waitStartTime = Some(getTimeMillis())
super.waitTillTime(targetTime)
} finally {
waitStartTime = None
}
}
/**
* Returns if the clock is blocking and the time it started to block is the parameter `time`.
*/
def isStreamWaitingAt(time: Long): Boolean = synchronized {
waitStartTime == Some(time)
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment