From 3350ad0d7fc3ecece78f87d7aa6a727e48b21c8c Mon Sep 17 00:00:00 2001
From: Shivaram Venkataraman <shivaram@eecs.berkeley.edu>
Date: Sun, 7 Jul 2013 03:32:26 -0700
Subject: [PATCH] Catch RejectedExecution exception in Checkpoint handler.

---
 .../src/main/scala/spark/streaming/Checkpoint.scala      | 9 +++++++--
 1 file changed, 7 insertions(+), 2 deletions(-)

diff --git a/streaming/src/main/scala/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/spark/streaming/Checkpoint.scala
index 66e67cbfa1..450e48d66e 100644
--- a/streaming/src/main/scala/spark/streaming/Checkpoint.scala
+++ b/streaming/src/main/scala/spark/streaming/Checkpoint.scala
@@ -8,7 +8,7 @@ import org.apache.hadoop.conf.Configuration
 import java.io._
 import com.ning.compress.lzf.{LZFInputStream, LZFOutputStream}
 import java.util.concurrent.Executors
-
+import java.util.concurrent.RejectedExecutionException
 
 private[streaming]
 class Checkpoint(@transient ssc: StreamingContext, val checkpointTime: Time)
@@ -91,7 +91,12 @@ class CheckpointWriter(checkpointDir: String) extends Logging {
     oos.writeObject(checkpoint)
     oos.close()
     bos.close()
-    executor.execute(new CheckpointWriteHandler(checkpoint.checkpointTime, bos.toByteArray))
+    try {
+      executor.execute(new CheckpointWriteHandler(checkpoint.checkpointTime, bos.toByteArray))
+    } catch {
+      case rej: RejectedExecutionException =>
+        logError("Could not submit checkpoint task to the thread pool executor", rej)
+    }
   }
 
   def stop() {
-- 
GitLab