From 95296d9b1ad1d9e9396d7dfd0015ef27ce1cf341 Mon Sep 17 00:00:00 2001
From: Nong <nong@cloudera.com>
Date: Fri, 4 Dec 2015 10:01:20 -0800
Subject: [PATCH] [SPARK-12089] [SQL] Fix memory corrupt due to freeing a page
 being referenced

When the spillable sort iterator was spilled, it was mistakenly keeping
the last page in memory rather than the current page. This causes the
current record to get corrupted.

Author: Nong <nong@cloudera.com>

Closes #10142 from nongli/spark-12089.
---
 .../util/collection/unsafe/sort/UnsafeExternalSorter.java  | 7 +++++--
 1 file changed, 5 insertions(+), 2 deletions(-)

diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
index 5a97f4f113..79d74b23ce 100644
--- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
+++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
@@ -443,6 +443,7 @@ public final class UnsafeExternalSorter extends MemoryConsumer {
         UnsafeInMemorySorter.SortedIterator inMemIterator =
           ((UnsafeInMemorySorter.SortedIterator) upstream).clone();
 
+        // Iterate over the records that have not been returned and spill them.
         final UnsafeSorterSpillWriter spillWriter =
           new UnsafeSorterSpillWriter(blockManager, fileBufferSizeBytes, writeMetrics, numRecords);
         while (inMemIterator.hasNext()) {
@@ -458,9 +459,11 @@ public final class UnsafeExternalSorter extends MemoryConsumer {
 
         long released = 0L;
         synchronized (UnsafeExternalSorter.this) {
-          // release the pages except the one that is used
+          // release the pages except the one that is used. There can still be a caller that
+          // is accessing the current record. We free this page in that caller's next loadNext()
+          // call.
           for (MemoryBlock page : allocatedPages) {
-            if (!loaded || page.getBaseObject() != inMemIterator.getBaseObject()) {
+            if (!loaded || page.getBaseObject() != upstream.getBaseObject()) {
               released += page.size();
               freePage(page);
             } else {
-- 
GitLab