Skip to content
Snippets Groups Projects
Commit a5d98876 authored by Burak Yavuz's avatar Burak Yavuz Committed by Shixiong Zhu
Browse files

[STREAMING][FLAKY-TEST] Catch execution context race condition in `FileBasedWriteAheadLog.close()`

There is a race condition in `FileBasedWriteAheadLog.close()`, where if delete's of old log files are in progress, the write ahead log may close, and result in a `RejectedExecutionException`. This is okay, and should be handled gracefully.

Example test failures:
https://amplab.cs.berkeley.edu/jenkins/job/Spark-1.6-SBT/AMPLAB_JENKINS_BUILD_PROFILE=hadoop1.0,label=spark-test/95/testReport/junit/org.apache.spark.streaming.util/BatchedWriteAheadLogWithCloseFileAfterWriteSuite/BatchedWriteAheadLog___clean_old_logs/

The reason the test fails is in `afterEach`, `writeAheadLog.close` is called, and there may still be async deletes in flight.

tdas zsxwing

Author: Burak Yavuz <brkyvz@gmail.com>

Closes #9953 from brkyvz/flaky-ss.
parent 4d6bbbc0
No related branches found
No related tags found
No related merge requests found
......@@ -17,7 +17,7 @@
package org.apache.spark.streaming.util
import java.nio.ByteBuffer
import java.util.concurrent.ThreadPoolExecutor
import java.util.concurrent.{RejectedExecutionException, ThreadPoolExecutor}
import java.util.{Iterator => JIterator}
import scala.collection.JavaConverters._
......@@ -176,10 +176,16 @@ private[streaming] class FileBasedWriteAheadLog(
}
oldLogFiles.foreach { logInfo =>
if (!executionContext.isShutdown) {
val f = Future { deleteFile(logInfo) }(executionContext)
if (waitForCompletion) {
import scala.concurrent.duration._
Await.ready(f, 1 second)
try {
val f = Future { deleteFile(logInfo) }(executionContext)
if (waitForCompletion) {
import scala.concurrent.duration._
Await.ready(f, 1 second)
}
} catch {
case e: RejectedExecutionException =>
logWarning("Execution context shutdown before deleting old WriteAheadLogs. " +
"This would not affect recovery correctness.", e)
}
}
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment