Skip to content
Snippets Groups Projects
Commit c5a7cb02 authored by Shixiong Zhu's avatar Shixiong Zhu Committed by Burak Yavuz
Browse files

[SPARK-19542][SS] Delete the temp checkpoint if a query is stopped without errors


## What changes were proposed in this pull request?

When a query uses a temp checkpoint dir, it's better to delete it if it's stopped without errors.

## How was this patch tested?

New unit tests.

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #16880 from zsxwing/delete-temp-checkpoint.

(cherry picked from commit 3dbff9be)
Signed-off-by: default avatarBurak Yavuz <brkyvz@gmail.com>
parent ef4fb7eb
No related branches found
No related tags found
No related merge requests found
......@@ -17,6 +17,7 @@
package org.apache.spark.sql.execution.streaming
import java.io.IOException
import java.util.UUID
import java.util.concurrent.{CountDownLatch, TimeUnit}
import java.util.concurrent.locks.ReentrantLock
......@@ -41,16 +42,20 @@ import org.apache.spark.util.{Clock, UninterruptibleThread, Utils}
* Unlike a standard query, a streaming query executes repeatedly each time new data arrives at any
* [[Source]] present in the query plan. Whenever new data arrives, a [[QueryExecution]] is created
* and the results are committed transactionally to the given [[Sink]].
*
* @param deleteCheckpointOnStop whether to delete the checkpoint if the query is stopped without
* errors
*/
class StreamExecution(
override val sparkSession: SparkSession,
override val name: String,
checkpointRoot: String,
val checkpointRoot: String,
analyzedPlan: LogicalPlan,
val sink: Sink,
val trigger: Trigger,
val triggerClock: Clock,
val outputMode: OutputMode)
val outputMode: OutputMode,
deleteCheckpointOnStop: Boolean)
extends StreamingQuery with ProgressReporter with Logging {
import org.apache.spark.sql.streaming.StreamingQueryListener._
......@@ -213,6 +218,7 @@ class StreamExecution(
* has been posted to all the listeners.
*/
def start(): Unit = {
logInfo(s"Starting $prettyIdString. Use $checkpointRoot to store the query checkpoint.")
microBatchThread.setDaemon(true)
microBatchThread.start()
startLatch.await() // Wait until thread started and QueryStart event has been posted
......@@ -323,6 +329,20 @@ class StreamExecution(
sparkSession.streams.notifyQueryTermination(StreamExecution.this)
postEvent(
new QueryTerminatedEvent(id, runId, exception.map(_.cause).map(Utils.exceptionString)))
// Delete the temp checkpoint only when the query didn't fail
if (deleteCheckpointOnStop && exception.isEmpty) {
val checkpointPath = new Path(checkpointRoot)
try {
val fs = checkpointPath.getFileSystem(sparkSession.sessionState.newHadoopConf())
fs.delete(checkpointPath, true)
} catch {
case NonFatal(e) =>
// Deleting temp checkpoint folder is best effort, don't throw non fatal exceptions
// when we cannot delete them.
logWarning(s"Cannot delete $checkpointPath", e)
}
}
} finally {
terminationLatch.countDown()
}
......
......@@ -195,6 +195,7 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) {
recoverFromCheckpointLocation: Boolean,
trigger: Trigger,
triggerClock: Clock): StreamingQueryWrapper = {
var deleteCheckpointOnStop = false
val checkpointLocation = userSpecifiedCheckpointLocation.map { userSpecified =>
new Path(userSpecified).toUri.toString
}.orElse {
......@@ -203,6 +204,8 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) {
}
}.getOrElse {
if (useTempCheckpointLocation) {
// Delete the temp checkpoint when a query is being stopped without errors.
deleteCheckpointOnStop = true
Utils.createTempDir(namePrefix = s"temporary").getCanonicalPath
} else {
throw new AnalysisException(
......@@ -244,7 +247,8 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) {
sink,
trigger,
triggerClock,
outputMode))
outputMode,
deleteCheckpointOnStop))
}
/**
......
......@@ -669,4 +669,30 @@ class DataStreamReaderWriterSuite extends StreamTest with BeforeAndAfter with Pr
}
}
}
test("temp checkpoint dir should be deleted if a query is stopped without errors") {
import testImplicits._
val query = MemoryStream[Int].toDS.writeStream.format("console").start()
val checkpointDir = new Path(
query.asInstanceOf[StreamingQueryWrapper].streamingQuery.checkpointRoot)
val fs = checkpointDir.getFileSystem(spark.sessionState.newHadoopConf())
assert(fs.exists(checkpointDir))
query.stop()
assert(!fs.exists(checkpointDir))
}
testQuietly("temp checkpoint dir should not be deleted if a query is stopped with an error") {
import testImplicits._
val input = MemoryStream[Int]
val query = input.toDS.map(_ / 0).writeStream.format("console").start()
val checkpointDir = new Path(
query.asInstanceOf[StreamingQueryWrapper].streamingQuery.checkpointRoot)
val fs = checkpointDir.getFileSystem(spark.sessionState.newHadoopConf())
assert(fs.exists(checkpointDir))
input.addData(1)
intercept[StreamingQueryException] {
query.awaitTermination()
}
assert(fs.exists(checkpointDir))
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment