From 1903641e68ce7e7e657584bf45e91db6df357e41 Mon Sep 17 00:00:00 2001
From: huangzhaowei <carlmartinmax@gmail.com>
Date: Thu, 9 Jul 2015 19:31:31 -0700
Subject: [PATCH] [SPARK-8839] [SQL] ThriftServer2 will remove session and
 execution no matter it's finished or not.

In my test, `sessions` and `executions` in ThriftServer2 is not the same number as the connection number.
For example, if there are 200 clients connecting to the server,  but it will have more than 200 `sessions` and `executions`.
So if it reaches the `retainedStatements`, it has to remove some object which is not finished.
So it may cause the exception described in [Jira Address](https://issues.apache.org/jira/browse/SPARK-8839)

Author: huangzhaowei <carlmartinmax@gmail.com>

Closes #7239 from SaintBacchus/SPARK-8839 and squashes the following commits:

cf7ef40 [huangzhaowei] Remove the a meanless funciton call
3e9a5a6 [huangzhaowei] Add a filter before take
9d5ceb8 [huangzhaowei] [SPARK-8839][SQL]ThriftServer2 will remove session and execution no matter it's finished or not.
---
 .../spark/sql/hive/thriftserver/HiveThriftServer2.scala    | 7 +++++--
 1 file changed, 5 insertions(+), 2 deletions(-)

diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala
index 700d994bb6..b7db80d93f 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala
@@ -179,6 +179,7 @@ object HiveThriftServer2 extends Logging {
     def onSessionClosed(sessionId: String): Unit = {
       sessionList(sessionId).finishTimestamp = System.currentTimeMillis
       onlineSessionNum -= 1
+      trimSessionIfNecessary()
     }
 
     def onStatementStart(
@@ -206,18 +207,20 @@ object HiveThriftServer2 extends Logging {
       executionList(id).detail = errorMessage
       executionList(id).state = ExecutionState.FAILED
       totalRunning -= 1
+      trimExecutionIfNecessary()
     }
 
     def onStatementFinish(id: String): Unit = {
       executionList(id).finishTimestamp = System.currentTimeMillis
       executionList(id).state = ExecutionState.FINISHED
       totalRunning -= 1
+      trimExecutionIfNecessary()
     }
 
     private def trimExecutionIfNecessary() = synchronized {
       if (executionList.size > retainedStatements) {
         val toRemove = math.max(retainedStatements / 10, 1)
-        executionList.take(toRemove).foreach { s =>
+        executionList.filter(_._2.finishTimestamp != 0).take(toRemove).foreach { s =>
           executionList.remove(s._1)
         }
       }
@@ -226,7 +229,7 @@ object HiveThriftServer2 extends Logging {
     private def trimSessionIfNecessary() = synchronized {
       if (sessionList.size > retainedSessions) {
         val toRemove = math.max(retainedSessions / 10, 1)
-        sessionList.take(toRemove).foreach { s =>
+        sessionList.filter(_._2.finishTimestamp != 0).take(toRemove).foreach { s =>
           sessionList.remove(s._1)
         }
       }
-- 
GitLab