From 26c6ce3d2947df5a294b1ad4a22fae5d31d06c19 Mon Sep 17 00:00:00 2001
From: Michael Armbrust <michael@databricks.com>
Date: Tue, 24 Mar 2015 12:10:30 -0700
Subject: [PATCH] [SPARK-6437][SQL] Use completion iterator to close external
 sorter

Otherwise we will leak files when spilling occurs.

Author: Michael Armbrust <michael@databricks.com>

Closes #5161 from marmbrus/cleanupAfterSort and squashes the following commits:

cb13d3c [Michael Armbrust] hint to inferencer
cdebdf5 [Michael Armbrust] Use completion iterator to close external sorter
---
 .../org/apache/spark/sql/execution/basicOperators.scala     | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
index 20c9bc3e75..1f5251a203 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
@@ -25,7 +25,7 @@ import org.apache.spark.sql.catalyst.ScalaReflection
 import org.apache.spark.sql.catalyst.errors._
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.physical._
-import org.apache.spark.util.MutablePair
+import org.apache.spark.util.{CompletionIterator, MutablePair}
 import org.apache.spark.util.collection.ExternalSorter
 
 /**
@@ -194,7 +194,9 @@ case class ExternalSort(
       val ordering = newOrdering(sortOrder, child.output)
       val sorter = new ExternalSorter[Row, Null, Row](ordering = Some(ordering))
       sorter.insertAll(iterator.map(r => (r, null)))
-      sorter.iterator.map(_._1)
+      val baseIterator = sorter.iterator.map(_._1)
+      // TODO(marmbrus): The complex type signature below thwarts inference for no reason.
+      CompletionIterator[Row, Iterator[Row]](baseIterator, sorter.stop())
     }, preservesPartitioning = true)
   }
 
-- 
GitLab