Commit 4bf46097 authored by Reynold Xin

[SPARK-13882][SQL] Remove org.apache.spark.sql.execution.local

## What changes were proposed in this pull request?
We introduced some local operators in the org.apache.spark.sql.execution.local package but never fully wired the engine to actually use them. We still plan to implement a full local mode, but it will probably look fairly different from the current iterator-based local mode. Based on what we know right now, we might want a push-based columnar version of these operators.

Let's just remove them for now; we can always re-introduce them in the future by looking at branch-1.6.
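
For context, a minimal, hypothetical sketch (not part of this patch) contrasting the two designs: the removed operators are pull-based, where the consumer drives the loop one row at a time, while a push-based columnar design inverts control so the producer hands whole column batches to its parent.

```scala
// Hypothetical interfaces only; names are illustrative, not Spark APIs.
trait PullRowOperator[Row] {
  def open(): Unit
  def next(): Boolean   // consumer asks: is there another row?
  def fetch(): Row      // current row, valid until the next call to next()
  def close(): Unit
}

trait PushBatchOperator[Batch] {
  def consume(batch: Batch): Unit   // producer pushes a batch downstream
  def finish(): Unit                // producer signals end of input
}
```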

## How was this patch tested?
This is simply dead code removal.

Author: Reynold Xin <rxin@databricks.com>

Closes #11705 from rxin/SPARK-13882.
parent 17eec0a7
Showing 0 additions and 1334 deletions
@@ -27,7 +27,6 @@ import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.physical.BroadcastMode
import org.apache.spark.sql.execution.SparkSqlSerializer
import org.apache.spark.sql.execution.local.LocalNode
import org.apache.spark.unsafe.Platform
import org.apache.spark.unsafe.map.BytesToBytesMap
import org.apache.spark.util.{KnownSizeEstimation, SizeEstimator, Utils}
@@ -157,10 +156,6 @@ private[joins] class UniqueKeyHashedRelation(
private[execution] object HashedRelation {
def apply(localNode: LocalNode, keyGenerator: Projection): HashedRelation = {
apply(localNode.asIterator, keyGenerator)
}
def apply(
input: Iterator[InternalRow],
keyGenerator: Projection,
…
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.execution.local
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.execution.joins.{BuildLeft, BuildRight, BuildSide, HashedRelation}
import org.apache.spark.sql.internal.SQLConf
/**
* A [[HashJoinNode]] that builds the [[HashedRelation]] according to the value of
* `buildSide`. The actual work of this node is defined in [[HashJoinNode]].
*/
case class BinaryHashJoinNode(
conf: SQLConf,
leftKeys: Seq[Expression],
rightKeys: Seq[Expression],
buildSide: BuildSide,
left: LocalNode,
right: LocalNode)
extends BinaryLocalNode(conf) with HashJoinNode {
protected override val (streamedNode, streamedKeys) = buildSide match {
case BuildLeft => (right, rightKeys)
case BuildRight => (left, leftKeys)
}
private val (buildNode, buildKeys) = buildSide match {
case BuildLeft => (left, leftKeys)
case BuildRight => (right, rightKeys)
}
override def output: Seq[Attribute] = left.output ++ right.output
private def buildSideKeyGenerator: Projection = {
// We expect the data types of buildKeys and streamedKeys to be the same.
assert(buildKeys.map(_.dataType) == streamedKeys.map(_.dataType))
UnsafeProjection.create(buildKeys, buildNode.output)
}
protected override def doOpen(): Unit = {
buildNode.open()
val hashedRelation = HashedRelation(buildNode, buildSideKeyGenerator)
// We have built the HashedRelation. So, close buildNode.
buildNode.close()
streamedNode.open()
// Set the HashedRelation used by the HashJoinNode.
withHashedRelation(hashedRelation)
}
override def close(): Unit = {
// Note that we do not need to close buildNode here because it was
// already closed in doOpen().
streamedNode.close()
}
}
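
As orientation for the removed code above, a hedged usage sketch against the branch-1.6 API; `leftNode`, `rightNode`, `leftKeys`, and `rightKeys` are placeholders that must already be resolved against the children's output.

```scala
// Placeholders: leftNode, rightNode, leftKeys, rightKeys. With BuildRight,
// doOpen() opens the right child, builds the HashedRelation from it, closes
// the right child, and only then opens the streamed left child.
val join = BinaryHashJoinNode(
  conf = new SQLConf,
  leftKeys = leftKeys,
  rightKeys = rightKeys,
  buildSide = BuildRight,
  left = leftNode,
  right = rightNode)

join.open()
while (join.next()) {
  val joined = join.fetch()   // one joined row per build-side match
}
join.close()
```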
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.execution.local
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.execution.joins.{BuildLeft, BuildRight, BuildSide, HashedRelation}
import org.apache.spark.sql.internal.SQLConf
/**
* A [[HashJoinNode]] for broadcast join. It takes a streamedNode and a broadcast
* [[HashedRelation]]. The actual work of this node is defined in [[HashJoinNode]].
*/
case class BroadcastHashJoinNode(
conf: SQLConf,
streamedKeys: Seq[Expression],
streamedNode: LocalNode,
buildSide: BuildSide,
buildOutput: Seq[Attribute],
hashedRelation: Broadcast[HashedRelation])
extends UnaryLocalNode(conf) with HashJoinNode {
override val child = streamedNode
// Because we do not pass in the buildNode itself, we use its output
// (buildOutput) to create the inputSet properly.
override def inputSet: AttributeSet = AttributeSet(child.output ++ buildOutput)
override def output: Seq[Attribute] = buildSide match {
case BuildRight => streamedNode.output ++ buildOutput
case BuildLeft => buildOutput ++ streamedNode.output
}
protected override def doOpen(): Unit = {
streamedNode.open()
// Set the HashedRelation used by the HashJoinNode.
withHashedRelation(hashedRelation.value)
}
override def close(): Unit = {
streamedNode.close()
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.execution.local
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Attribute, FromUnsafeProjection, Projection}
import org.apache.spark.sql.internal.SQLConf
case class ConvertToSafeNode(conf: SQLConf, child: LocalNode) extends UnaryLocalNode(conf) {
override def output: Seq[Attribute] = child.output
private[this] var convertToSafe: Projection = _
override def open(): Unit = {
child.open()
convertToSafe = FromUnsafeProjection(child.schema)
}
override def next(): Boolean = child.next()
override def fetch(): InternalRow = convertToSafe(child.fetch())
override def close(): Unit = child.close()
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.execution.local
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Attribute, Projection, UnsafeProjection}
import org.apache.spark.sql.internal.SQLConf
case class ConvertToUnsafeNode(conf: SQLConf, child: LocalNode) extends UnaryLocalNode(conf) {
override def output: Seq[Attribute] = child.output
private[this] var convertToUnsafe: Projection = _
override def open(): Unit = {
child.open()
convertToUnsafe = UnsafeProjection.create(child.schema)
}
override def next(): Boolean = child.next()
override def fetch(): InternalRow = convertToUnsafe(child.fetch())
override def close(): Unit = child.close()
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.execution.local
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.internal.SQLConf
case class ExpandNode(
conf: SQLConf,
projections: Seq[Seq[Expression]],
output: Seq[Attribute],
child: LocalNode) extends UnaryLocalNode(conf) {
assert(projections.size > 0)
private[this] var result: InternalRow = _
private[this] var idx: Int = _
private[this] var input: InternalRow = _
private[this] var groups: Array[Projection] = _
override def open(): Unit = {
child.open()
groups = projections.map(ee => newMutableProjection(ee, child.output)()).toArray
idx = groups.length
}
override def next(): Boolean = {
if (idx >= groups.length) {
if (child.next()) {
input = child.fetch()
idx = 0
} else {
return false
}
}
result = groups(idx)(input)
idx += 1
true
}
override def fetch(): InternalRow = result
override def close(): Unit = child.close()
}
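
The `idx` cycling in `next()` above fans each input row out to one output row per projection. A standalone sketch of the same fan-out in plain Scala (not Spark code):

```scala
// Two projections over each input row, mirroring ExpandNode's semantics.
val projections: Seq[Int => (Int, Int)] = Seq(i => (i + 1, i - 1), i => (i * 2, i / 2))
val input = Seq(10, 20)
val output = for (row <- input; p <- projections) yield p(row)
// output == Seq((11, 9), (20, 5), (21, 19), (40, 10))
```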
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.execution.local
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
import org.apache.spark.sql.catalyst.expressions.codegen.GeneratePredicate
import org.apache.spark.sql.internal.SQLConf
case class FilterNode(conf: SQLConf, condition: Expression, child: LocalNode)
extends UnaryLocalNode(conf) {
private[this] var predicate: (InternalRow) => Boolean = _
override def output: Seq[Attribute] = child.output
override def open(): Unit = {
child.open()
predicate = GeneratePredicate.generate(condition, child.output)
}
override def next(): Boolean = {
var found = false
while (!found && child.next()) {
found = predicate.apply(child.fetch())
}
found
}
override def fetch(): InternalRow = child.fetch()
override def close(): Unit = child.close()
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.execution.local
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.execution.joins._
/**
* An abstract node for sharing common functionality among different implementations of
* inner hash equi-join, notably [[BinaryHashJoinNode]] and [[BroadcastHashJoinNode]].
*
* Much of this code is similar to [[org.apache.spark.sql.execution.joins.HashJoin]].
*/
trait HashJoinNode {
self: LocalNode =>
protected def streamedKeys: Seq[Expression]
protected def streamedNode: LocalNode
protected def buildSide: BuildSide
private[this] var currentStreamedRow: InternalRow = _
private[this] var currentHashMatches: Seq[InternalRow] = _
private[this] var currentMatchPosition: Int = -1
private[this] var joinRow: JoinedRow = _
private[this] var resultProjection: (InternalRow) => InternalRow = _
private[this] var hashed: HashedRelation = _
private[this] var joinKeys: Projection = _
private def streamSideKeyGenerator: Projection =
UnsafeProjection.create(streamedKeys, streamedNode.output)
/**
* Sets the HashedRelation used by this node. This method needs to be called
* before the first `next` gets called.
*/
protected def withHashedRelation(hashedRelation: HashedRelation): Unit = {
hashed = hashedRelation
}
/**
* Custom open implementation to be overridden by subclasses.
*/
protected def doOpen(): Unit
override def open(): Unit = {
doOpen()
joinRow = new JoinedRow
resultProjection = UnsafeProjection.create(schema)
joinKeys = streamSideKeyGenerator
}
override def next(): Boolean = {
currentMatchPosition += 1
if (currentHashMatches == null || currentMatchPosition >= currentHashMatches.size) {
fetchNextMatch()
} else {
true
}
}
/**
* Populate `currentHashMatches` with build-side rows matching the next streamed row.
* @return whether matches are found such that subsequent calls to `fetch` are valid.
*/
private def fetchNextMatch(): Boolean = {
currentHashMatches = null
currentMatchPosition = -1
while (currentHashMatches == null && streamedNode.next()) {
currentStreamedRow = streamedNode.fetch()
val key = joinKeys(currentStreamedRow)
if (!key.anyNull) {
currentHashMatches = hashed.get(key)
}
}
if (currentHashMatches == null) {
false
} else {
currentMatchPosition = 0
true
}
}
override def fetch(): InternalRow = {
val ret = buildSide match {
case BuildRight => joinRow(currentStreamedRow, currentHashMatches(currentMatchPosition))
case BuildLeft => joinRow(currentHashMatches(currentMatchPosition), currentStreamedRow)
}
resultProjection(ret)
}
}
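
The `next()`/`fetchNextMatch()` pair above implements a two-level cursor: an outer cursor over streamed rows and an inner cursor over the hash matches for the current row. A standalone sketch of the same pattern, where `lookup` stands in for `hashed.get` (plain Scala, not Spark code):

```scala
class TwoLevelCursor[K, V](stream: Iterator[K], lookup: K => Seq[V]) {
  private var matches: Seq[V] = null   // inner cursor: matches for current key
  private var pos = -1                 // position within matches

  def next(): Boolean = {
    pos += 1
    if (matches != null && pos < matches.size) {
      true                             // more matches for the current key
    } else {
      matches = null
      pos = 0
      // advance the outer cursor until some key has matches
      while (matches == null && stream.hasNext) {
        val found = lookup(stream.next())
        if (found.nonEmpty) matches = found
      }
      matches != null
    }
  }

  def fetch(): V = matches(pos)
}

// e.g. new TwoLevelCursor(Iterator(1, 2, 3),
//   Map(1 -> Seq("a", "b"), 3 -> Seq("c")).getOrElse(_: Int, Nil))
// yields "a", "b", "c" across successive next()/fetch() calls.
```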
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.execution.local
import scala.collection.mutable
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.internal.SQLConf
case class IntersectNode(conf: SQLConf, left: LocalNode, right: LocalNode)
extends BinaryLocalNode(conf) {
override def output: Seq[Attribute] = left.output
private[this] var leftRows: mutable.HashSet[InternalRow] = _
private[this] var currentRow: InternalRow = _
override def open(): Unit = {
left.open()
leftRows = mutable.HashSet[InternalRow]()
while (left.next()) {
leftRows += left.fetch().copy()
}
left.close()
right.open()
}
override def next(): Boolean = {
currentRow = null
while (currentRow == null && right.next()) {
currentRow = right.fetch()
if (!leftRows.contains(currentRow)) {
currentRow = null
}
}
currentRow != null
}
override def fetch(): InternalRow = currentRow
override def close(): Unit = {
left.close()
right.close()
}
}
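
In miniature, the node above materializes the left side into a hash set and streams the right side through it, so right-side order and duplicates are preserved. A standalone sketch (plain Scala, not Spark code):

```scala
val left = Seq(1, 2, 3)
val leftSet = left.toSet                       // materialized left side
val intersected = Iterator(3, 5, 2, 3).filter(leftSet).toSeq
// intersected == Seq(3, 2, 3): right-side order kept, duplicates kept
```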
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.execution.local
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.internal.SQLConf
case class LimitNode(conf: SQLConf, limit: Int, child: LocalNode) extends UnaryLocalNode(conf) {
private[this] var count = 0
override def output: Seq[Attribute] = child.output
override def open(): Unit = child.open()
override def close(): Unit = child.close()
override def fetch(): InternalRow = child.fetch()
override def next(): Boolean = {
if (count < limit) {
count += 1
child.next()
} else {
false
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.execution.local
import org.apache.spark.Logging
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.codegen._
import org.apache.spark.sql.catalyst.plans.QueryPlan
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.StructType
/**
* A local physical operator, in the form of an iterator.
*
* Before consuming the iterator, the `open()` function must be called.
* After consuming the iterator, the `close()` function must be called.
*/
abstract class LocalNode(conf: SQLConf) extends QueryPlan[LocalNode] with Logging {
private[this] lazy val isTesting: Boolean = sys.props.contains("spark.testing")
/**
* Called before open(). `prepare()` can be used to reserve any memory needed. It must NOT consume
* any input data.
*
* Implementations of this must also call the `prepare()` function of its children.
*/
def prepare(): Unit = children.foreach(_.prepare())
/**
* Initializes the iterator state. Must be called before calling `next()`.
*
* Implementations of this must also call the `open()` function of its children.
*/
def open(): Unit
/**
* Advances the iterator to the next tuple. Returns true if there is at least one more tuple.
*/
def next(): Boolean
/**
* Returns the current tuple.
*/
def fetch(): InternalRow
/**
* Closes the iterator and releases all resources. It should be idempotent.
*
* Implementations of this must also call the `close()` function of its children.
*/
def close(): Unit
/**
* Returns the content through the [[Iterator]] interface.
*/
final def asIterator: Iterator[InternalRow] = new LocalNodeIterator(this)
/**
* Returns the content of the iterator from the beginning to the end in the form of a Scala Seq.
*/
final def collect(): Seq[Row] = {
val converter = CatalystTypeConverters.createToScalaConverter(StructType.fromAttributes(output))
val result = new scala.collection.mutable.ArrayBuffer[Row]
open()
try {
while (next()) {
result += converter.apply(fetch()).asInstanceOf[Row]
}
} finally {
close()
}
result
}
protected def newMutableProjection(
expressions: Seq[Expression],
inputSchema: Seq[Attribute]): () => MutableProjection = {
log.debug(
s"Creating MutableProj: $expressions, inputSchema: $inputSchema")
GenerateMutableProjection.generate(expressions, inputSchema)
}
protected def newPredicate(
expression: Expression,
inputSchema: Seq[Attribute]): (InternalRow) => Boolean = {
GeneratePredicate.generate(expression, inputSchema)
}
}
abstract class LeafLocalNode(conf: SQLConf) extends LocalNode(conf) {
override def children: Seq[LocalNode] = Seq.empty
}
abstract class UnaryLocalNode(conf: SQLConf) extends LocalNode(conf) {
def child: LocalNode
override def children: Seq[LocalNode] = Seq(child)
}
abstract class BinaryLocalNode(conf: SQLConf) extends LocalNode(conf) {
def left: LocalNode
def right: LocalNode
override def children: Seq[LocalNode] = Seq(left, right)
}
/**
* A thin wrapper around a [[LocalNode]] that provides an `Iterator` interface.
*/
private class LocalNodeIterator(localNode: LocalNode) extends Iterator[InternalRow] {
private var nextRow: InternalRow = _
override def hasNext: Boolean = {
if (nextRow == null) {
val res = localNode.next()
if (res) {
nextRow = localNode.fetch()
}
res
} else {
true
}
}
override def next(): InternalRow = {
if (hasNext) {
val res = nextRow
nextRow = null
res
} else {
throw new NoSuchElementException
}
}
}
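
A hedged consumption sketch for the contract documented above; `node` is a placeholder for any LocalNode in this (removed) package. The copy caveat mirrors operators here, such as IntersectNode, that copy fetched rows before buffering them.

```scala
node.prepare()                  // reserve resources; must not consume input
node.open()                     // initialize iterator state
try {
  while (node.next()) {         // advance to the next tuple
    val row = node.fetch()      // valid until the next call to next();
                                // copy it if it must outlive this iteration
    // ... consume row ...
  }
} finally {
  node.close()                  // idempotent resource release
}
```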
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.execution.local
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.{FullOuter, JoinType, LeftOuter, RightOuter}
import org.apache.spark.sql.execution.joins.{BuildLeft, BuildRight, BuildSide}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.util.collection.{BitSet, CompactBuffer}
case class NestedLoopJoinNode(
conf: SQLConf,
left: LocalNode,
right: LocalNode,
buildSide: BuildSide,
joinType: JoinType,
condition: Option[Expression]) extends BinaryLocalNode(conf) {
override def output: Seq[Attribute] = {
joinType match {
case LeftOuter =>
left.output ++ right.output.map(_.withNullability(true))
case RightOuter =>
left.output.map(_.withNullability(true)) ++ right.output
case FullOuter =>
left.output.map(_.withNullability(true)) ++ right.output.map(_.withNullability(true))
case x =>
throw new IllegalArgumentException(
s"NestedLoopJoin should not take $x as the JoinType")
}
}
private[this] def genResultProjection: InternalRow => InternalRow = {
UnsafeProjection.create(schema)
}
private[this] var currentRow: InternalRow = _
private[this] var iterator: Iterator[InternalRow] = _
override def open(): Unit = {
val (streamed, build) = buildSide match {
case BuildRight => (left, right)
case BuildLeft => (right, left)
}
build.open()
val buildRelation = new CompactBuffer[InternalRow]
while (build.next()) {
buildRelation += build.fetch().copy()
}
build.close()
val boundCondition =
newPredicate(condition.getOrElse(Literal(true)), left.output ++ right.output)
val leftNulls = new GenericMutableRow(left.output.size)
val rightNulls = new GenericMutableRow(right.output.size)
val joinedRow = new JoinedRow
val matchedBuildTuples = new BitSet(buildRelation.size)
val resultProj = genResultProjection
streamed.open()
// For outer joins, streamedRowMatches also contains null-padded rows for unmatched streamed rows
val streamedRowMatches: Iterator[InternalRow] = streamed.asIterator.flatMap { streamedRow =>
val matchedRows = new CompactBuffer[InternalRow]
var i = 0
var streamRowMatched = false
// Scan the build relation to look for matches for each streamed row
while (i < buildRelation.size) {
val buildRow = buildRelation(i)
buildSide match {
case BuildRight => joinedRow(streamedRow, buildRow)
case BuildLeft => joinedRow(buildRow, streamedRow)
}
if (boundCondition(joinedRow)) {
matchedRows += resultProj(joinedRow).copy()
streamRowMatched = true
matchedBuildTuples.set(i)
}
i += 1
}
// If this row had no matches and we're using outer join, join it with the null rows
if (!streamRowMatched) {
(joinType, buildSide) match {
case (LeftOuter | FullOuter, BuildRight) =>
matchedRows += resultProj(joinedRow(streamedRow, rightNulls)).copy()
case (RightOuter | FullOuter, BuildLeft) =>
matchedRows += resultProj(joinedRow(leftNulls, streamedRow)).copy()
case _ =>
}
}
matchedRows.iterator
}
// If we're using outer join, find rows on the build side that didn't match anything
// and join them with the null row
lazy val unmatchedBuildRows: Iterator[InternalRow] = {
var i = 0
buildRelation.filter { row =>
val r = !matchedBuildTuples.get(i)
i += 1
r
}.iterator
}
iterator = (joinType, buildSide) match {
case (RightOuter | FullOuter, BuildRight) =>
streamedRowMatches ++
unmatchedBuildRows.map { buildRow => resultProj(joinedRow(leftNulls, buildRow)) }
case (LeftOuter | FullOuter, BuildLeft) =>
streamedRowMatches ++
unmatchedBuildRows.map { buildRow => resultProj(joinedRow(buildRow, rightNulls)) }
case _ => streamedRowMatches
}
}
override def next(): Boolean = {
if (iterator.hasNext) {
currentRow = iterator.next()
true
} else {
false
}
}
override def fetch(): InternalRow = currentRow
override def close(): Unit = {
left.close()
right.close()
}
}
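
The `matchedBuildTuples` BitSet above is what makes the outer-join null padding work: every build row that ever matches is marked, and the unmarked remainder is emitted joined with a null-filled row. The same trick in miniature (plain Scala, not Spark code):

```scala
import scala.collection.mutable.BitSet

val build = Vector("a", "b", "c")
val matched = new BitSet(build.size)
matched += 0; matched += 2                     // suppose rows 0 and 2 matched
val unmatched = build.indices.filterNot(matched).map(build)
// unmatched == Vector("b") -> emitted null-padded on the streamed side
```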
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.execution.local
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Attribute, NamedExpression, UnsafeProjection}
import org.apache.spark.sql.internal.SQLConf
case class ProjectNode(conf: SQLConf, projectList: Seq[NamedExpression], child: LocalNode)
extends UnaryLocalNode(conf) {
private[this] var project: UnsafeProjection = _
override def output: Seq[Attribute] = projectList.map(_.toAttribute)
override def open(): Unit = {
project = UnsafeProjection.create(projectList, child.output)
child.open()
}
override def next(): Boolean = child.next()
override def fetch(): InternalRow = {
project.apply(child.fetch())
}
override def close(): Unit = child.close()
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.execution.local
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.util.random.{BernoulliCellSampler, PoissonSampler}
/**
* Sample the dataset.
*
* @param conf the SQLConf
* @param lowerBound Lower-bound of the sampling probability (usually 0.0)
* @param upperBound Upper-bound of the sampling probability. The expected fraction sampled
* will be ub - lb.
* @param withReplacement Whether to sample with replacement.
* @param seed the random seed
* @param child the LocalNode
*/
case class SampleNode(
conf: SQLConf,
lowerBound: Double,
upperBound: Double,
withReplacement: Boolean,
seed: Long,
child: LocalNode) extends UnaryLocalNode(conf) {
override def output: Seq[Attribute] = child.output
private[this] var iterator: Iterator[InternalRow] = _
private[this] var currentRow: InternalRow = _
override def open(): Unit = {
child.open()
val sampler =
if (withReplacement) {
// Disable gap sampling since the gap sampling method buffers two rows internally,
// requiring us to copy the row, which is more expensive than the random number generator.
new PoissonSampler[InternalRow](upperBound - lowerBound, useGapSamplingIfPossible = false)
} else {
new BernoulliCellSampler[InternalRow](lowerBound, upperBound)
}
sampler.setSeed(seed)
iterator = sampler.sample(child.asIterator)
}
override def next(): Boolean = {
if (iterator.hasNext) {
currentRow = iterator.next()
true
} else {
false
}
}
override def fetch(): InternalRow = currentRow
override def close(): Unit = child.close()
}
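
A hedged usage sketch of the two sampler configurations above, assuming the branch-1.6 samplers are accessible; both draw an expected 10% of the rows, since the expected fraction is upperBound - lowerBound.

```scala
import org.apache.spark.util.random.{BernoulliCellSampler, PoissonSampler}

// Without replacement: Bernoulli over the [lowerBound, upperBound) cell.
val bernoulli = new BernoulliCellSampler[Int](0.0, 0.10)
bernoulli.setSeed(42L)
val sampled = bernoulli.sample((1 to 1000).iterator).toSeq
// sampled.size is ~100 in expectation

// With replacement: Poisson, gap sampling disabled as in SampleNode above.
val poisson = new PoissonSampler[Int](0.10, useGapSamplingIfPossible = false)
```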
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.execution.local
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.internal.SQLConf
/**
* An operator that scans a local data collection in the form of a Scala Seq.
*/
case class SeqScanNode(conf: SQLConf, output: Seq[Attribute], data: Seq[InternalRow])
extends LeafLocalNode(conf) {
private[this] var iterator: Iterator[InternalRow] = _
private[this] var currentRow: InternalRow = _
override def open(): Unit = {
iterator = data.iterator
}
override def next(): Boolean = {
if (iterator.hasNext) {
currentRow = iterator.next()
true
} else {
false
}
}
override def fetch(): InternalRow = currentRow
override def close(): Unit = {
// Do nothing
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.execution.local
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.codegen.GenerateOrdering
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.util.BoundedPriorityQueue
case class TakeOrderedAndProjectNode(
conf: SQLConf,
limit: Int,
sortOrder: Seq[SortOrder],
projectList: Option[Seq[NamedExpression]],
child: LocalNode) extends UnaryLocalNode(conf) {
private[this] var projection: Option[Projection] = _
private[this] var ord: Ordering[InternalRow] = _
private[this] var iterator: Iterator[InternalRow] = _
private[this] var currentRow: InternalRow = _
override def output: Seq[Attribute] = {
val projectOutput = projectList.map(_.map(_.toAttribute))
projectOutput.getOrElse(child.output)
}
override def open(): Unit = {
child.open()
projection = projectList.map(UnsafeProjection.create(_, child.output))
ord = GenerateOrdering.generate(sortOrder, child.output)
// The priority queue keeps the largest elements, so reverse the ordering to retain the smallest `limit` rows.
val queue = new BoundedPriorityQueue[InternalRow](limit)(ord.reverse)
while (child.next()) {
queue += child.fetch()
}
// Close it eagerly since we don't need it.
child.close()
iterator = queue.toArray.sorted(ord).iterator
}
override def next(): Boolean = {
if (iterator.hasNext) {
val _currentRow = iterator.next()
currentRow = projection match {
case Some(p) => p(_currentRow)
case None => _currentRow
}
true
} else {
false
}
}
override def fetch(): InternalRow = currentRow
override def close(): Unit = child.close()
}
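
The reversed ordering above is the standard bounded-heap trick: BoundedPriorityQueue retains the largest elements under the ordering it is given, so feeding it `ord.reverse` makes it retain the smallest `limit` rows under `ord`. The same idea with a plain max-heap (standalone sketch, not Spark's BoundedPriorityQueue):

```scala
import scala.collection.mutable.PriorityQueue

def topK[A](xs: Iterator[A], k: Int)(implicit ord: Ordering[A]): Seq[A] = {
  val heap = PriorityQueue.empty[A](ord)   // head is the largest under ord
  xs.foreach { x =>
    heap += x
    if (heap.size > k) heap.dequeue()      // evict the current largest
  }
  heap.toList.sorted(ord)                  // ascending, like the node above
}

// topK(Iterator(5, 1, 4, 2, 3), 2) == List(1, 2)
```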
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.execution.local
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.internal.SQLConf
case class UnionNode(conf: SQLConf, children: Seq[LocalNode]) extends LocalNode(conf) {
override def output: Seq[Attribute] = children.head.output
private[this] var currentChild: LocalNode = _
private[this] var nextChildIndex: Int = _
override def open(): Unit = {
currentChild = children.head
currentChild.open()
nextChildIndex = 1
}
private def advanceToNextChild(): Boolean = {
var found = false
var exit = false
while (!exit && !found) {
if (currentChild != null) {
currentChild.close()
}
if (nextChildIndex >= children.size) {
found = false
exit = true
} else {
currentChild = children(nextChildIndex)
nextChildIndex += 1
currentChild.open()
found = currentChild.next()
}
}
found
}
override def close(): Unit = {
if (currentChild != null) {
currentChild.close()
}
}
override def fetch(): InternalRow = currentChild.fetch()
override def next(): Boolean = {
if (currentChild.next()) {
true
} else {
advanceToNextChild()
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.execution.local
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
import org.apache.spark.sql.internal.SQLConf
/**
* A dummy [[LocalNode]] that just returns rows from a [[LocalRelation]].
*/
private[local] case class DummyNode(
output: Seq[Attribute],
relation: LocalRelation,
conf: SQLConf)
extends LocalNode(conf) {
import DummyNode._
private var index: Int = CLOSED
private val input: Seq[InternalRow] = relation.data
def this(output: Seq[Attribute], data: Seq[Product], conf: SQLConf = new SQLConf) {
this(output, LocalRelation.fromProduct(output, data), conf)
}
def isOpen: Boolean = index != CLOSED
override def children: Seq[LocalNode] = Seq.empty
override def open(): Unit = {
index = -1
}
override def next(): Boolean = {
index += 1
index < input.size
}
override def fetch(): InternalRow = {
assert(index >= 0 && index < input.size)
input(index)
}
override def close(): Unit = {
index = CLOSED
}
}
private object DummyNode {
val CLOSED: Int = Int.MinValue
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.execution.local
import org.apache.spark.sql.catalyst.dsl.expressions._
class ExpandNodeSuite extends LocalNodeTest {
private def testExpand(inputData: Array[(Int, Int)] = Array.empty): Unit = {
val inputNode = new DummyNode(kvIntAttributes, inputData)
val projections = Seq(Seq('k + 'v, 'k - 'v), Seq('k * 'v, 'k / 'v))
val expandNode = new ExpandNode(conf, projections, inputNode.output, inputNode)
val resolvedNode = resolveExpressions(expandNode)
val expectedOutput = {
val firstHalf = inputData.map { case (k, v) => (k + v, k - v) }
val secondHalf = inputData.map { case (k, v) => (k * v, k / v) }
firstHalf ++ secondHalf
}
val actualOutput = resolvedNode.collect().map { case row =>
(row.getInt(0), row.getInt(1))
}
assert(actualOutput.toSet === expectedOutput.toSet)
}
test("empty") {
testExpand()
}
test("basic") {
testExpand((1 to 100).map { i => (i, i * 1000) }.toArray)
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.execution.local
import org.apache.spark.sql.catalyst.dsl.expressions._
class FilterNodeSuite extends LocalNodeTest {
private def testFilter(inputData: Array[(Int, Int)] = Array.empty): Unit = {
val cond = 'k % 2 === 0
val inputNode = new DummyNode(kvIntAttributes, inputData)
val filterNode = new FilterNode(conf, cond, inputNode)
val resolvedNode = resolveExpressions(filterNode)
val expectedOutput = inputData.filter { case (k, _) => k % 2 == 0 }
val actualOutput = resolvedNode.collect().map { case row =>
(row.getInt(0), row.getInt(1))
}
assert(actualOutput === expectedOutput)
}
test("empty") {
testFilter()
}
test("basic") {
testFilter((1 to 100).map { i => (i, i) }.toArray)
}
}