Commit 063a98e5, authored by Burak Yavuz, committed by Tathagata Das

[SPARK-18900][FLAKY-TEST] StateStoreSuite.maintenance

## What changes were proposed in this pull request?

This test was pretty flaky until about 10 days ago:
https://spark-tests.appspot.com/test-details?suite_name=org.apache.spark.sql.execution.streaming.state.StateStoreSuite&test_name=maintenance

Since no code changes went into this code path that would explain the flakiness, I'm just increasing the timeouts so that load-related flakiness shouldn't be a problem. As the testing below shows, I haven't been able to reproduce the failure.

## How was this patch tested?

Ran the test with 2000 retries, 5 separate times.

Author: Burak Yavuz <brkyvz@gmail.com>

Closes #16314 from brkyvz/maint-flaky.

(cherry picked from commit b2dd8ec6)
Signed-off-by: Tathagata Das <tathagata.das1565@gmail.com>
parent 3857d5ba
@@ -395,6 +395,8 @@ class StateStoreSuite extends SparkFunSuite with BeforeAndAfter with PrivateMeth
       }
     }

+    val timeoutDuration = 60 seconds
+
     quietly {
       withSpark(new SparkContext(conf)) { sc =>
         withCoordinatorRef(sc) { coordinatorRef =>
@@ -403,7 +405,7 @@ class StateStoreSuite extends SparkFunSuite with BeforeAndAfter with PrivateMeth
           // Generate sufficient versions of store for snapshots
           generateStoreVersions()

-          eventually(timeout(10 seconds)) {
+          eventually(timeout(timeoutDuration)) {
             // Store should have been reported to the coordinator
             assert(coordinatorRef.getLocation(storeId).nonEmpty, "active instance was not reported")
@@ -422,14 +424,14 @@ class StateStoreSuite extends SparkFunSuite with BeforeAndAfter with PrivateMeth
           generateStoreVersions()

           // Earliest delta file should get cleaned up
-          eventually(timeout(10 seconds)) {
+          eventually(timeout(timeoutDuration)) {
             assert(!fileExists(provider, 1, isSnapshot = false), "earliest file not deleted")
           }

           // If driver decides to deactivate all instances of the store, then this instance
           // should be unloaded
           coordinatorRef.deactivateInstances(dir)
-          eventually(timeout(10 seconds)) {
+          eventually(timeout(timeoutDuration)) {
             assert(!StateStore.isLoaded(storeId))
           }
@@ -439,7 +441,7 @@ class StateStoreSuite extends SparkFunSuite with BeforeAndAfter with PrivateMeth
           // If some other executor loads the store, then this instance should be unloaded
           coordinatorRef.reportActiveInstance(storeId, "other-host", "other-exec")
-          eventually(timeout(10 seconds)) {
+          eventually(timeout(timeoutDuration)) {
             assert(!StateStore.isLoaded(storeId))
           }
@@ -450,7 +452,7 @@ class StateStoreSuite extends SparkFunSuite with BeforeAndAfter with PrivateMeth
       }

       // Verify if instance is unloaded if SparkContext is stopped
-      eventually(timeout(10 seconds)) {
+      eventually(timeout(timeoutDuration)) {
         require(SparkEnv.get === null)
         assert(!StateStore.isLoaded(storeId))
         assert(!StateStore.isMaintenanceRunning)
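The fix is purely about polling budgets: each `eventually(timeout(10 seconds))` becomes `eventually(timeout(timeoutDuration))` with a shared `timeoutDuration` of 60 seconds, giving a loaded CI machine more time before the assertion is declared failed. As a rough illustration of the retry-until-timeout behavior that ScalaTest's `eventually` provides, here is a minimal Python sketch; the helper, the `state` dict, and the 0.3-second timer are illustrative inventions, not part of the patch:

```python
import threading
import time

def eventually(assertion, timeout=60.0, interval=0.1):
    """Re-run `assertion` until it stops raising AssertionError or
    `timeout` seconds elapse, then re-raise the last failure."""
    deadline = time.monotonic() + timeout
    while True:
        try:
            return assertion()
        except AssertionError:
            if time.monotonic() >= deadline:
                raise
            time.sleep(interval)

# Toy stand-in for the store state the suite polls on.
state = {"loaded": True}

# Simulate background maintenance unloading the store after ~0.3s.
threading.Timer(0.3, lambda: state.update(loaded=False)).start()

def check_unloaded():
    assert not state["loaded"], "store still loaded"

# With a generous timeout the assertion passes once the flag flips;
# a too-tight timeout (the flakiness this patch targets) would raise.
eventually(check_unloaded, timeout=5.0)
print(state["loaded"])  # False
```

The patch's choice of 60 seconds trades a slower worst-case failure for far fewer false alarms under load, which is the right trade for an `eventually`-style check: a passing run still returns as soon as the condition holds.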