Skip to content
Snippets Groups Projects
Commit a45133b8 authored by Wenchen Fan's avatar Wenchen Fan Committed by gatorsmile
Browse files

[SPARK-21743][SQL] top-most limit should not cause memory leak

## What changes were proposed in this pull request?

For top-most limit, we will use a special operator to execute it: `CollectLimitExec`.

`CollectLimitExec` will retrieve `n`(which is the limit) rows from each partition of the child plan output, see https://github.com/apache/spark/blob/v2.2.0/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala#L311. It's very likely that we don't exhaust the child plan output.

This is fine when whole-stage-codegen is off, as child plan will release the resource via task completion listener. However, when whole-stage codegen is on, the resource can only be released if all output is consumed.

To fix this memory leak, one simple approach is, when `CollectLimitExec` retrieve `n` rows from child plan output, child plan output should only have `n` rows, then the output is exhausted and resource is released. This can be done by wrapping child plan with `LocalLimit`

## How was this patch tested?

a regression test

Author: Wenchen Fan <wenchen@databricks.com>

Closes #18955 from cloud-fan/leak.
parent b8ffb510
No related branches found
No related tags found
No related merge requests found
......@@ -72,7 +72,12 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
execution.TakeOrderedAndProjectExec(
limit, order, projectList, planLater(child)) :: Nil
case logical.Limit(IntegerLiteral(limit), child) =>
execution.CollectLimitExec(limit, planLater(child)) :: Nil
// Normally wrapping child with `LocalLimitExec` here is a no-op, because
// `CollectLimitExec.executeCollect` will call `LocalLimitExec.executeTake`, which
// calls `child.executeTake`. If child supports whole stage codegen, adding this
// `LocalLimitExec` can stop the processing of whole stage codegen and trigger the
// resource releasing work, after we consume `limit` rows.
execution.CollectLimitExec(limit, LocalLimitExec(limit, planLater(child))) :: Nil
case other => planLater(other) :: Nil
}
case logical.Limit(IntegerLiteral(limit), logical.Sort(order, true, child)) =>
......
......@@ -54,6 +54,14 @@ trait BaseLimitExec extends UnaryExecNode with CodegenSupport {
val limit: Int
override def output: Seq[Attribute] = child.output
// Do not enable whole stage codegen for a single limit.
override def supportCodegen: Boolean = child match {
case plan: CodegenSupport => plan.supportCodegen
case _ => false
}
override def executeTake(n: Int): Array[InternalRow] = child.executeTake(math.min(n, limit))
protected override def doExecute(): RDD[InternalRow] = child.execute().mapPartitions { iter =>
iter.take(limit)
}
......
......@@ -2658,4 +2658,9 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
checkAnswer(sql("SELECT __auto_generated_subquery_name.i from (SELECT i FROM v)"), Row(1))
}
}
test("SPARK-21743: top-most limit should not cause memory leak") {
// In unit test, Spark will fail the query if memory leak detected.
spark.range(100).groupBy("id").count().limit(1).collect()
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment