From 645cf3fcc21987417b2946bdeeeb60af3edf667e Mon Sep 17 00:00:00 2001
From: Tathagata Das <tathagata.das1565@gmail.com>
Date: Thu, 19 Mar 2015 02:15:50 -0400
Subject: [PATCH] [SPARK-6222][Streaming] Don't delete checkpoint data when
 doing pre-batch-start checkpoint

This is an alternative approach to https://github.com/apache/spark/pull/4964/.
I think this is a simpler fix that can be backported easily to other branches (1.2 and 1.3).

All it does is introduce a flag so that the pre-batch-start checkpoint does not trigger clearing of checkpoint data.

There is no unit test yet. I will add one once this approach has been reviewed. I am not sure whether this is easy to test.
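
In essence, the flag lets only the post-batch-completion checkpoint trigger cleanup of
checkpoint data. A minimal standalone sketch of that gating logic (hypothetical simplified
types, not the actual Spark classes; see the diff below for the real change):

    object ClearCheckpointGateSketch {
      sealed trait Event
      case class DoCheckpoint(time: Long, clearCheckpointDataLater: Boolean) extends Event
      case class ClearCheckpointData(time: Long) extends Event

      // Mirrors the gist of JobGenerator.onCheckpointCompletion: cleanup is requested
      // only when the completed checkpoint was the post-batch-completion one.
      def onCheckpointCompletion(time: Long, clearCheckpointDataLater: Boolean): Option[Event] =
        if (clearCheckpointDataLater) Some(ClearCheckpointData(time)) else None

      def main(args: Array[String]): Unit = {
        // Pre-batch-start checkpoint (sent when jobs are generated): must not clear
        // data, since the batch may still be running.
        assert(onCheckpointCompletion(1000L, clearCheckpointDataLater = false).isEmpty)
        // Post-batch-completion checkpoint (sent when metadata is cleared): safe to clear now.
        assert(onCheckpointCompletion(1000L, clearCheckpointDataLater = true)
          .contains(ClearCheckpointData(1000L)))
      }
    }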

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #5008 from tdas/SPARK-6222 and squashes the following commits:

7315bc2 [Tathagata Das] Removed empty line.
c438de4 [Tathagata Das] Revert unnecessary change.
5e98374 [Tathagata Das] Added unit test
50cb60b [Tathagata Das] Fixed style issue
295ca5c [Tathagata Das] Fixing SPARK-6222
---
 .../apache/spark/streaming/Checkpoint.scala   |  12 +-
 .../streaming/scheduler/JobGenerator.scala    |  20 +--
 .../scheduler/JobGeneratorSuite.scala         | 133 ++++++++++++++++++
 3 files changed, 153 insertions(+), 12 deletions(-)
 create mode 100644 streaming/src/test/scala/org/apache/spark/streaming/scheduler/JobGeneratorSuite.scala

diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
index cb4c94fb9d..db64e11e16 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
@@ -119,7 +119,10 @@ class CheckpointWriter(
   private var stopped = false
   private var fs_ : FileSystem = _
 
-  class CheckpointWriteHandler(checkpointTime: Time, bytes: Array[Byte]) extends Runnable {
+  class CheckpointWriteHandler(
+      checkpointTime: Time,
+      bytes: Array[Byte],
+      clearCheckpointDataLater: Boolean) extends Runnable {
     def run() {
       var attempts = 0
       val startTime = System.currentTimeMillis()
@@ -166,7 +169,7 @@ class CheckpointWriter(
           val finishTime = System.currentTimeMillis()
           logInfo("Checkpoint for time " + checkpointTime + " saved to file '" + checkpointFile +
             "', took " + bytes.length + " bytes and " + (finishTime - startTime) + " ms")
-          jobGenerator.onCheckpointCompletion(checkpointTime)
+          jobGenerator.onCheckpointCompletion(checkpointTime, clearCheckpointDataLater)
           return
         } catch {
           case ioe: IOException =>
@@ -180,7 +183,7 @@ class CheckpointWriter(
     }
   }
 
-  def write(checkpoint: Checkpoint) {
+  def write(checkpoint: Checkpoint, clearCheckpointDataLater: Boolean) {
     val bos = new ByteArrayOutputStream()
     val zos = compressionCodec.compressedOutputStream(bos)
     val oos = new ObjectOutputStream(zos)
@@ -188,7 +191,8 @@ class CheckpointWriter(
     oos.close()
     bos.close()
     try {
-      executor.execute(new CheckpointWriteHandler(checkpoint.checkpointTime, bos.toByteArray))
+      executor.execute(new CheckpointWriteHandler(
+        checkpoint.checkpointTime, bos.toByteArray, clearCheckpointDataLater))
       logDebug("Submitted checkpoint of time " + checkpoint.checkpointTime + " writer queue")
     } catch {
       case rej: RejectedExecutionException =>
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
index ac92774a38..59488dfb0f 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
@@ -30,7 +30,8 @@ import org.apache.spark.util.{Clock, ManualClock}
 private[scheduler] sealed trait JobGeneratorEvent
 private[scheduler] case class GenerateJobs(time: Time) extends JobGeneratorEvent
 private[scheduler] case class ClearMetadata(time: Time) extends JobGeneratorEvent
-private[scheduler] case class DoCheckpoint(time: Time) extends JobGeneratorEvent
+private[scheduler] case class DoCheckpoint(
+    time: Time, clearCheckpointDataLater: Boolean) extends JobGeneratorEvent
 private[scheduler] case class ClearCheckpointData(time: Time) extends JobGeneratorEvent
 
 /**
@@ -163,8 +164,10 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
   /**
    * Callback called when the checkpoint of a batch has been written.
    */
-  def onCheckpointCompletion(time: Time) {
-    eventActor ! ClearCheckpointData(time)
+  def onCheckpointCompletion(time: Time, clearCheckpointDataLater: Boolean) {
+    if (clearCheckpointDataLater) {
+      eventActor ! ClearCheckpointData(time)
+    }
   }
 
   /** Processes all events */
@@ -173,7 +176,8 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
     event match {
       case GenerateJobs(time) => generateJobs(time)
       case ClearMetadata(time) => clearMetadata(time)
-      case DoCheckpoint(time) => doCheckpoint(time)
+      case DoCheckpoint(time, clearCheckpointDataLater) =>
+        doCheckpoint(time, clearCheckpointDataLater)
       case ClearCheckpointData(time) => clearCheckpointData(time)
     }
   }
@@ -245,7 +249,7 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
       case Failure(e) =>
         jobScheduler.reportError("Error generating jobs for time " + time, e)
     }
-    eventActor ! DoCheckpoint(time)
+    eventActor ! DoCheckpoint(time, clearCheckpointDataLater = false)
   }
 
   /** Clear DStream metadata for the given `time`. */
@@ -255,7 +259,7 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
     // If checkpointing is enabled, then checkpoint,
     // else mark batch to be fully processed
     if (shouldCheckpoint) {
-      eventActor ! DoCheckpoint(time)
+      eventActor ! DoCheckpoint(time, clearCheckpointDataLater = true)
     } else {
       // If checkpointing is not enabled, then delete metadata information about
       // received blocks (block data not saved in any case). Otherwise, wait for
@@ -278,11 +282,11 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
   }
 
   /** Perform checkpoint for the given `time`. */
-  private def doCheckpoint(time: Time) {
+  private def doCheckpoint(time: Time, clearCheckpointDataLater: Boolean) {
     if (shouldCheckpoint && (time - graph.zeroTime).isMultipleOf(ssc.checkpointDuration)) {
       logInfo("Checkpointing graph for time " + time)
       ssc.graph.updateCheckpointData(time)
-      checkpointWriter.write(new Checkpoint(ssc, time))
+      checkpointWriter.write(new Checkpoint(ssc, time), clearCheckpointDataLater)
     }
   }
 
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/JobGeneratorSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/JobGeneratorSuite.scala
new file mode 100644
index 0000000000..4150b60635
--- /dev/null
+++ b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/JobGeneratorSuite.scala
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.scheduler
+
+import java.util.concurrent.CountDownLatch
+
+import scala.concurrent.duration._
+import scala.language.postfixOps
+
+import org.scalatest.concurrent.Eventually._
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.streaming._
+import org.apache.spark.util.{ManualClock, Utils}
+
+class JobGeneratorSuite extends TestSuiteBase {
+
+  // SPARK-6222 is a tricky regression bug which causes received block metadata
+  // to be deleted before the corresponding batch has completed. This occurs when
+  // the following conditions are met.
+  // 1. streaming checkpointing is enabled by calling streamingContext.checkpoint(dir)
+  // 2. input data is received through a receiver as blocks
+  // 3. a batch processing a set of blocks takes a long time, such that a few subsequent
+  //    batches have been generated and submitted for processing.
+  //
+  // The JobGenerator (as of Mar 16, 2015) checkpoints twice per batch, once after the
+  // generation of a batch, and again after the completion of a batch. The cleanup of
+  // checkpoint data (including block metadata, etc.) from the DStreams must be done only
+  // after the 2nd checkpoint has completed, that is, after the batch has been completely
+  // processed. However, the issue is that the checkpoint data, and along with it the received
+  // block metadata, is cleaned up even in the case of the 1st checkpoint, causing premature
+  // deletion of received block data. For example, if the 3rd batch is still being processed,
+  // the 7th batch may get generated, and the corresponding "1st checkpoint" will delete
+  // received block metadata of batches older than the 6th batch. That is, the 3rd batch's
+  // block metadata gets deleted even before the 3rd batch has been completely processed.
+  //
+  // This test tries to create that scenario as follows.
+  // 1. enable checkpointing
+  // 2. generate batches with received blocks
+  // 3. make the 3rd batch never complete
+  // 4. allow subsequent batches to be generated (which could prematurely delete the 3rd batch's metadata)
+  // 5. verify that the 3rd batch's block metadata still exists
+  //
+  test("SPARK-6222: Do not clear received block data too soon") {
+    import JobGeneratorSuite._
+    val checkpointDir = Utils.createTempDir()
+    val testConf = conf
+    testConf.set("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock")
+    testConf.set("spark.streaming.receiver.writeAheadLog.rollingInterval", "1")
+
+    withStreamingContext(new StreamingContext(testConf, batchDuration)) { ssc =>
+      val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
+      val numBatches = 10
+      val longBatchNumber = 3 // 3rd batch will take a long time
+      val longBatchTime = longBatchNumber * batchDuration.milliseconds
+
+      val testTimeout = timeout(10 seconds)
+      val inputStream = ssc.receiverStream(new TestReceiver)
+
+      inputStream.foreachRDD((rdd: RDD[Int], time: Time) => {
+        if (time.milliseconds == longBatchTime) {
+          while (waitLatch.getCount() > 0) {
+            waitLatch.await()
+            println("Await over")
+          }
+        }
+      })
+
+      val batchCounter = new BatchCounter(ssc)
+      ssc.checkpoint(checkpointDir.getAbsolutePath)
+      ssc.start()
+
+      // Make sure that only 1 batch of information is remembered
+      assert(inputStream.rememberDuration === batchDuration)
+      val receiverTracker = ssc.scheduler.receiverTracker
+
+      // Get the blocks belonging to a batch
+      def getBlocksOfBatch(batchTime: Long) = {
+        receiverTracker.getBlocksOfBatchAndStream(Time(batchTime), inputStream.id)
+      }
+
+      // Wait for new blocks to be received
+      def waitForNewReceivedBlocks() {
+        eventually(testTimeout) {
+          assert(receiverTracker.hasUnallocatedBlocks)
+        }
+      }
+
+      // Wait for received blocks to be allocated to a batch
+      def waitForBlocksToBeAllocatedToBatch(batchTime: Long) {
+        eventually(testTimeout) {
+          assert(getBlocksOfBatch(batchTime).nonEmpty)
+        }
+      }
+
+      // Generate a large number of batches with blocks in them
+      for (batchNum <- 1 to numBatches) {
+        waitForNewReceivedBlocks()
+        clock.advance(batchDuration.milliseconds)
+        waitForBlocksToBeAllocatedToBatch(clock.getTimeMillis())
+      }
+
+      // Wait until the last batch's time is pending, while the 3rd batch stays blocked
+      eventually(testTimeout) {
+        assert(ssc.scheduler.getPendingTimes().contains(Time(numBatches * batchDuration.milliseconds)))
+      }
+
+      // Verify that the 3rd batch's block data is still present while the 3rd batch is incomplete
+      assert(getBlocksOfBatch(longBatchTime).nonEmpty, "blocks of incomplete batch already deleted")
+      assert(batchCounter.getNumCompletedBatches < longBatchNumber)
+      waitLatch.countDown()
+    }
+  }
+}
+
+object JobGeneratorSuite {
+  val waitLatch = new CountDownLatch(1)
+}
-- 
GitLab