diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
index 0b1102686319922d8f2b333bb2817ddb815bc5ad..398fa6500f093c605323acc2ac518febfda0ba86 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
@@ -18,8 +18,8 @@
 package org.apache.spark.streaming
 
 import java.io._
-import java.util.concurrent.Executors
-import java.util.concurrent.RejectedExecutionException
+import java.util.concurrent.{ArrayBlockingQueue, RejectedExecutionException,
+  ThreadPoolExecutor, TimeUnit}
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path}
@@ -184,7 +184,14 @@ class CheckpointWriter(
     hadoopConf: Configuration
   ) extends Logging {
   val MAX_ATTEMPTS = 3
-  val executor = Executors.newFixedThreadPool(1)
+
+  // Single-thread executor that rejects new submissions once a large number of tasks have
+  // queued up. Failing fast is preferable here: a long queue typically means the checkpoint
+  // store can never keep up, and would otherwise fill memory with pending byte[] payloads.
+  val executor = new ThreadPoolExecutor(
+    1, 1,
+    0L, TimeUnit.MILLISECONDS,
+    new ArrayBlockingQueue[Runnable](1000))
   val compressionCodec = CompressionCodec.createCodec(conf)
   private var stopped = false
   @volatile private[this] var fs: FileSystem = null
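
For reference, a minimal sketch (not part of this patch) of the fail-fast behavior a bounded queue provides: with the default `AbortPolicy`, `ThreadPoolExecutor` throws `RejectedExecutionException` once the queue is full, instead of buffering work in memory. The `BoundedQueueDemo` name, the tiny queue capacity of 2, and the sleeping task are all illustrative assumptions; the patch itself uses a capacity of 1000.

```scala
import java.util.concurrent.{ArrayBlockingQueue, RejectedExecutionException,
  ThreadPoolExecutor, TimeUnit}

object BoundedQueueDemo {
  def main(args: Array[String]): Unit = {
    val executor = new ThreadPoolExecutor(
      1, 1,                                  // single worker thread, as in CheckpointWriter
      0L, TimeUnit.MILLISECONDS,
      new ArrayBlockingQueue[Runnable](2))   // tiny queue so rejection triggers quickly

    // Simulates a slow checkpoint write that lets the queue back up.
    val slowTask: Runnable = () => Thread.sleep(1000)

    try {
      // Task 1 is picked up by the worker, tasks 2 and 3 fill the queue,
      // and task 4 is rejected by the default AbortPolicy.
      (1 to 4).foreach { i =>
        executor.execute(slowTask)
        println(s"accepted task $i")
      }
    } catch {
      case _: RejectedExecutionException =>
        println("queue full: task rejected (fail fast instead of buffering byte[] in memory)")
    } finally {
      executor.shutdownNow()
    }
  }
}
```

The unbounded queue behind `Executors.newFixedThreadPool(1)` would instead accept every submission, so a checkpoint store that falls behind would accumulate serialized payloads until the driver runs out of memory; the bounded queue surfaces the backlog as an exception the caller can handle.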