diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
index 20c9bc3e7554226c923b0d43f2626b20460774cf..1f5251a20376f94e11fad78f5466b8d2eeb63fe4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
@@ -25,7 +25,7 @@ import org.apache.spark.sql.catalyst.ScalaReflection
 import org.apache.spark.sql.catalyst.errors._
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.physical._
-import org.apache.spark.util.MutablePair
+import org.apache.spark.util.{CompletionIterator, MutablePair}
 import org.apache.spark.util.collection.ExternalSorter
 
 /**
@@ -194,7 +194,9 @@ case class ExternalSort(
       val ordering = newOrdering(sortOrder, child.output)
       val sorter = new ExternalSorter[Row, Null, Row](ordering = Some(ordering))
       sorter.insertAll(iterator.map(r => (r, null)))
-      sorter.iterator.map(_._1)
+      val baseIterator = sorter.iterator.map(_._1)
+      // TODO(marmbrus): The complex type signature below thwarts inference for no reason.
+      CompletionIterator[Row, Iterator[Row]](baseIterator, sorter.stop())
     }, preservesPartitioning = true)
   }