diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
index b3c600ae53dbbeaff7ebcb1d451a5d1da8865061..b7fc336223fd8fbeb7d83c1b21d9b87ca5d8dc2b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
@@ -223,7 +223,7 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
       val sink = new MemorySink(df.schema, outputMode)
       val resultDf = Dataset.ofRows(df.sparkSession, new MemoryPlan(sink))
       val chkpointLoc = extraOptions.get("checkpointLocation")
-      val recoverFromChkpoint = chkpointLoc.isDefined && outputMode == OutputMode.Complete()
+      val recoverFromChkpoint = outputMode == OutputMode.Complete()
       val query = df.sparkSession.sessionState.streamingQueryManager.startQuery(
         extraOptions.get("queryName"),
         chkpointLoc,
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
index acac0bfb0e253ed1eb461b00052f4ae9a8aa7eda..9de3da34831c393863302693f2e0374623706dfd 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
@@ -470,24 +470,22 @@ class DataStreamReaderWriterSuite extends StreamTest with BeforeAndAfter {
     sq.stop()
   }
 
-  test("MemorySink can recover from a checkpoint in Complete Mode") {
+  private def testMemorySinkCheckpointRecovery(chkLoc: String, provideInWriter: Boolean): Unit = {
     import testImplicits._
     val ms = new MemoryStream[Int](0, sqlContext)
     val df = ms.toDF().toDF("a")
-    val checkpointLoc = newMetadataDir
-    val checkpointDir = new File(checkpointLoc, "offsets")
-    checkpointDir.mkdirs()
-    assert(checkpointDir.exists())
     val tableName = "test"
     def startQuery: StreamingQuery = {
-      df.groupBy("a")
+      val writer = df.groupBy("a")
         .count()
         .writeStream
         .format("memory")
         .queryName(tableName)
-        .option("checkpointLocation", checkpointLoc)
         .outputMode("complete")
-        .start()
+      if (provideInWriter) {
+        writer.option("checkpointLocation", chkLoc)
+      }
+      writer.start()
     }
     // no exception here
     val q = startQuery
@@ -513,6 +511,24 @@ class DataStreamReaderWriterSuite extends StreamTest with BeforeAndAfter {
     q2.stop()
   }
 
+  test("MemorySink can recover from a checkpoint in Complete Mode") {
+    val checkpointLoc = newMetadataDir
+    val checkpointDir = new File(checkpointLoc, "offsets")
+    checkpointDir.mkdirs()
+    assert(checkpointDir.exists())
+    testMemorySinkCheckpointRecovery(checkpointLoc, provideInWriter = true)
+  }
+
+  test("SPARK-18927: MemorySink can recover from a checkpoint provided in conf in Complete Mode") {
+    val checkpointLoc = newMetadataDir
+    val checkpointDir = new File(checkpointLoc, "offsets")
+    checkpointDir.mkdirs()
+    assert(checkpointDir.exists())
+    withSQLConf(SQLConf.CHECKPOINT_LOCATION.key -> checkpointLoc) {
+      testMemorySinkCheckpointRecovery(checkpointLoc, provideInWriter = false)
+    }
+  }
+
   test("append mode memory sink's do not support checkpoint recovery") {
     import testImplicits._
     val ms = new MemoryStream[Int](0, sqlContext)
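
For context, a minimal usage sketch of the behavior this patch enables. Previously, `recoverFromChkpoint` required `checkpointLocation` to be set as a writer option, so a checkpoint location supplied only through the session conf (`SQLConf.CHECKPOINT_LOCATION`, i.e. `spark.sql.streaming.checkpointLocation`) was ignored on restart for a Complete-mode memory sink. The `spark` session, the `rate` source, and the `/tmp/chk` path below are illustrative assumptions, not part of the patch:

```scala
// Hypothetical sketch, assuming an existing SparkSession named `spark`.
// The checkpoint location is supplied only via the session conf, not the writer.
spark.conf.set("spark.sql.streaming.checkpointLocation", "/tmp/chk")

val counts = spark.readStream
  .format("rate")        // built-in test source emitting (timestamp, value) rows
  .load()
  .groupBy("value")
  .count()

// Note: no .option("checkpointLocation", ...) on the writer. After this
// change, restarting this Complete-mode memory-sink query recovers from the
// checkpoint under /tmp/chk instead of starting from scratch.
val query = counts.writeStream
  .format("memory")
  .queryName("counts")
  .outputMode("complete")
  .start()
```

This mirrors the new `provideInWriter = false` test case: the conf-provided location reaches `startQuery` through the query manager's checkpoint resolution, so recovery no longer hinges on `chkpointLoc.isDefined` in the writer itself.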