From 9256840cb631cad50852b2b218a1ac71b567084a Mon Sep 17 00:00:00 2001
From: Davies Liu <davies@databricks.com>
Date: Mon, 14 Mar 2016 22:25:57 -0700
Subject: [PATCH] [SPARK-13661][SQL] avoid the copy in HashedRelation

## What changes were proposed in this pull request?

Avoid the copy in HashedRelation, since most of the HashedRelation are built with Array[Row], added the copy() for LeftSemiJoinHash. This could help to reduce the memory consumption for Broadcast join.

## How was this patch tested?

Existing tests.

Author: Davies Liu <davies@databricks.com>

Closes #11666 from davies/remove_copy.
---
 .../spark/sql/execution/joins/HashedRelation.scala    | 11 ++++++++---
 .../spark/sql/execution/joins/LeftSemiJoinHash.scala  |  2 +-
 2 files changed, 9 insertions(+), 4 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
index 6235897ed1..0b0f59c3e4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
@@ -156,6 +156,11 @@ private[joins] class UniqueKeyHashedRelation(
 
 private[execution] object HashedRelation {
 
+  /**
+   * Create a HashedRelation from an Iterator of InternalRow.
+   *
+   * Note: The caller should make sure that these InternalRow are different objects.
+   */
   def apply(
       input: Iterator[InternalRow],
       keyGenerator: Projection,
@@ -188,7 +193,7 @@ private[execution] object HashedRelation {
           keyIsUnique = false
           existingMatchList
         }
-        matchList += currentRow.copy()
+        matchList += currentRow
       }
     }
 
@@ -438,7 +443,7 @@ private[joins] object UnsafeHashedRelation {
         } else {
           existingMatchList
         }
-        matchList += unsafeRow.copy()
+        matchList += unsafeRow
       }
     }
 
@@ -622,7 +627,7 @@ private[joins] object LongHashedRelation {
           keyIsUnique = false
           existingMatchList
         }
-        matchList += unsafeRow.copy()
+        matchList += unsafeRow
       }
     }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/LeftSemiJoinHash.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/LeftSemiJoinHash.scala
index 242ed61232..14389e45ba 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/LeftSemiJoinHash.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/LeftSemiJoinHash.scala
@@ -47,7 +47,7 @@ case class LeftSemiJoinHash(
     val numOutputRows = longMetric("numOutputRows")
 
     right.execute().zipPartitions(left.execute()) { (buildIter, streamIter) =>
-      val hashRelation = HashedRelation(buildIter, rightKeyGenerator)
+      val hashRelation = HashedRelation(buildIter.map(_.copy()), rightKeyGenerator)
       hashSemiJoin(streamIter, hashRelation, numOutputRows)
     }
   }
-- 
GitLab