Skip to content
Snippets Groups Projects
Commit 4b69a42e authored by Liang-Chi Hsieh's avatar Liang-Chi Hsieh Committed by Davies Liu
Browse files

[SPARK-11362] [SQL] Use Spark BitSet in BroadcastNestedLoopJoin

JIRA: https://issues.apache.org/jira/browse/SPARK-11362

We use scala.collection.mutable.BitSet in BroadcastNestedLoopJoin now. We should use Spark's BitSet.

Author: Liang-Chi Hsieh <viirya@appier.com>

Closes #9316 from viirya/use-spark-bitset.
parent ef362846
No related branches found
No related tags found
No related merge requests found
......@@ -24,7 +24,7 @@ import org.apache.spark.sql.catalyst.plans.physical.Partitioning
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.execution.{BinaryNode, SparkPlan}
import org.apache.spark.sql.execution.metric.SQLMetrics
import org.apache.spark.util.collection.CompactBuffer
import org.apache.spark.util.collection.{BitSet, CompactBuffer}
case class BroadcastNestedLoopJoin(
......@@ -95,9 +95,7 @@ case class BroadcastNestedLoopJoin(
/** All rows that either match both-way, or rows from streamed joined with nulls. */
val matchesOrStreamedRowsWithNulls = streamed.execute().mapPartitions { streamedIter =>
val matchedRows = new CompactBuffer[InternalRow]
// TODO: Use Spark's BitSet.
val includedBroadcastTuples =
new scala.collection.mutable.BitSet(broadcastedRelation.value.size)
val includedBroadcastTuples = new BitSet(broadcastedRelation.value.size)
val joinedRow = new JoinedRow
val leftNulls = new GenericMutableRow(left.output.size)
......@@ -115,11 +113,11 @@ case class BroadcastNestedLoopJoin(
case BuildRight if boundCondition(joinedRow(streamedRow, broadcastedRow)) =>
matchedRows += resultProj(joinedRow(streamedRow, broadcastedRow)).copy()
streamRowMatched = true
includedBroadcastTuples += i
includedBroadcastTuples.set(i)
case BuildLeft if boundCondition(joinedRow(broadcastedRow, streamedRow)) =>
matchedRows += resultProj(joinedRow(broadcastedRow, streamedRow)).copy()
streamRowMatched = true
includedBroadcastTuples += i
includedBroadcastTuples.set(i)
case _ =>
}
i += 1
......@@ -138,8 +136,8 @@ case class BroadcastNestedLoopJoin(
val includedBroadcastTuples = matchesOrStreamedRowsWithNulls.map(_._2)
val allIncludedBroadcastTuples = includedBroadcastTuples.fold(
new scala.collection.mutable.BitSet(broadcastedRelation.value.size)
)(_ ++ _)
new BitSet(broadcastedRelation.value.size)
)(_ | _)
val leftNulls = new GenericMutableRow(left.output.size)
val rightNulls = new GenericMutableRow(right.output.size)
......@@ -155,7 +153,7 @@ case class BroadcastNestedLoopJoin(
val joinedRow = new JoinedRow
joinedRow.withLeft(leftNulls)
while (i < rel.length) {
if (!allIncludedBroadcastTuples.contains(i)) {
if (!allIncludedBroadcastTuples.get(i)) {
buf += resultProj(joinedRow.withRight(rel(i))).copy()
}
i += 1
......@@ -164,7 +162,7 @@ case class BroadcastNestedLoopJoin(
val joinedRow = new JoinedRow
joinedRow.withRight(rightNulls)
while (i < rel.length) {
if (!allIncludedBroadcastTuples.contains(i)) {
if (!allIncludedBroadcastTuples.get(i)) {
buf += resultProj(joinedRow.withLeft(rel(i))).copy()
}
i += 1
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment