Skip to content
Snippets Groups Projects
Commit 1903641e authored by huangzhaowei's avatar huangzhaowei Committed by Cheng Lian
Browse files

[SPARK-8839] [SQL] ThriftServer2 will remove session and execution no matter it's finished or not.

In my test, `sessions` and `executions` in ThriftServer2 is not the same number as the connection number.
For example, if there are 200 clients connecting to the server,  but it will have more than 200 `sessions` and `executions`.
So if it reaches the `retainedStatements`, it has to remove some object which is not finished.
So it may cause the exception described in [Jira Address](https://issues.apache.org/jira/browse/SPARK-8839)

Author: huangzhaowei <carlmartinmax@gmail.com>

Closes #7239 from SaintBacchus/SPARK-8839 and squashes the following commits:

cf7ef40 [huangzhaowei] Remove the a meanless funciton call
3e9a5a6 [huangzhaowei] Add a filter before take
9d5ceb8 [huangzhaowei] [SPARK-8839][SQL]ThriftServer2 will remove session and execution no matter it's finished or not.
parent 27273046
No related branches found
No related tags found
No related merge requests found
...@@ -179,6 +179,7 @@ object HiveThriftServer2 extends Logging { ...@@ -179,6 +179,7 @@ object HiveThriftServer2 extends Logging {
def onSessionClosed(sessionId: String): Unit = { def onSessionClosed(sessionId: String): Unit = {
sessionList(sessionId).finishTimestamp = System.currentTimeMillis sessionList(sessionId).finishTimestamp = System.currentTimeMillis
onlineSessionNum -= 1 onlineSessionNum -= 1
trimSessionIfNecessary()
} }
def onStatementStart( def onStatementStart(
...@@ -206,18 +207,20 @@ object HiveThriftServer2 extends Logging { ...@@ -206,18 +207,20 @@ object HiveThriftServer2 extends Logging {
executionList(id).detail = errorMessage executionList(id).detail = errorMessage
executionList(id).state = ExecutionState.FAILED executionList(id).state = ExecutionState.FAILED
totalRunning -= 1 totalRunning -= 1
trimExecutionIfNecessary()
} }
def onStatementFinish(id: String): Unit = { def onStatementFinish(id: String): Unit = {
executionList(id).finishTimestamp = System.currentTimeMillis executionList(id).finishTimestamp = System.currentTimeMillis
executionList(id).state = ExecutionState.FINISHED executionList(id).state = ExecutionState.FINISHED
totalRunning -= 1 totalRunning -= 1
trimExecutionIfNecessary()
} }
private def trimExecutionIfNecessary() = synchronized { private def trimExecutionIfNecessary() = synchronized {
if (executionList.size > retainedStatements) { if (executionList.size > retainedStatements) {
val toRemove = math.max(retainedStatements / 10, 1) val toRemove = math.max(retainedStatements / 10, 1)
executionList.take(toRemove).foreach { s => executionList.filter(_._2.finishTimestamp != 0).take(toRemove).foreach { s =>
executionList.remove(s._1) executionList.remove(s._1)
} }
} }
...@@ -226,7 +229,7 @@ object HiveThriftServer2 extends Logging { ...@@ -226,7 +229,7 @@ object HiveThriftServer2 extends Logging {
private def trimSessionIfNecessary() = synchronized { private def trimSessionIfNecessary() = synchronized {
if (sessionList.size > retainedSessions) { if (sessionList.size > retainedSessions) {
val toRemove = math.max(retainedSessions / 10, 1) val toRemove = math.max(retainedSessions / 10, 1)
sessionList.take(toRemove).foreach { s => sessionList.filter(_._2.finishTimestamp != 0).take(toRemove).foreach { s =>
sessionList.remove(s._1) sessionList.remove(s._1)
} }
} }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment