Skip to content
Snippets Groups Projects
Commit 3350ad0d authored by Shivaram Venkataraman's avatar Shivaram Venkataraman
Browse files

Catch RejectedExecution exception in Checkpoint handler.

parent 7d6d9e6a
No related branches found
No related tags found
No related merge requests found
......@@ -8,7 +8,7 @@ import org.apache.hadoop.conf.Configuration
import java.io._
import com.ning.compress.lzf.{LZFInputStream, LZFOutputStream}
import java.util.concurrent.Executors
import java.util.concurrent.RejectedExecutionException
private[streaming]
class Checkpoint(@transient ssc: StreamingContext, val checkpointTime: Time)
......@@ -91,7 +91,12 @@ class CheckpointWriter(checkpointDir: String) extends Logging {
oos.writeObject(checkpoint)
oos.close()
bos.close()
executor.execute(new CheckpointWriteHandler(checkpoint.checkpointTime, bos.toByteArray))
try {
executor.execute(new CheckpointWriteHandler(checkpoint.checkpointTime, bos.toByteArray))
} catch {
case rej: RejectedExecutionException =>
logError("Could not submit checkpoint task to the thread pool executor", rej)
}
}
def stop() {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment