From f13a33b477a3f9cc81f9decee736e7c50d8205e1 Mon Sep 17 00:00:00 2001 From: Burak Yavuz <brkyvz@gmail.com> Date: Tue, 15 Nov 2016 13:09:29 -0800 Subject: [PATCH] [SPARK-18337] Complete mode memory sinks should be able to recover from checkpoints ## What changes were proposed in this pull request? It would be nice if memory sinks can also recover from checkpoints. For correctness reasons, the only time we should support it is in `Complete` OutputMode. We can support this in CompleteMode, because the output of the StateStore is already persisted in the checkpoint directory. ## How was this patch tested? Unit test Author: Burak Yavuz <brkyvz@gmail.com> Closes #15801 from brkyvz/mem-stream. (cherry picked from commit 2afdaa9805f44b45242978eab9a9623d31dddbf3) Signed-off-by: Tathagata Das <tathagata.das1565@gmail.com> --- .../sql/streaming/DataStreamWriter.scala | 6 +- .../test/DataStreamReaderWriterSuite.scala | 65 +++++++++++++++++++ 2 files changed, 69 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala index b959444b49..daed1dcb77 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala @@ -222,14 +222,16 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) { val sink = new MemorySink(df.schema, outputMode) val resultDf = Dataset.ofRows(df.sparkSession, new MemoryPlan(sink)) + val chkpointLoc = extraOptions.get("checkpointLocation") + val recoverFromChkpoint = chkpointLoc.isDefined && outputMode == OutputMode.Complete() val query = df.sparkSession.sessionState.streamingQueryManager.startQuery( extraOptions.get("queryName"), - extraOptions.get("checkpointLocation"), + chkpointLoc, df, sink, outputMode, useTempCheckpointLocation = true, - recoverFromCheckpointLocation = false, + recoverFromCheckpointLocation = recoverFromChkpoint, trigger = trigger) resultDf.createOrReplaceTempView(query.name) query diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala index f099439581..5630464f40 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.streaming.test +import java.io.File import java.util.concurrent.TimeUnit import scala.concurrent.duration._ @@ -467,4 +468,68 @@ class DataStreamReaderWriterSuite extends StreamTest with BeforeAndAfter { val sq = df.writeStream.format("console").start() sq.stop() } + + test("MemorySink can recover from a checkpoint in Complete Mode") { + import testImplicits._ + val ms = new MemoryStream[Int](0, sqlContext) + val df = ms.toDF().toDF("a") + val checkpointLoc = newMetadataDir + val checkpointDir = new File(checkpointLoc, "offsets") + checkpointDir.mkdirs() + assert(checkpointDir.exists()) + val tableName = "test" + def startQuery: StreamingQuery = { + df.groupBy("a") + .count() + .writeStream + .format("memory") + .queryName(tableName) + .option("checkpointLocation", checkpointLoc) + .outputMode("complete") + .start() + } + // no exception here + val q = startQuery + ms.addData(0, 1) + q.processAllAvailable() + q.stop() + + checkAnswer( + spark.table(tableName), + Seq(Row(0, 1), Row(1, 1)) + ) + spark.sql(s"drop table $tableName") + // verify table is dropped + intercept[AnalysisException](spark.table(tableName).collect()) + val q2 = startQuery + ms.addData(0) + q2.processAllAvailable() + checkAnswer( + spark.table(tableName), + Seq(Row(0, 2), Row(1, 1)) + ) + + q2.stop() + } + + test("append mode memory sink's do not support checkpoint recovery") { + import testImplicits._ + val ms = new MemoryStream[Int](0, sqlContext) + val df = ms.toDF().toDF("a") + val checkpointLoc = newMetadataDir + val checkpointDir = new File(checkpointLoc, "offsets") + checkpointDir.mkdirs() + assert(checkpointDir.exists()) + + val e = intercept[AnalysisException] { + df.writeStream + .format("memory") + .queryName("test") + .option("checkpointLocation", checkpointLoc) + .outputMode("append") + .start() + } + assert(e.getMessage.contains("does not support recovering")) + assert(e.getMessage.contains("checkpoint location")) + } } -- GitLab