Skip to content
Snippets Groups Projects
Commit 30f6b85c authored by tianyi's avatar tianyi Committed by Michael Armbrust
Browse files

[SPARK-4483][SQL]Optimization about reduce memory costs during the HashOuterJoin

In `HashOuterJoin.scala`, spark read data from both side of join operation before zip them together. It is a waste for memory. We are trying to read data from only one side, put them into a hashmap, and then generate the `JoinedRow` with data from other side one by one.
Currently, we could only do this optimization for `left outer join` and `right outer join`. For `full outer join`, we will do something in another issue.

for
table test_csv contains 1 million records
table dim_csv contains 10 thousand records

SQL:
`select * from test_csv a left outer join dim_csv b on a.key = b.key`

the result is:
master:
```
CSV: 12671 ms
CSV: 9021 ms
CSV: 9200 ms
Current Mem Usage:787788984
```
after patch:
```
CSV: 10382 ms
CSV: 7543 ms
CSV: 7469 ms
Current Mem Usage:208145728
```

Author: tianyi <tianyi@asiainfo-linkage.com>
Author: tianyi <tianyi.asiainfo@gmail.com>

Closes #3375 from tianyi/SPARK-4483 and squashes the following commits:

72a8aec [tianyi] avoid having mutable state stored inside of the task
99c5c97 [tianyi] performance optimization
d2f94d7 [tianyi] fix bug: missing output when the join-key is null.
2be45d1 [tianyi] fix spell bug
1f2c6f1 [tianyi] remove commented codes
a676de6 [tianyi] optimize some codes
9e7d5b5 [tianyi] remove commented old codes
838707d [tianyi] Optimization about reduce memory costs during the HashOuterJoin
parent ea1315e3
No related branches found
No related tags found
No related merge requests found
...@@ -68,66 +68,56 @@ case class HashOuterJoin( ...@@ -68,66 +68,56 @@ case class HashOuterJoin(
@transient private[this] lazy val DUMMY_LIST = Seq[Row](null) @transient private[this] lazy val DUMMY_LIST = Seq[Row](null)
@transient private[this] lazy val EMPTY_LIST = Seq.empty[Row] @transient private[this] lazy val EMPTY_LIST = Seq.empty[Row]
@transient private[this] lazy val leftNullRow = new GenericRow(left.output.length)
@transient private[this] lazy val rightNullRow = new GenericRow(right.output.length)
@transient private[this] lazy val boundCondition =
condition.map(newPredicate(_, left.output ++ right.output)).getOrElse((row: Row) => true)
// TODO we need to rewrite all of the iterators with our own implementation instead of the Scala // TODO we need to rewrite all of the iterators with our own implementation instead of the Scala
// iterator for performance purpose. // iterator for performance purpose.
private[this] def leftOuterIterator( private[this] def leftOuterIterator(
key: Row, leftIter: Iterable[Row], rightIter: Iterable[Row]): Iterator[Row] = { key: Row, joinedRow: JoinedRow, rightIter: Iterable[Row]): Iterator[Row] = {
val joinedRow = new JoinedRow() val ret: Iterable[Row] = (
val rightNullRow = new GenericRow(right.output.length) if (!key.anyNull) {
val boundCondition = val temp = rightIter.collect {
condition.map(newPredicate(_, left.output ++ right.output)).getOrElse((row: Row) => true) case r if (boundCondition(joinedRow.withRight(r))) => joinedRow.copy
}
leftIter.iterator.flatMap { l => if (temp.size == 0) {
joinedRow.withLeft(l) joinedRow.withRight(rightNullRow).copy :: Nil
var matched = false } else {
(if (!key.anyNull) rightIter.collect { case r if (boundCondition(joinedRow.withRight(r))) => temp
matched = true }
joinedRow.copy
} else { } else {
Nil joinedRow.withRight(rightNullRow).copy :: Nil
}) ++ DUMMY_LIST.filter(_ => !matched).map( _ => { }
// DUMMY_LIST.filter(_ => !matched) is a tricky way to add additional row, )
// as we don't know whether we need to append it until finish iterating all of the ret.iterator
// records in right side.
// If we didn't get any proper row, then append a single row with empty right
joinedRow.withRight(rightNullRow).copy
})
}
} }
private[this] def rightOuterIterator( private[this] def rightOuterIterator(
key: Row, leftIter: Iterable[Row], rightIter: Iterable[Row]): Iterator[Row] = { key: Row, leftIter: Iterable[Row], joinedRow: JoinedRow): Iterator[Row] = {
val joinedRow = new JoinedRow()
val leftNullRow = new GenericRow(left.output.length) val ret: Iterable[Row] = (
val boundCondition = if (!key.anyNull) {
condition.map(newPredicate(_, left.output ++ right.output)).getOrElse((row: Row) => true) val temp = leftIter.collect {
case l if (boundCondition(joinedRow.withLeft(l))) => joinedRow.copy
rightIter.iterator.flatMap { r => }
joinedRow.withRight(r) if (temp.size == 0) {
var matched = false joinedRow.withLeft(leftNullRow).copy :: Nil
(if (!key.anyNull) leftIter.collect { case l if (boundCondition(joinedRow.withLeft(l))) => } else {
matched = true temp
joinedRow.copy }
} else { } else {
Nil joinedRow.withLeft(leftNullRow).copy :: Nil
}) ++ DUMMY_LIST.filter(_ => !matched).map( _ => { }
// DUMMY_LIST.filter(_ => !matched) is a tricky way to add additional row, )
// as we don't know whether we need to append it until finish iterating all of the ret.iterator
// records in left side.
// If we didn't get any proper row, then append a single row with empty left.
joinedRow.withLeft(leftNullRow).copy
})
}
} }
private[this] def fullOuterIterator( private[this] def fullOuterIterator(
key: Row, leftIter: Iterable[Row], rightIter: Iterable[Row]): Iterator[Row] = { key: Row, leftIter: Iterable[Row], rightIter: Iterable[Row],
val joinedRow = new JoinedRow() joinedRow: JoinedRow): Iterator[Row] = {
val leftNullRow = new GenericRow(left.output.length)
val rightNullRow = new GenericRow(right.output.length)
val boundCondition =
condition.map(newPredicate(_, left.output ++ right.output)).getOrElse((row: Row) => true)
if (!key.anyNull) { if (!key.anyNull) {
// Store the positions of records in right, if one of its associated row satisfy // Store the positions of records in right, if one of its associated row satisfy
...@@ -193,27 +183,37 @@ case class HashOuterJoin( ...@@ -193,27 +183,37 @@ case class HashOuterJoin(
} }
override def execute() = { override def execute() = {
val joinedRow = new JoinedRow()
left.execute().zipPartitions(right.execute()) { (leftIter, rightIter) => left.execute().zipPartitions(right.execute()) { (leftIter, rightIter) =>
// TODO this probably can be replaced by external sort (sort merged join?) // TODO this probably can be replaced by external sort (sort merged join?)
// Build HashMap for current partition in left relation
val leftHashTable = buildHashTable(leftIter, newProjection(leftKeys, left.output))
// Build HashMap for current partition in right relation
val rightHashTable = buildHashTable(rightIter, newProjection(rightKeys, right.output))
val boundCondition =
condition.map(newPredicate(_, left.output ++ right.output)).getOrElse((row: Row) => true)
joinType match { joinType match {
case LeftOuter => leftHashTable.keysIterator.flatMap { key => case LeftOuter => {
leftOuterIterator(key, leftHashTable.getOrElse(key, EMPTY_LIST), val rightHashTable = buildHashTable(rightIter, newProjection(rightKeys, right.output))
rightHashTable.getOrElse(key, EMPTY_LIST)) val keyGenerator = newProjection(leftKeys, left.output)
leftIter.flatMap( currentRow => {
val rowKey = keyGenerator(currentRow)
joinedRow.withLeft(currentRow)
leftOuterIterator(rowKey, joinedRow, rightHashTable.getOrElse(rowKey, EMPTY_LIST))
})
} }
case RightOuter => rightHashTable.keysIterator.flatMap { key => case RightOuter => {
rightOuterIterator(key, leftHashTable.getOrElse(key, EMPTY_LIST), val leftHashTable = buildHashTable(leftIter, newProjection(leftKeys, left.output))
rightHashTable.getOrElse(key, EMPTY_LIST)) val keyGenerator = newProjection(rightKeys, right.output)
rightIter.flatMap ( currentRow => {
val rowKey = keyGenerator(currentRow)
joinedRow.withRight(currentRow)
rightOuterIterator(rowKey, leftHashTable.getOrElse(rowKey, EMPTY_LIST), joinedRow)
})
} }
case FullOuter => (leftHashTable.keySet ++ rightHashTable.keySet).iterator.flatMap { key => case FullOuter => {
fullOuterIterator(key, val leftHashTable = buildHashTable(leftIter, newProjection(leftKeys, left.output))
leftHashTable.getOrElse(key, EMPTY_LIST), val rightHashTable = buildHashTable(rightIter, newProjection(rightKeys, right.output))
rightHashTable.getOrElse(key, EMPTY_LIST)) (leftHashTable.keySet ++ rightHashTable.keySet).iterator.flatMap { key =>
fullOuterIterator(key,
leftHashTable.getOrElse(key, EMPTY_LIST),
rightHashTable.getOrElse(key, EMPTY_LIST), joinedRow)
}
} }
case x => throw new Exception(s"HashOuterJoin should not take $x as the JoinType") case x => throw new Exception(s"HashOuterJoin should not take $x as the JoinType")
} }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment