Skip to content
Snippets Groups Projects
Commit 7ec83658 authored by Jose Torres's avatar Jose Torres Committed by Tathagata Das
Browse files

[SPARK-23491][SS] Remove explicit job cancellation from ContinuousExecution reconfiguring

## What changes were proposed in this pull request?

Remove queryExecutionThread.interrupt() from ContinuousExecution. As detailed in the JIRA, interrupting the thread is only relevant in the microbatch case; for continuous processing the query execution can quickly clean itself up without.

## How was this patch tested?

existing tests

Author: Jose Torres <jose@databricks.com>

Closes #20622 from jose-torres/SPARK-23441.
parent 185f5bc7
No related branches found
No related tags found
No related merge requests found
......@@ -236,9 +236,7 @@ class ContinuousExecution(
startTrigger()
if (reader.needsReconfiguration() && state.compareAndSet(ACTIVE, RECONFIGURING)) {
stopSources()
if (queryExecutionThread.isAlive) {
sparkSession.sparkContext.cancelJobGroup(runId.toString)
queryExecutionThread.interrupt()
}
false
......@@ -266,12 +264,20 @@ class ContinuousExecution(
SQLExecution.withNewExecutionId(
sparkSessionForQuery, lastExecution)(lastExecution.toRdd)
}
} catch {
case t: Throwable
if StreamExecution.isInterruptionException(t) && state.get() == RECONFIGURING =>
logInfo(s"Query $id ignoring exception from reconfiguring: $t")
// interrupted by reconfiguration - swallow exception so we can restart the query
} finally {
epochEndpoint.askSync[Unit](StopContinuousExecutionWrites)
SparkEnv.get.rpcEnv.stop(epochEndpoint)
epochUpdateThread.interrupt()
epochUpdateThread.join()
stopSources()
sparkSession.sparkContext.cancelJobGroup(runId.toString)
}
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment