diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashOuterJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashOuterJoin.scala
index b73041d306b362b3cbd60b4be4d60fa0f7149f26..59ef9042725451fab8c78ab8b9150cfaaaba1bde 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashOuterJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashOuterJoin.scala
@@ -68,66 +68,56 @@ case class HashOuterJoin(
   @transient private[this] lazy val DUMMY_LIST = Seq[Row](null)
   @transient private[this] lazy val EMPTY_LIST = Seq.empty[Row]
 
+  @transient private[this] lazy val leftNullRow = new GenericRow(left.output.length)
+  @transient private[this] lazy val rightNullRow = new GenericRow(right.output.length)
+  @transient private[this] lazy val boundCondition =
+    condition.map(newPredicate(_, left.output ++ right.output)).getOrElse((row: Row) => true)
+
   // TODO we need to rewrite all of the iterators with our own implementation instead of the Scala
   // iterator for performance purpose.
 
   private[this] def leftOuterIterator(
-      key: Row, leftIter: Iterable[Row], rightIter: Iterable[Row]): Iterator[Row] = {
-    val joinedRow = new JoinedRow()
-    val rightNullRow = new GenericRow(right.output.length)
-    val boundCondition =
-      condition.map(newPredicate(_, left.output ++ right.output)).getOrElse((row: Row) => true)
-
-    leftIter.iterator.flatMap { l =>
-      joinedRow.withLeft(l)
-      var matched = false
-      (if (!key.anyNull) rightIter.collect { case r if (boundCondition(joinedRow.withRight(r))) =>
-        matched = true
-        joinedRow.copy
+      key: Row, joinedRow: JoinedRow, rightIter: Iterable[Row]): Iterator[Row] = {
+    val ret: Iterable[Row] = (
+      if (!key.anyNull) {
+        val temp = rightIter.collect {
+          case r if (boundCondition(joinedRow.withRight(r))) => joinedRow.copy
+        }
+        if (temp.size  == 0) {
+          joinedRow.withRight(rightNullRow).copy :: Nil
+        } else {
+          temp
+        }
       } else {
-        Nil
-      }) ++ DUMMY_LIST.filter(_ => !matched).map( _ => {
-        // DUMMY_LIST.filter(_ => !matched) is a tricky way to add additional row,
-        // as we don't know whether we need to append it until finish iterating all of the
-        // records in right side.
-        // If we didn't get any proper row, then append a single row with empty right
-        joinedRow.withRight(rightNullRow).copy
-      })
-    }
+        joinedRow.withRight(rightNullRow).copy :: Nil
+      }
+    )
+    ret.iterator
   }
 
   private[this] def rightOuterIterator(
-      key: Row, leftIter: Iterable[Row], rightIter: Iterable[Row]): Iterator[Row] = {
-    val joinedRow = new JoinedRow()
-    val leftNullRow = new GenericRow(left.output.length)
-    val boundCondition =
-      condition.map(newPredicate(_, left.output ++ right.output)).getOrElse((row: Row) => true)
-
-    rightIter.iterator.flatMap { r =>
-      joinedRow.withRight(r)
-      var matched = false
-      (if (!key.anyNull) leftIter.collect { case l if (boundCondition(joinedRow.withLeft(l))) =>
-        matched = true
-        joinedRow.copy
+      key: Row, leftIter: Iterable[Row], joinedRow: JoinedRow): Iterator[Row] = {
+
+    val ret: Iterable[Row] = (
+      if (!key.anyNull) {
+        val temp = leftIter.collect {
+          case l if (boundCondition(joinedRow.withLeft(l))) => joinedRow.copy
+        }
+        if (temp.size  == 0) {
+          joinedRow.withLeft(leftNullRow).copy :: Nil
+        } else {
+          temp
+        }
       } else {
-        Nil
-      }) ++ DUMMY_LIST.filter(_ => !matched).map( _ => {
-        // DUMMY_LIST.filter(_ => !matched) is a tricky way to add additional row,
-        // as we don't know whether we need to append it until finish iterating all of the
-        // records in left side.
-        // If we didn't get any proper row, then append a single row with empty left.
-        joinedRow.withLeft(leftNullRow).copy
-      })
-    }
+        joinedRow.withLeft(leftNullRow).copy :: Nil
+      }
+    )
+    ret.iterator
   }
 
   private[this] def fullOuterIterator(
-      key: Row, leftIter: Iterable[Row], rightIter: Iterable[Row]): Iterator[Row] = {
-    val joinedRow = new JoinedRow()
-    val leftNullRow = new GenericRow(left.output.length)
-    val rightNullRow = new GenericRow(right.output.length)
-    val boundCondition =
-      condition.map(newPredicate(_, left.output ++ right.output)).getOrElse((row: Row) => true)
+      key: Row, leftIter: Iterable[Row], rightIter: Iterable[Row],
+      joinedRow: JoinedRow): Iterator[Row] = {
 
     if (!key.anyNull) {
       // Store the positions of records in right, if one of its associated row satisfy
@@ -193,27 +183,37 @@ case class HashOuterJoin(
   }
 
   override def execute() = {
+    val joinedRow = new JoinedRow()
     left.execute().zipPartitions(right.execute()) { (leftIter, rightIter) =>
       // TODO this probably can be replaced by external sort (sort merged join?)
-      // Build HashMap for current partition in left relation
-      val leftHashTable = buildHashTable(leftIter, newProjection(leftKeys, left.output))
-      // Build HashMap for current partition in right relation
-      val rightHashTable = buildHashTable(rightIter, newProjection(rightKeys, right.output))
-      val boundCondition =
-        condition.map(newPredicate(_, left.output ++ right.output)).getOrElse((row: Row) => true)
+
       joinType match {
-        case LeftOuter => leftHashTable.keysIterator.flatMap { key =>
-          leftOuterIterator(key, leftHashTable.getOrElse(key, EMPTY_LIST),
-            rightHashTable.getOrElse(key, EMPTY_LIST))
+        case LeftOuter => {
+          val rightHashTable = buildHashTable(rightIter, newProjection(rightKeys, right.output))
+          val keyGenerator = newProjection(leftKeys, left.output)
+          leftIter.flatMap( currentRow => {
+            val rowKey = keyGenerator(currentRow)
+            joinedRow.withLeft(currentRow)
+            leftOuterIterator(rowKey, joinedRow, rightHashTable.getOrElse(rowKey, EMPTY_LIST))
+          })
         }
-        case RightOuter => rightHashTable.keysIterator.flatMap { key =>
-          rightOuterIterator(key, leftHashTable.getOrElse(key, EMPTY_LIST),
-            rightHashTable.getOrElse(key, EMPTY_LIST))
+        case RightOuter => {
+          val leftHashTable = buildHashTable(leftIter, newProjection(leftKeys, left.output))
+          val keyGenerator = newProjection(rightKeys, right.output)
+          rightIter.flatMap ( currentRow => {
+            val rowKey = keyGenerator(currentRow)
+            joinedRow.withRight(currentRow)
+            rightOuterIterator(rowKey, leftHashTable.getOrElse(rowKey, EMPTY_LIST), joinedRow)
+          })
         }
-        case FullOuter => (leftHashTable.keySet ++ rightHashTable.keySet).iterator.flatMap { key =>
-          fullOuterIterator(key,
-            leftHashTable.getOrElse(key, EMPTY_LIST),
-            rightHashTable.getOrElse(key, EMPTY_LIST))
+        case FullOuter => {
+          val leftHashTable = buildHashTable(leftIter, newProjection(leftKeys, left.output))
+          val rightHashTable = buildHashTable(rightIter, newProjection(rightKeys, right.output))
+          (leftHashTable.keySet ++ rightHashTable.keySet).iterator.flatMap { key =>
+            fullOuterIterator(key,
+              leftHashTable.getOrElse(key, EMPTY_LIST),
+              rightHashTable.getOrElse(key, EMPTY_LIST), joinedRow)
+          }
         }
         case x => throw new Exception(s"HashOuterJoin should not take $x as the JoinType")
       }