Commit a2c6adcc authored by Dongjoon Hyun, committed by Sean Owen

[SPARK-18857][SQL] Don't use `Iterator.duplicate` for `incrementalCollect` in Thrift Server

## What changes were proposed in this pull request?

To support `FETCH_FIRST`, SPARK-16563 used Scala's `Iterator.duplicate`. However, `Iterator.duplicate` uses a **queue to buffer all items between both iterators**, which causes GC pressure and hangs for queries that return a large number of rows. We should not use it, especially with `spark.sql.thriftServer.incrementalCollect`.

https://github.com/scala/scala/blob/2.12.x/src/library/scala/collection/Iterator.scala#L1262-L1300
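
A minimal standalone sketch (not from this patch) of that buffering behavior, using a plain range iterator as a stand-in for the result rows; all names here are illustrative:

```scala
// Demonstrates Iterator.duplicate's shared queue: every element consumed by
// one duplicate but not the other is buffered in memory.
object DuplicateBufferingDemo {
  def main(args: Array[String]): Unit = {
    val rows = Iterator.range(0, 1000000)
    val (header, body) = rows.duplicate

    // Draining `body` while `header` is still at position 0 forces the
    // shared queue to buffer all 1,000,000 elements so that `header` can
    // replay them later -- the GC pressure described above.
    println(s"row count: ${body.length}")

    // `header` now reads entirely from that in-memory buffer.
    println(s"first rows: ${header.take(3).mkString(", ")}")
  }
}
```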

## How was this patch tested?

Pass the existing tests.

Author: Dongjoon Hyun <dongjoon@apache.org>

Closes #16440 from dongjoon-hyun/SPARK-18857.
Parent: 2cfd41ac
@@ -309,6 +309,13 @@ object SQLConf {
       .stringConf
       .createOptional
 
+  val THRIFTSERVER_INCREMENTAL_COLLECT =
+    SQLConfigBuilder("spark.sql.thriftServer.incrementalCollect")
+      .internal()
+      .doc("When true, enable incremental collection for execution in Thrift Server.")
+      .booleanConf
+      .createWithDefault(false)
+
   val THRIFTSERVER_UI_STATEMENT_LIMIT =
     SQLConfigBuilder("spark.sql.thriftserver.ui.retainedStatements")
       .doc("The number of SQL statements kept in the JDBC/ODBC web UI history.")
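
As a usage note (not part of the commit): a minimal sketch of flipping the new flag from application code, assuming a local `SparkSession`; for the Thrift Server itself the key would typically be supplied at startup, e.g. via `--conf spark.sql.thriftServer.incrementalCollect=true`.

```scala
import org.apache.spark.sql.SparkSession

object IncrementalCollectToggle {
  def main(args: Array[String]): Unit = {
    // Assumed local session, purely for illustration.
    val spark = SparkSession.builder()
      .appName("incremental-collect-demo")
      .master("local[*]")
      .getOrCreate()

    // Opt in to row-by-row collection; the default added by this patch is false.
    spark.conf.set("spark.sql.thriftServer.incrementalCollect", "true")
    println(spark.conf.get("spark.sql.thriftServer.incrementalCollect")) // "true"

    spark.stop()
  }
}
```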
@@ -50,8 +50,13 @@ private[hive] class SparkExecuteStatementOperation(
   with Logging {
 
   private var result: DataFrame = _
+
+  // We cache the returned rows to get iterators again in case the user wants to use FETCH_FIRST.
+  // This is only used when `spark.sql.thriftServer.incrementalCollect` is set to `false`.
+  // In case of `true`, this will be `None` and FETCH_FIRST will trigger re-execution.
+  private var resultList: Option[Array[SparkRow]] = _
+
   private var iter: Iterator[SparkRow] = _
-  private var iterHeader: Iterator[SparkRow] = _
   private var dataTypes: Array[DataType] = _
   private var statementId: String = _

@@ -111,9 +116,15 @@ private[hive] class SparkExecuteStatementOperation(
     // Reset iter to header when fetching start from first row
     if (order.equals(FetchOrientation.FETCH_FIRST)) {
-      val (ita, itb) = iterHeader.duplicate
-      iter = ita
-      iterHeader = itb
+      iter = if (sqlContext.getConf(SQLConf.THRIFTSERVER_INCREMENTAL_COLLECT.key).toBoolean) {
+        resultList = None
+        result.toLocalIterator.asScala
+      } else {
+        if (resultList.isEmpty) {
+          resultList = Some(result.collect())
+        }
+        resultList.get.iterator
+      }
     }
 
     if (!iter.hasNext) {

@@ -227,17 +238,14 @@ private[hive] class SparkExecuteStatementOperation(
       }
       HiveThriftServer2.listener.onStatementParsed(statementId, result.queryExecution.toString())
       iter = {
-        val useIncrementalCollect =
-          sqlContext.getConf("spark.sql.thriftServer.incrementalCollect", "false").toBoolean
-        if (useIncrementalCollect) {
+        if (sqlContext.getConf(SQLConf.THRIFTSERVER_INCREMENTAL_COLLECT.key).toBoolean) {
+          resultList = None
           result.toLocalIterator.asScala
         } else {
-          result.collect().iterator
+          resultList = Some(result.collect())
+          resultList.get.iterator
         }
       }
-      val (itra, itrb) = iter.duplicate
-      iterHeader = itra
-      iter = itrb
       dataTypes = result.queryExecution.analyzed.output.map(_.dataType).toArray
     } catch {
       case e: HiveSQLException =>
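
Summarizing the new control flow, a simplified, hypothetical distillation of the pattern (the `ResultCursor` class and `compute` callback below are illustrative, not Spark APIs): FETCH_FIRST becomes a rewind that either re-executes the query (incremental mode) or replays a single cached copy, with no `Iterator.duplicate` and hence no shared buffer.

```scala
import scala.reflect.ClassTag

// Hypothetical distillation; `compute` stands in for running the query
// (result.toLocalIterator / result.collect in the actual operation).
class ResultCursor[T: ClassTag](compute: () => Iterator[T], incremental: Boolean) {
  private var cached: Option[Array[T]] = None
  private var iter: Iterator[T] = rewind()

  // FETCH_FIRST maps to rewind(): no Iterator.duplicate, so no shared queue.
  def rewind(): Iterator[T] = {
    iter =
      if (incremental) {
        cached = None                                        // hold nothing; re-execute instead
        compute()
      } else {
        if (cached.isEmpty) cached = Some(compute().toArray) // collect once
        cached.get.iterator                                  // replay from the cache
      }
    iter
  }

  def hasNext: Boolean = iter.hasNext
  def next(): T = iter.next()
}

object ResultCursorDemo extends App {
  var runs = 0
  val cursor = new ResultCursor[Int](() => { runs += 1; Iterator(1, 2, 3) }, incremental = true)
  cursor.rewind() // incremental mode: rewinding re-runs the query
  println(s"query executed $runs times") // prints: query executed 2 times
}
```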