From 7add4e982184c46990995dcd1326e6caad7adf6e Mon Sep 17 00:00:00 2001
From: Marco Gaido <mgaido@hortonworks.com>
Date: Wed, 16 Aug 2017 09:40:04 -0700
Subject: [PATCH] [SPARK-21738] Thriftserver doesn't cancel jobs when session
 is closed

## What changes were proposed in this pull request?

When a session is closed the Thriftserver doesn't cancel the jobs which may still be running. This is a huge waste of resources.
This PR address the problem canceling the pending jobs when a session is closed.

## How was this patch tested?

The patch was tested manually.

Author: Marco Gaido <mgaido@hortonworks.com>

Closes #18951 from mgaido91/SPARK-21738.
---
 .../thriftserver/SparkExecuteStatementOperation.scala     | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
index 1d1074a2a7..f5191fa913 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
@@ -71,9 +71,9 @@ private[hive] class SparkExecuteStatementOperation(
 
   def close(): Unit = {
     // RDDs will be cleaned automatically upon garbage collection.
-    sqlContext.sparkContext.clearJobGroup()
     logDebug(s"CLOSING $statementId")
     cleanup(OperationState.CLOSED)
+    sqlContext.sparkContext.clearJobGroup()
   }
 
   def addNonNullColumnValue(from: SparkRow, to: ArrayBuffer[Any], ordinal: Int) {
@@ -273,9 +273,6 @@ private[hive] class SparkExecuteStatementOperation(
 
   override def cancel(): Unit = {
     logInfo(s"Cancel '$statement' with $statementId")
-    if (statementId != null) {
-      sqlContext.sparkContext.cancelJobGroup(statementId)
-    }
     cleanup(OperationState.CANCELED)
   }
 
@@ -287,6 +284,9 @@ private[hive] class SparkExecuteStatementOperation(
         backgroundHandle.cancel(true)
       }
     }
+    if (statementId != null) {
+      sqlContext.sparkContext.cancelJobGroup(statementId)
+    }
   }
 }
 
-- 
GitLab