diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/ReliableKafkaReceiver.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/ReliableKafkaReceiver.scala
index be734b80272d1b4f1db0937dd1e5babcdce8fbda..c4a44c1822c39a6f31e66a3845d76ffa0b53c48a 100644
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/ReliableKafkaReceiver.scala
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/ReliableKafkaReceiver.scala
@@ -201,12 +201,34 @@ class ReliableKafkaReceiver[
     topicPartitionOffsetMap.clear()
   }
 
-  /** Store the ready-to-be-stored block and commit the related offsets to zookeeper. */
+  /**
+   * Store the ready-to-be-stored block and commit the related offsets to ZooKeeper. This method
+   * will try a fixed number of times to push the block. If the push fails, the receiver is
+   * stopped.
+   */
   private def storeBlockAndCommitOffset(
       blockId: StreamBlockId, arrayBuffer: mutable.ArrayBuffer[_]): Unit = {
-    store(arrayBuffer.asInstanceOf[mutable.ArrayBuffer[(K, V)]])
-    Option(blockOffsetMap.get(blockId)).foreach(commitOffset)
-    blockOffsetMap.remove(blockId)
+    var count = 0
+    var pushed = false
+    var exception: Exception = null
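+    // Try to store the block, retrying on failure (at most 4 attempts in total).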
+    while (!pushed && count <= 3) {
+      try {
+        store(arrayBuffer.asInstanceOf[mutable.ArrayBuffer[(K, V)]])
+        pushed = true
+      } catch {
+        case ex: Exception =>
+          count += 1
+          exception = ex
+      }
+    }
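+    // Commit offsets to ZooKeeper only after a successful store; otherwise stop the receiver.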
+    if (pushed) {
+      Option(blockOffsetMap.get(blockId)).foreach(commitOffset)
+      blockOffsetMap.remove(blockId)
+    } else {
+      stop("Error while storing block into Spark", exception)
+    }
   }
 
   /**