Commit f13a33b4 authored by Burak Yavuz, committed by Tathagata Das

[SPARK-18337] Complete mode memory sinks should be able to recover from checkpoints


## What changes were proposed in this pull request?

It would be nice if memory sinks could also recover from checkpoints. For correctness, the only output mode in which we should support this is `Complete`: in that mode the output of the StateStore is already persisted in the checkpoint directory, so the sink's in-memory table can be rebuilt on restart.
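For illustration, a minimal sketch of the user-facing behavior this enables, assuming a local `SparkSession`; `MemoryStream` is the test source used in the new unit test, and the table name and `/tmp` checkpoint path are hypothetical:

```scala
import org.apache.spark.sql.{SQLContext, SparkSession}
import org.apache.spark.sql.execution.streaming.MemoryStream

val spark = SparkSession.builder().master("local[2]").appName("mem-sink-recovery").getOrCreate()
import spark.implicits._
implicit val sqlCtx: SQLContext = spark.sqlContext

// A streaming aggregation written to a memory sink in Complete mode.
val input = MemoryStream[Int]
val query = input.toDF().toDF("a")
  .groupBy("a").count()
  .writeStream
  .format("memory")
  .queryName("counts")                               // name of the in-memory table
  .option("checkpointLocation", "/tmp/counts-chkpt") // hypothetical path
  .outputMode("complete")
  .start()

input.addData(0, 1)
query.processAllAvailable()
query.stop()

// With this patch, starting the same query again with the same checkpoint location
// succeeds, and the "counts" table is rebuilt from the persisted StateStore output,
// instead of start() failing because a checkpoint already exists.
```

Recovery only repopulates the in-memory table; it is safe precisely because Complete mode re-emits the full aggregation state on every trigger.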

## How was this patch tested?

Unit test

Author: Burak Yavuz <brkyvz@gmail.com>

Closes #15801 from brkyvz/mem-stream.

(cherry picked from commit 2afdaa98)
Signed-off-by: Tathagata Das <tathagata.das1565@gmail.com>
parent 5f7a9af6
@@ -222,14 +222,16 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
       val sink = new MemorySink(df.schema, outputMode)
       val resultDf = Dataset.ofRows(df.sparkSession, new MemoryPlan(sink))
+      val chkpointLoc = extraOptions.get("checkpointLocation")
+      val recoverFromChkpoint = chkpointLoc.isDefined && outputMode == OutputMode.Complete()
       val query = df.sparkSession.sessionState.streamingQueryManager.startQuery(
         extraOptions.get("queryName"),
-        extraOptions.get("checkpointLocation"),
+        chkpointLoc,
         df,
         sink,
         outputMode,
         useTempCheckpointLocation = true,
-        recoverFromCheckpointLocation = false,
+        recoverFromCheckpointLocation = recoverFromChkpoint,
         trigger = trigger)
       resultDf.createOrReplaceTempView(query.name)
       query
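Note that recovery is opted into only when a checkpoint location is supplied and the output mode is `Complete`; in every other mode `recoverFromCheckpointLocation` stays `false`, so pointing a memory sink at a checkpoint location that already contains an `offsets` directory makes `startQuery` fail with an `AnalysisException` (the second unit test below exercises exactly this). A minimal sketch of that rejected path, with hypothetical names, path, and session setup:

```scala
import scala.util.{Failure, Try}

import org.apache.spark.sql.{AnalysisException, SQLContext, SparkSession}
import org.apache.spark.sql.execution.streaming.MemoryStream

val spark = SparkSession.builder().master("local[2]").appName("append-no-recovery").getOrCreate()
import spark.implicits._
implicit val sqlCtx: SQLContext = spark.sqlContext

// Hypothetical checkpoint directory left behind by an earlier run,
// i.e. it already contains an "offsets" subdirectory.
val staleCheckpoint = "/tmp/old-query-chkpt"

val attempt = Try {
  MemoryStream[Int].toDF().toDF("a")
    .writeStream
    .format("memory")
    .queryName("appendTable")
    .option("checkpointLocation", staleCheckpoint)
    .outputMode("append") // in Append mode the memory sink cannot rebuild its contents
    .start()
}

attempt match {
  case Failure(e: AnalysisException) =>
    // Expected: the message states that the query "does not support recovering"
    // from the existing "checkpoint location".
    println(e.getMessage)
  case other =>
    println(s"Unexpected result: $other")
}
```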
@@ -17,6 +17,7 @@
package org.apache.spark.sql.streaming.test
import java.io.File
import java.util.concurrent.TimeUnit
import scala.concurrent.duration._
@@ -467,4 +468,68 @@ class DataStreamReaderWriterSuite extends StreamTest with BeforeAndAfter {
    val sq = df.writeStream.format("console").start()
    sq.stop()
  }
test("MemorySink can recover from a checkpoint in Complete Mode") {
import testImplicits._
val ms = new MemoryStream[Int](0, sqlContext)
val df = ms.toDF().toDF("a")
val checkpointLoc = newMetadataDir
val checkpointDir = new File(checkpointLoc, "offsets")
checkpointDir.mkdirs()
assert(checkpointDir.exists())
val tableName = "test"
def startQuery: StreamingQuery = {
df.groupBy("a")
.count()
.writeStream
.format("memory")
.queryName(tableName)
.option("checkpointLocation", checkpointLoc)
.outputMode("complete")
.start()
}
// no exception here
val q = startQuery
ms.addData(0, 1)
q.processAllAvailable()
q.stop()
checkAnswer(
spark.table(tableName),
Seq(Row(0, 1), Row(1, 1))
)
spark.sql(s"drop table $tableName")
// verify table is dropped
intercept[AnalysisException](spark.table(tableName).collect())
val q2 = startQuery
ms.addData(0)
q2.processAllAvailable()
checkAnswer(
spark.table(tableName),
Seq(Row(0, 2), Row(1, 1))
)
q2.stop()
}
test("append mode memory sink's do not support checkpoint recovery") {
import testImplicits._
val ms = new MemoryStream[Int](0, sqlContext)
val df = ms.toDF().toDF("a")
val checkpointLoc = newMetadataDir
val checkpointDir = new File(checkpointLoc, "offsets")
checkpointDir.mkdirs()
assert(checkpointDir.exists())
val e = intercept[AnalysisException] {
df.writeStream
.format("memory")
.queryName("test")
.option("checkpointLocation", checkpointLoc)
.outputMode("append")
.start()
}
assert(e.getMessage.contains("does not support recovering"))
assert(e.getMessage.contains("checkpoint location"))
}
}