Commit 6bca8898 authored by Patrick Wendell, committed by Michael Armbrust

SPARK-3025 [SQL]: Allow JDBC clients to set a fair scheduler pool

This definitely needs review as I am not familiar with this part of Spark.
I tested this locally and it did seem to work.

Author: Patrick Wendell <pwendell@gmail.com>

Closes #1937 from pwendell/scheduler and squashes the following commits:

b858e33 [Patrick Wendell] SPARK-3025: Allow JDBC clients to set a fair scheduler pool
parent 4bf3de71
@@ -605,6 +605,11 @@ Configuration of Hive is done by placing your `hive-site.xml` file in `conf/`.
 You may also use the beeline script that comes with Hive.
 
+To set a [Fair Scheduler](job-scheduling.html#fair-scheduler-pools) pool for a JDBC client session,
+users can set the `spark.sql.thriftserver.scheduler.pool` variable:
+
+    SET spark.sql.thriftserver.scheduler.pool=accounting;
+
 ### Migration Guide for Shark Users
 
 #### Reducer number
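For context, a minimal sketch of a JDBC client exercising the new setting. The connection URL, credentials, table, and the `accounting` pool name are illustrative, and the Hive JDBC driver is assumed to be on the classpath:

```scala
import java.sql.DriverManager

// Hypothetical client session: route this session's statements to the
// "accounting" fair scheduler pool, then run a query. URL, table, and pool
// name are placeholders, not part of this commit. Requires the Hive JDBC
// driver (org.apache.hive.jdbc.HiveDriver) on the classpath.
object SchedulerPoolClient {
  def main(args: Array[String]): Unit = {
    val conn = DriverManager.getConnection("jdbc:hive2://localhost:10000/default", "user", "")
    try {
      val stmt = conn.createStatement()
      // All statements issued later in this session run in the named pool.
      stmt.execute("SET spark.sql.thriftserver.scheduler.pool=accounting")
      val rs = stmt.executeQuery("SELECT COUNT(*) FROM some_table")
      while (rs.next()) println(rs.getLong(1))
    } finally {
      conn.close()
    }
  }
}
```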
@@ -33,6 +33,9 @@ private[spark] object SQLConf {
   val DIALECT = "spark.sql.dialect"
   val PARQUET_BINARY_AS_STRING = "spark.sql.parquet.binaryAsString"
 
+  // This is only used for the thriftserver
+  val THRIFTSERVER_POOL = "spark.sql.thriftserver.scheduler.pool"
+
   object Deprecated {
     val MAPRED_REDUCE_TASKS = "mapred.reduce.tasks"
   }
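The new key only names a pool; the pool itself comes from Spark's fair scheduler configuration. A sketch of the server-side prerequisite, using standard Spark settings (the allocation file path is a placeholder):

```scala
import org.apache.spark.{SparkConf, SparkContext}

// THRIFTSERVER_POOL only takes effect if the SparkContext backing the
// Thrift server runs with fair scheduling enabled. Both config keys below
// are standard Spark settings; the XML file defines pools such as
// "accounting" and its path here is a placeholder.
val conf = new SparkConf()
  .setAppName("Thrift server (sketch)")
  .set("spark.scheduler.mode", "FAIR")
  .set("spark.scheduler.allocation.file", "/path/to/fairscheduler.xml")
val sc = new SparkContext(conf)
```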
@@ -17,24 +17,24 @@
 package org.apache.spark.sql.hive.thriftserver.server
 
-import scala.collection.JavaConversions._
-import scala.collection.mutable.ArrayBuffer
-import scala.math.{random, round}
-
 import java.sql.Timestamp
 import java.util.{Map => JMap}
 
+import scala.collection.JavaConversions._
+import scala.collection.mutable.{ArrayBuffer, Map}
+import scala.math.{random, round}
+
 import org.apache.hadoop.hive.common.`type`.HiveDecimal
 import org.apache.hadoop.hive.metastore.api.FieldSchema
 import org.apache.hive.service.cli._
 import org.apache.hive.service.cli.operation.{ExecuteStatementOperation, Operation, OperationManager}
 import org.apache.hive.service.cli.session.HiveSession
 import org.apache.spark.Logging
+import org.apache.spark.sql.{Row => SparkRow, SQLConf, SchemaRDD}
+import org.apache.spark.sql.catalyst.plans.logical.SetCommand
 import org.apache.spark.sql.catalyst.types._
+import org.apache.spark.sql.hive.thriftserver.ReflectionUtils
 import org.apache.spark.sql.hive.{HiveContext, HiveMetastoreTypes}
-import org.apache.spark.sql.{SchemaRDD, Row => SparkRow}
-import org.apache.spark.sql.hive.thriftserver.ReflectionUtils
 
 /**
  * Executes queries using Spark SQL, and maintains a list of handles to active queries.
@@ -43,6 +43,9 @@ class SparkSQLOperationManager(hiveContext: HiveContext) extends OperationManager
   val handleToOperation = ReflectionUtils
     .getSuperField[JMap[OperationHandle, Operation]](this, "handleToOperation")
 
+  // TODO: Currently this will grow infinitely, even as sessions expire
+  val sessionToActivePool = Map[HiveSession, String]()
+
   override def newExecuteStatementOperation(
       parentSession: HiveSession,
       statement: String,
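As a hedged aside on the TODO above (not what this commit does): one way the map could be kept from growing without bound is to key it on weak references, e.g.:

```scala
import java.util.{Collections, WeakHashMap}
import scala.collection.JavaConversions._
import org.apache.hive.service.cli.session.HiveSession

// Alternative sketch for the TODO: a WeakHashMap drops an entry once its
// HiveSession key is no longer strongly referenced elsewhere, so expired
// sessions do not accumulate. Wrapped in a synchronized java.util.Map and
// exposed as a Scala mutable.Map via the JavaConversions implicit; note the
// wrapper's compound operations are not atomic.
val sessionToActivePool: collection.mutable.Map[HiveSession, String] =
  Collections.synchronizedMap(new WeakHashMap[HiveSession, String]())
```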
@@ -165,8 +168,18 @@ class SparkSQLOperationManager(hiveContext: HiveContext) extends OperationManager
       try {
         result = hiveContext.sql(statement)
         logDebug(result.queryExecution.toString())
+        result.queryExecution.logical match {
+          case SetCommand(Some(key), Some(value)) if (key == SQLConf.THRIFTSERVER_POOL) =>
+            sessionToActivePool(parentSession) = value
+            logInfo(s"Setting spark.scheduler.pool=$value for future statements in this session.")
+          case _ =>
+        }
+
         val groupId = round(random * 1000000).toString
         hiveContext.sparkContext.setJobGroup(groupId, statement)
+        sessionToActivePool.get(parentSession).foreach { pool =>
+          hiveContext.sparkContext.setLocalProperty("spark.scheduler.pool", pool)
+        }
         iter = {
           val resultRdd = result.queryExecution.toRdd
           val useIncrementalCollect =
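The handler relies on a standard Spark mechanism: the fair scheduler reads the `spark.scheduler.pool` local property of the thread that submits a job. A self-contained sketch of that mechanism outside the Thrift server (master, app name, and pool name are illustrative):

```scala
import org.apache.spark.{SparkConf, SparkContext}

object PoolDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf()
        .setMaster("local[2]")
        .setAppName("pool-demo")
        .set("spark.scheduler.mode", "FAIR"))

    // Mirrors the handler above: tag the job group, then pin the pool for
    // jobs submitted from this thread.
    sc.setJobGroup("demo-group", "SELECT ... (demo statement)")
    sc.setLocalProperty("spark.scheduler.pool", "accounting")

    // This action's job is scheduled in the "accounting" pool (if the pool is
    // not defined in an allocation file, Spark creates it with defaults).
    println(sc.parallelize(1 to 1000).count())

    sc.stop()
  }
}
```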