Skip to content
Snippets Groups Projects
Commit e945aa61 authored by Kay Ousterhout's avatar Kay Ousterhout Committed by Cheng Lian
Browse files

[SPARK-5846] Correctly set job description and pool for SQL jobs

marmbrus am I missing something obvious here? I verified that this fixes the problem for me (on 1.2.1) on EC2, but I'm confused about how others wouldn't have noticed this?

Author: Kay Ousterhout <kayousterhout@gmail.com>

Closes #4630 from kayousterhout/SPARK-5846_1.3 and squashes the following commits:

2022ad4 [Kay Ousterhout] [SPARK-5846] Correctly set job description and pool for SQL jobs
parent d12d2ad7
No related branches found
No related tags found
No related merge requests found
...@@ -185,6 +185,10 @@ private[hive] class SparkExecuteStatementOperation( ...@@ -185,6 +185,10 @@ private[hive] class SparkExecuteStatementOperation(
def run(): Unit = { def run(): Unit = {
logInfo(s"Running query '$statement'") logInfo(s"Running query '$statement'")
setState(OperationState.RUNNING) setState(OperationState.RUNNING)
hiveContext.sparkContext.setJobDescription(statement)
sessionToActivePool.get(parentSession.getSessionHandle).foreach { pool =>
hiveContext.sparkContext.setLocalProperty("spark.scheduler.pool", pool)
}
try { try {
result = hiveContext.sql(statement) result = hiveContext.sql(statement)
logDebug(result.queryExecution.toString()) logDebug(result.queryExecution.toString())
...@@ -194,10 +198,6 @@ private[hive] class SparkExecuteStatementOperation( ...@@ -194,10 +198,6 @@ private[hive] class SparkExecuteStatementOperation(
logInfo(s"Setting spark.scheduler.pool=$value for future statements in this session.") logInfo(s"Setting spark.scheduler.pool=$value for future statements in this session.")
case _ => case _ =>
} }
hiveContext.sparkContext.setJobDescription(statement)
sessionToActivePool.get(parentSession.getSessionHandle).foreach { pool =>
hiveContext.sparkContext.setLocalProperty("spark.scheduler.pool", pool)
}
iter = { iter = {
val useIncrementalCollect = val useIncrementalCollect =
hiveContext.getConf("spark.sql.thriftServer.incrementalCollect", "false").toBoolean hiveContext.getConf("spark.sql.thriftServer.incrementalCollect", "false").toBoolean
......
...@@ -156,6 +156,10 @@ private[hive] class SparkExecuteStatementOperation( ...@@ -156,6 +156,10 @@ private[hive] class SparkExecuteStatementOperation(
def run(): Unit = { def run(): Unit = {
logInfo(s"Running query '$statement'") logInfo(s"Running query '$statement'")
setState(OperationState.RUNNING) setState(OperationState.RUNNING)
hiveContext.sparkContext.setJobDescription(statement)
sessionToActivePool.get(parentSession.getSessionHandle).foreach { pool =>
hiveContext.sparkContext.setLocalProperty("spark.scheduler.pool", pool)
}
try { try {
result = hiveContext.sql(statement) result = hiveContext.sql(statement)
logDebug(result.queryExecution.toString()) logDebug(result.queryExecution.toString())
...@@ -165,10 +169,6 @@ private[hive] class SparkExecuteStatementOperation( ...@@ -165,10 +169,6 @@ private[hive] class SparkExecuteStatementOperation(
logInfo(s"Setting spark.scheduler.pool=$value for future statements in this session.") logInfo(s"Setting spark.scheduler.pool=$value for future statements in this session.")
case _ => case _ =>
} }
hiveContext.sparkContext.setJobDescription(statement)
sessionToActivePool.get(parentSession.getSessionHandle).foreach { pool =>
hiveContext.sparkContext.setLocalProperty("spark.scheduler.pool", pool)
}
iter = { iter = {
val useIncrementalCollect = val useIncrementalCollect =
hiveContext.getConf("spark.sql.thriftServer.incrementalCollect", "false").toBoolean hiveContext.getConf("spark.sql.thriftServer.incrementalCollect", "false").toBoolean
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment