diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
index 99f8841c8737bd0d3cbfb895ea2b16788bbe1e2b..6235897ed179856abd9608c46ae9e55568509771 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
@@ -27,7 +27,6 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.physical.BroadcastMode
 import org.apache.spark.sql.execution.SparkSqlSerializer
-import org.apache.spark.sql.execution.local.LocalNode
 import org.apache.spark.unsafe.Platform
 import org.apache.spark.unsafe.map.BytesToBytesMap
 import org.apache.spark.util.{KnownSizeEstimation, SizeEstimator, Utils}
@@ -157,10 +156,6 @@ private[joins] class UniqueKeyHashedRelation(
 private[execution] object HashedRelation {
-  def apply(localNode: LocalNode, keyGenerator: Projection): HashedRelation = {
-    apply(localNode.asIterator, keyGenerator)
-  }
   def apply(
       input: Iterator[InternalRow],
       keyGenerator: Projection,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/local/BinaryHashJoinNode.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/local/BinaryHashJoinNode.scala
deleted file mode 100644
index 97f93580169408f0337d49306dd1199558320ac3..0000000000000000000000000000000000000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/local/BinaryHashJoinNode.scala
+++ /dev/null
@@ -1,71 +0,0 @@
-* Licensed to the Apache Software Foundation (ASF) under one or more
-* contributor license agreements.  See the NOTICE file distributed with
-* this work for additional information regarding copyright ownership.
-* The ASF licenses this file to You under the Apache License, Version 2.0
-* (the "License"); you may not use this file except in compliance with
-* the License.  You may obtain a copy of the License at
-*    http://www.apache.org/licenses/LICENSE-2.0
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* See the License for the specific language governing permissions and
-* limitations under the License.
-package org.apache.spark.sql.execution.local
-import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.execution.joins.{BuildLeft, BuildRight, BuildSide, HashedRelation}
-import org.apache.spark.sql.internal.SQLConf
- * A [[HashJoinNode]] that builds the [[HashedRelation]] according to the value of
- * `buildSide`. The actual work of this node is defined in [[HashJoinNode]].
- */
-case class BinaryHashJoinNode(
-    conf: SQLConf,
-    leftKeys: Seq[Expression],
-    rightKeys: Seq[Expression],
-    buildSide: BuildSide,
-    left: LocalNode,
-    right: LocalNode)
-  extends BinaryLocalNode(conf) with HashJoinNode {
-  protected override val (streamedNode, streamedKeys) = buildSide match {
-    case BuildLeft => (right, rightKeys)
-    case BuildRight => (left, leftKeys)
-  }
-  private val (buildNode, buildKeys) = buildSide match {
-    case BuildLeft => (left, leftKeys)
-    case BuildRight => (right, rightKeys)
-  }
-  override def output: Seq[Attribute] = left.output ++ right.output
-  private def buildSideKeyGenerator: Projection = {
-    // We are expecting the data types of buildKeys and streamedKeys are the same.
-    assert(buildKeys.map(_.dataType) == streamedKeys.map(_.dataType))
-    UnsafeProjection.create(buildKeys, buildNode.output)
-  }
-  protected override def doOpen(): Unit = {
-    buildNode.open()
-    val hashedRelation = HashedRelation(buildNode, buildSideKeyGenerator)
-    // We have built the HashedRelation. So, close buildNode.
-    buildNode.close()
-    streamedNode.open()
-    // Set the HashedRelation used by the HashJoinNode.
-    withHashedRelation(hashedRelation)
-  }
-  override def close(): Unit = {
-    // Please note that we do not need to call the close method of our buildNode because
-    // it has been called in this.open.
-    streamedNode.close()
-  }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/local/BroadcastHashJoinNode.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/local/BroadcastHashJoinNode.scala
deleted file mode 100644
index 779f4833fa417c6544690a47504f47228b01d0c9..0000000000000000000000000000000000000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/local/BroadcastHashJoinNode.scala
+++ /dev/null
@@ -1,58 +0,0 @@
-* Licensed to the Apache Software Foundation (ASF) under one or more
-* contributor license agreements.  See the NOTICE file distributed with
-* this work for additional information regarding copyright ownership.
-* The ASF licenses this file to You under the Apache License, Version 2.0
-* (the "License"); you may not use this file except in compliance with
-* the License.  You may obtain a copy of the License at
-*    http://www.apache.org/licenses/LICENSE-2.0
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* See the License for the specific language governing permissions and
-* limitations under the License.
-package org.apache.spark.sql.execution.local
-import org.apache.spark.broadcast.Broadcast
-import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.execution.joins.{BuildLeft, BuildRight, BuildSide, HashedRelation}
-import org.apache.spark.sql.internal.SQLConf
- * A [[HashJoinNode]] for broadcast join. It takes a streamedNode and a broadcast
- * [[HashedRelation]]. The actual work of this node is defined in [[HashJoinNode]].
- */
-case class BroadcastHashJoinNode(
-    conf: SQLConf,
-    streamedKeys: Seq[Expression],
-    streamedNode: LocalNode,
-    buildSide: BuildSide,
-    buildOutput: Seq[Attribute],
-    hashedRelation: Broadcast[HashedRelation])
-  extends UnaryLocalNode(conf) with HashJoinNode {
-  override val child = streamedNode
-  // Because we do not pass in the buildNode, we take the output of buildNode to
-  // create the inputSet properly.
-  override def inputSet: AttributeSet = AttributeSet(child.output ++ buildOutput)
-  override def output: Seq[Attribute] = buildSide match {
-    case BuildRight => streamedNode.output ++ buildOutput
-    case BuildLeft => buildOutput ++ streamedNode.output
-  }
-  protected override def doOpen(): Unit = {
-    streamedNode.open()
-    // Set the HashedRelation used by the HashJoinNode.
-    withHashedRelation(hashedRelation.value)
-  }
-  override def close(): Unit = {
-    streamedNode.close()
-  }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/local/ConvertToSafeNode.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/local/ConvertToSafeNode.scala
deleted file mode 100644
index f79d795a904d1b148f766f6c57792ef86258a760..0000000000000000000000000000000000000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/local/ConvertToSafeNode.scala
+++ /dev/null
@@ -1,40 +0,0 @@
-* Licensed to the Apache Software Foundation (ASF) under one or more
-* contributor license agreements.  See the NOTICE file distributed with
-* this work for additional information regarding copyright ownership.
-* The ASF licenses this file to You under the Apache License, Version 2.0
-* (the "License"); you may not use this file except in compliance with
-* the License.  You may obtain a copy of the License at
-*    http://www.apache.org/licenses/LICENSE-2.0
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* See the License for the specific language governing permissions and
-* limitations under the License.
-package org.apache.spark.sql.execution.local
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{Attribute, FromUnsafeProjection, Projection}
-import org.apache.spark.sql.internal.SQLConf
-case class ConvertToSafeNode(conf: SQLConf, child: LocalNode) extends UnaryLocalNode(conf) {
-  override def output: Seq[Attribute] = child.output
-  private[this] var convertToSafe: Projection = _
-  override def open(): Unit = {
-    child.open()
-    convertToSafe = FromUnsafeProjection(child.schema)
-  }
-  override def next(): Boolean = child.next()
-  override def fetch(): InternalRow = convertToSafe(child.fetch())
-  override def close(): Unit = child.close()
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/local/ConvertToUnsafeNode.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/local/ConvertToUnsafeNode.scala
deleted file mode 100644
index f3fa474b0f7ffe8261cc729f9829320f6c6c1ac6..0000000000000000000000000000000000000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/local/ConvertToUnsafeNode.scala
+++ /dev/null
@@ -1,40 +0,0 @@
-* Licensed to the Apache Software Foundation (ASF) under one or more
-* contributor license agreements.  See the NOTICE file distributed with
-* this work for additional information regarding copyright ownership.
-* The ASF licenses this file to You under the Apache License, Version 2.0
-* (the "License"); you may not use this file except in compliance with
-* the License.  You may obtain a copy of the License at
-*    http://www.apache.org/licenses/LICENSE-2.0
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* See the License for the specific language governing permissions and
-* limitations under the License.
-package org.apache.spark.sql.execution.local
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{Attribute, Projection, UnsafeProjection}
-import org.apache.spark.sql.internal.SQLConf
-case class ConvertToUnsafeNode(conf: SQLConf, child: LocalNode) extends UnaryLocalNode(conf) {
-  override def output: Seq[Attribute] = child.output
-  private[this] var convertToUnsafe: Projection = _
-  override def open(): Unit = {
-    child.open()
-    convertToUnsafe = UnsafeProjection.create(child.schema)
-  }
-  override def next(): Boolean = child.next()
-  override def fetch(): InternalRow = convertToUnsafe(child.fetch())
-  override def close(): Unit = child.close()
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/local/ExpandNode.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/local/ExpandNode.scala
deleted file mode 100644
index 6ccd6db0e6ca419307e1faaa83e1a530fb7df370..0000000000000000000000000000000000000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/local/ExpandNode.scala
+++ /dev/null
@@ -1,60 +0,0 @@
-* Licensed to the Apache Software Foundation (ASF) under one or more
-* contributor license agreements.  See the NOTICE file distributed with
-* this work for additional information regarding copyright ownership.
-* The ASF licenses this file to You under the Apache License, Version 2.0
-* (the "License"); you may not use this file except in compliance with
-* the License.  You may obtain a copy of the License at
-*    http://www.apache.org/licenses/LICENSE-2.0
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* See the License for the specific language governing permissions and
-* limitations under the License.
-package org.apache.spark.sql.execution.local
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.internal.SQLConf
-case class ExpandNode(
-    conf: SQLConf,
-    projections: Seq[Seq[Expression]],
-    output: Seq[Attribute],
-    child: LocalNode) extends UnaryLocalNode(conf) {
-  assert(projections.size > 0)
-  private[this] var result: InternalRow = _
-  private[this] var idx: Int = _
-  private[this] var input: InternalRow = _
-  private[this] var groups: Array[Projection] = _
-  override def open(): Unit = {
-    child.open()
-    groups = projections.map(ee => newMutableProjection(ee, child.output)()).toArray
-    idx = groups.length
-  }
-  override def next(): Boolean = {
-    if (idx >= groups.length) {
-      if (child.next()) {
-        input = child.fetch()
-        idx = 0
-      } else {
-        return false
-      }
-    }
-    result = groups(idx)(input)
-    idx += 1
-    true
-  }
-  override def fetch(): InternalRow = result
-  override def close(): Unit = child.close()
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/local/FilterNode.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/local/FilterNode.scala
deleted file mode 100644
index c5eb33cef44202d1d135ab0d34f2a691250e36f5..0000000000000000000000000000000000000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/local/FilterNode.scala
+++ /dev/null
@@ -1,49 +0,0 @@
-* Licensed to the Apache Software Foundation (ASF) under one or more
-* contributor license agreements.  See the NOTICE file distributed with
-* this work for additional information regarding copyright ownership.
-* The ASF licenses this file to You under the Apache License, Version 2.0
-* (the "License"); you may not use this file except in compliance with
-* the License.  You may obtain a copy of the License at
-*    http://www.apache.org/licenses/LICENSE-2.0
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* See the License for the specific language governing permissions and
-* limitations under the License.
-package org.apache.spark.sql.execution.local
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
-import org.apache.spark.sql.catalyst.expressions.codegen.GeneratePredicate
-import org.apache.spark.sql.internal.SQLConf
-case class FilterNode(conf: SQLConf, condition: Expression, child: LocalNode)
-  extends UnaryLocalNode(conf) {
-  private[this] var predicate: (InternalRow) => Boolean = _
-  override def output: Seq[Attribute] = child.output
-  override def open(): Unit = {
-    child.open()
-    predicate = GeneratePredicate.generate(condition, child.output)
-  }
-  override def next(): Boolean = {
-    var found = false
-    while (!found && child.next()) {
-      found = predicate.apply(child.fetch())
-    }
-    found
-  }
-  override def fetch(): InternalRow = child.fetch()
-  override def close(): Unit = child.close()
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/local/HashJoinNode.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/local/HashJoinNode.scala
deleted file mode 100644
index fd7948ffa9a9b6af0fe7661a2641dc1474837b3e..0000000000000000000000000000000000000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/local/HashJoinNode.scala
+++ /dev/null
@@ -1,111 +0,0 @@
-* Licensed to the Apache Software Foundation (ASF) under one or more
-* contributor license agreements.  See the NOTICE file distributed with
-* this work for additional information regarding copyright ownership.
-* The ASF licenses this file to You under the Apache License, Version 2.0
-* (the "License"); you may not use this file except in compliance with
-* the License.  You may obtain a copy of the License at
-*    http://www.apache.org/licenses/LICENSE-2.0
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* See the License for the specific language governing permissions and
-* limitations under the License.
-package org.apache.spark.sql.execution.local
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.execution.joins._
- * An abstract node for sharing common functionality among different implementations of
- * inner hash equi-join, notably [[BinaryHashJoinNode]] and [[BroadcastHashJoinNode]].
- *
- * Much of this code is similar to [[org.apache.spark.sql.execution.joins.HashJoin]].
- */
-trait HashJoinNode {
-  self: LocalNode =>
-  protected def streamedKeys: Seq[Expression]
-  protected def streamedNode: LocalNode
-  protected def buildSide: BuildSide
-  private[this] var currentStreamedRow: InternalRow = _
-  private[this] var currentHashMatches: Seq[InternalRow] = _
-  private[this] var currentMatchPosition: Int = -1
-  private[this] var joinRow: JoinedRow = _
-  private[this] var resultProjection: (InternalRow) => InternalRow = _
-  private[this] var hashed: HashedRelation = _
-  private[this] var joinKeys: Projection = _
-  private def streamSideKeyGenerator: Projection =
-    UnsafeProjection.create(streamedKeys, streamedNode.output)
-  /**
-   * Sets the HashedRelation used by this node. This method needs to be called after
-   * before the first `next` gets called.
-   */
-  protected def withHashedRelation(hashedRelation: HashedRelation): Unit = {
-    hashed = hashedRelation
-  }
-  /**
-   * Custom open implementation to be overridden by subclasses.
-   */
-  protected def doOpen(): Unit
-  override def open(): Unit = {
-    doOpen()
-    joinRow = new JoinedRow
-    resultProjection = UnsafeProjection.create(schema)
-    joinKeys = streamSideKeyGenerator
-  }
-  override def next(): Boolean = {
-    currentMatchPosition += 1
-    if (currentHashMatches == null || currentMatchPosition >= currentHashMatches.size) {
-      fetchNextMatch()
-    } else {
-      true
-    }
-  }
-  /**
-   * Populate `currentHashMatches` with build-side rows matching the next streamed row.
-   * @return whether matches are found such that subsequent calls to `fetch` are valid.
-   */
-  private def fetchNextMatch(): Boolean = {
-    currentHashMatches = null
-    currentMatchPosition = -1
-    while (currentHashMatches == null && streamedNode.next()) {
-      currentStreamedRow = streamedNode.fetch()
-      val key = joinKeys(currentStreamedRow)
-      if (!key.anyNull) {
-        currentHashMatches = hashed.get(key)
-      }
-    }
-    if (currentHashMatches == null) {
-      false
-    } else {
-      currentMatchPosition = 0
-      true
-    }
-  }
-  override def fetch(): InternalRow = {
-    val ret = buildSide match {
-      case BuildRight => joinRow(currentStreamedRow, currentHashMatches(currentMatchPosition))
-      case BuildLeft => joinRow(currentHashMatches(currentMatchPosition), currentStreamedRow)
-    }
-    resultProjection(ret)
-  }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/local/IntersectNode.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/local/IntersectNode.scala
deleted file mode 100644
index e594e132dea799f04d6087269389a7defa79e6ed..0000000000000000000000000000000000000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/local/IntersectNode.scala
+++ /dev/null
@@ -1,63 +0,0 @@
-* Licensed to the Apache Software Foundation (ASF) under one or more
-* contributor license agreements.  See the NOTICE file distributed with
-* this work for additional information regarding copyright ownership.
-* The ASF licenses this file to You under the Apache License, Version 2.0
-* (the "License"); you may not use this file except in compliance with
-* the License.  You may obtain a copy of the License at
-*    http://www.apache.org/licenses/LICENSE-2.0
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* See the License for the specific language governing permissions and
-* limitations under the License.
-package org.apache.spark.sql.execution.local
-import scala.collection.mutable
-import org.apache.spark.sql.catalyst.expressions.Attribute
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.internal.SQLConf
-case class IntersectNode(conf: SQLConf, left: LocalNode, right: LocalNode)
-  extends BinaryLocalNode(conf) {
-  override def output: Seq[Attribute] = left.output
-  private[this] var leftRows: mutable.HashSet[InternalRow] = _
-  private[this] var currentRow: InternalRow = _
-  override def open(): Unit = {
-    left.open()
-    leftRows = mutable.HashSet[InternalRow]()
-    while (left.next()) {
-      leftRows += left.fetch().copy()
-    }
-    left.close()
-    right.open()
-  }
-  override def next(): Boolean = {
-    currentRow = null
-    while (currentRow == null && right.next()) {
-      currentRow = right.fetch()
-      if (!leftRows.contains(currentRow)) {
-        currentRow = null
-      }
-    }
-    currentRow != null
-  }
-  override def fetch(): InternalRow = currentRow
-  override def close(): Unit = {
-    left.close()
-    right.close()
-  }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/local/LimitNode.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/local/LimitNode.scala
deleted file mode 100644
index 9af45ac0aac9a5f3ec61845c04ea6d0c53d31a8b..0000000000000000000000000000000000000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/local/LimitNode.scala
+++ /dev/null
@@ -1,46 +0,0 @@
-* Licensed to the Apache Software Foundation (ASF) under one or more
-* contributor license agreements.  See the NOTICE file distributed with
-* this work for additional information regarding copyright ownership.
-* The ASF licenses this file to You under the Apache License, Version 2.0
-* (the "License"); you may not use this file except in compliance with
-* the License.  You may obtain a copy of the License at
-*    http://www.apache.org/licenses/LICENSE-2.0
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* See the License for the specific language governing permissions and
-* limitations under the License.
-package org.apache.spark.sql.execution.local
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.Attribute
-import org.apache.spark.sql.internal.SQLConf
-case class LimitNode(conf: SQLConf, limit: Int, child: LocalNode) extends UnaryLocalNode(conf) {
-  private[this] var count = 0
-  override def output: Seq[Attribute] = child.output
-  override def open(): Unit = child.open()
-  override def close(): Unit = child.close()
-  override def fetch(): InternalRow = child.fetch()
-  override def next(): Boolean = {
-    if (count < limit) {
-      count += 1
-      child.next()
-    } else {
-      false
-    }
-  }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/local/LocalNode.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/local/LocalNode.scala
deleted file mode 100644
index a5d09691dc46c93fe659c34e5c7c057a46f9448c..0000000000000000000000000000000000000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/local/LocalNode.scala
+++ /dev/null
@@ -1,157 +0,0 @@
-* Licensed to the Apache Software Foundation (ASF) under one or more
-* contributor license agreements.  See the NOTICE file distributed with
-* this work for additional information regarding copyright ownership.
-* The ASF licenses this file to You under the Apache License, Version 2.0
-* (the "License"); you may not use this file except in compliance with
-* the License.  You may obtain a copy of the License at
-*    http://www.apache.org/licenses/LICENSE-2.0
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* See the License for the specific language governing permissions and
-* limitations under the License.
-package org.apache.spark.sql.execution.local
-import org.apache.spark.Logging
-import org.apache.spark.sql.Row
-import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
-import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.expressions.codegen._
-import org.apache.spark.sql.catalyst.plans.QueryPlan
-import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.types.StructType
- * A local physical operator, in the form of an iterator.
- *
- * Before consuming the iterator, open function must be called.
- * After consuming the iterator, close function must be called.
- */
-abstract class LocalNode(conf: SQLConf) extends QueryPlan[LocalNode] with Logging {
-  private[this] lazy val isTesting: Boolean = sys.props.contains("spark.testing")
-  /**
-   * Called before open(). Prepare can be used to reserve memory needed. It must NOT consume
-   * any input data.
-   *
-   * Implementations of this must also call the `prepare()` function of its children.
-   */
-  def prepare(): Unit = children.foreach(_.prepare())
-  /**
-   * Initializes the iterator state. Must be called before calling `next()`.
-   *
-   * Implementations of this must also call the `open()` function of its children.
-   */
-  def open(): Unit
-  /**
-   * Advances the iterator to the next tuple. Returns true if there is at least one more tuple.
-   */
-  def next(): Boolean
-  /**
-   * Returns the current tuple.
-   */
-  def fetch(): InternalRow
-  /**
-   * Closes the iterator and releases all resources. It should be idempotent.
-   *
-   * Implementations of this must also call the `close()` function of its children.
-   */
-  def close(): Unit
-  /**
-   * Returns the content through the [[Iterator]] interface.
-   */
-  final def asIterator: Iterator[InternalRow] = new LocalNodeIterator(this)
-  /**
-   * Returns the content of the iterator from the beginning to the end in the form of a Scala Seq.
-   */
-  final def collect(): Seq[Row] = {
-    val converter = CatalystTypeConverters.createToScalaConverter(StructType.fromAttributes(output))
-    val result = new scala.collection.mutable.ArrayBuffer[Row]
-    open()
-    try {
-      while (next()) {
-        result += converter.apply(fetch()).asInstanceOf[Row]
-      }
-    } finally {
-      close()
-    }
-    result
-  }
-  protected def newMutableProjection(
-      expressions: Seq[Expression],
-      inputSchema: Seq[Attribute]): () => MutableProjection = {
-    log.debug(
-      s"Creating MutableProj: $expressions, inputSchema: $inputSchema")
-    GenerateMutableProjection.generate(expressions, inputSchema)
-  }
-  protected def newPredicate(
-      expression: Expression,
-      inputSchema: Seq[Attribute]): (InternalRow) => Boolean = {
-    GeneratePredicate.generate(expression, inputSchema)
-  }
-abstract class LeafLocalNode(conf: SQLConf) extends LocalNode(conf) {
-  override def children: Seq[LocalNode] = Seq.empty
-abstract class UnaryLocalNode(conf: SQLConf) extends LocalNode(conf) {
-  def child: LocalNode
-  override def children: Seq[LocalNode] = Seq(child)
-abstract class BinaryLocalNode(conf: SQLConf) extends LocalNode(conf) {
-  def left: LocalNode
-  def right: LocalNode
-  override def children: Seq[LocalNode] = Seq(left, right)
- * An thin wrapper around a [[LocalNode]] that provides an `Iterator` interface.
- */
-private class LocalNodeIterator(localNode: LocalNode) extends Iterator[InternalRow] {
-  private var nextRow: InternalRow = _
-  override def hasNext: Boolean = {
-    if (nextRow == null) {
-      val res = localNode.next()
-      if (res) {
-        nextRow = localNode.fetch()
-      }
-      res
-    } else {
-      true
-    }
-  }
-  override def next(): InternalRow = {
-    if (hasNext) {
-      val res = nextRow
-      nextRow = null
-      res
-    } else {
-      throw new NoSuchElementException
-    }
-  }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/local/NestedLoopJoinNode.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/local/NestedLoopJoinNode.scala
deleted file mode 100644
index b5ea08325c58e0750f1c59f8641ad5459a180cb5..0000000000000000000000000000000000000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/local/NestedLoopJoinNode.scala
+++ /dev/null
@@ -1,152 +0,0 @@
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.sql.execution.local
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.plans.{FullOuter, JoinType, LeftOuter, RightOuter}
-import org.apache.spark.sql.execution.joins.{BuildLeft, BuildRight, BuildSide}
-import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.util.collection.{BitSet, CompactBuffer}
-case class NestedLoopJoinNode(
-    conf: SQLConf,
-    left: LocalNode,
-    right: LocalNode,
-    buildSide: BuildSide,
-    joinType: JoinType,
-    condition: Option[Expression]) extends BinaryLocalNode(conf) {
-  override def output: Seq[Attribute] = {
-    joinType match {
-      case LeftOuter =>
-        left.output ++ right.output.map(_.withNullability(true))
-      case RightOuter =>
-        left.output.map(_.withNullability(true)) ++ right.output
-      case FullOuter =>
-        left.output.map(_.withNullability(true)) ++ right.output.map(_.withNullability(true))
-      case x =>
-        throw new IllegalArgumentException(
-          s"NestedLoopJoin should not take $x as the JoinType")
-    }
-  }
-  private[this] def genResultProjection: InternalRow => InternalRow = {
-    UnsafeProjection.create(schema)
-  }
-  private[this] var currentRow: InternalRow = _
-  private[this] var iterator: Iterator[InternalRow] = _
-  override def open(): Unit = {
-    val (streamed, build) = buildSide match {
-      case BuildRight => (left, right)
-      case BuildLeft => (right, left)
-    }
-    build.open()
-    val buildRelation = new CompactBuffer[InternalRow]
-    while (build.next()) {
-      buildRelation += build.fetch().copy()
-    }
-    build.close()
-    val boundCondition =
-      newPredicate(condition.getOrElse(Literal(true)), left.output ++ right.output)
-    val leftNulls = new GenericMutableRow(left.output.size)
-    val rightNulls = new GenericMutableRow(right.output.size)
-    val joinedRow = new JoinedRow
-    val matchedBuildTuples = new BitSet(buildRelation.size)
-    val resultProj = genResultProjection
-    streamed.open()
-    // streamedRowMatches also contains null rows if using outer join
-    val streamedRowMatches: Iterator[InternalRow] = streamed.asIterator.flatMap { streamedRow =>
-      val matchedRows = new CompactBuffer[InternalRow]
-      var i = 0
-      var streamRowMatched = false
-      // Scan the build relation to look for matches for each streamed row
-      while (i < buildRelation.size) {
-        val buildRow = buildRelation(i)
-        buildSide match {
-          case BuildRight => joinedRow(streamedRow, buildRow)
-          case BuildLeft => joinedRow(buildRow, streamedRow)
-        }
-        if (boundCondition(joinedRow)) {
-          matchedRows += resultProj(joinedRow).copy()
-          streamRowMatched = true
-          matchedBuildTuples.set(i)
-        }
-        i += 1
-      }
-      // If this row had no matches and we're using outer join, join it with the null rows
-      if (!streamRowMatched) {
-        (joinType, buildSide) match {
-          case (LeftOuter | FullOuter, BuildRight) =>
-            matchedRows += resultProj(joinedRow(streamedRow, rightNulls)).copy()
-          case (RightOuter | FullOuter, BuildLeft) =>
-            matchedRows += resultProj(joinedRow(leftNulls, streamedRow)).copy()
-          case _ =>
-        }
-      }
-      matchedRows.iterator
-    }
-    // If we're using outer join, find rows on the build side that didn't match anything
-    // and join them with the null row
-    lazy val unmatchedBuildRows: Iterator[InternalRow] = {
-      var i = 0
-      buildRelation.filter { row =>
-        val r = !matchedBuildTuples.get(i)
-        i += 1
-        r
-      }.iterator
-    }
-    iterator = (joinType, buildSide) match {
-      case (RightOuter | FullOuter, BuildRight) =>
-        streamedRowMatches ++
-          unmatchedBuildRows.map { buildRow => resultProj(joinedRow(leftNulls, buildRow)) }
-      case (LeftOuter | FullOuter, BuildLeft) =>
-        streamedRowMatches ++
-          unmatchedBuildRows.map { buildRow => resultProj(joinedRow(buildRow, rightNulls)) }
-      case _ => streamedRowMatches
-    }
-  }
-  override def next(): Boolean = {
-    if (iterator.hasNext) {
-      currentRow = iterator.next()
-      true
-    } else {
-      false
-    }
-  }
-  override def fetch(): InternalRow = currentRow
-  override def close(): Unit = {
-    left.close()
-    right.close()
-  }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/local/ProjectNode.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/local/ProjectNode.scala
deleted file mode 100644
index 5fe068a13c8a478d6f106e78f0c8e91cde71e519..0000000000000000000000000000000000000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/local/ProjectNode.scala
+++ /dev/null
@@ -1,44 +0,0 @@
-* Licensed to the Apache Software Foundation (ASF) under one or more
-* contributor license agreements.  See the NOTICE file distributed with
-* this work for additional information regarding copyright ownership.
-* The ASF licenses this file to You under the Apache License, Version 2.0
-* (the "License"); you may not use this file except in compliance with
-* the License.  You may obtain a copy of the License at
-*    http://www.apache.org/licenses/LICENSE-2.0
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* See the License for the specific language governing permissions and
-* limitations under the License.
-package org.apache.spark.sql.execution.local
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{Attribute, NamedExpression, UnsafeProjection}
-import org.apache.spark.sql.internal.SQLConf
-case class ProjectNode(conf: SQLConf, projectList: Seq[NamedExpression], child: LocalNode)
-  extends UnaryLocalNode(conf) {
-  private[this] var project: UnsafeProjection = _
-  override def output: Seq[Attribute] = projectList.map(_.toAttribute)
-  override def open(): Unit = {
-    project = UnsafeProjection.create(projectList, child.output)
-    child.open()
-  }
-  override def next(): Boolean = child.next()
-  override def fetch(): InternalRow = {
-    project.apply(child.fetch())
-  }
-  override def close(): Unit = child.close()
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/local/SampleNode.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/local/SampleNode.scala
deleted file mode 100644
index 078fb50deb16fd4d2e6c75b5fa04d289115cb1c3..0000000000000000000000000000000000000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/local/SampleNode.scala
+++ /dev/null
@@ -1,78 +0,0 @@
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.sql.execution.local
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.Attribute
-import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.util.random.{BernoulliCellSampler, PoissonSampler}
- * Sample the dataset.
- *
- * @param conf the SQLConf
- * @param lowerBound Lower-bound of the sampling probability (usually 0.0)
- * @param upperBound Upper-bound of the sampling probability. The expected fraction sampled
- *                   will be ub - lb.
- * @param withReplacement Whether to sample with replacement.
- * @param seed the random seed
- * @param child the LocalNode
- */
-case class SampleNode(
-    conf: SQLConf,
-    lowerBound: Double,
-    upperBound: Double,
-    withReplacement: Boolean,
-    seed: Long,
-    child: LocalNode) extends UnaryLocalNode(conf) {
-  override def output: Seq[Attribute] = child.output
-  private[this] var iterator: Iterator[InternalRow] = _
-  private[this] var currentRow: InternalRow = _
-  override def open(): Unit = {
-    child.open()
-    val sampler =
-      if (withReplacement) {
-        // Disable gap sampling since the gap sampling method buffers two rows internally,
-        // requiring us to copy the row, which is more expensive than the random number generator.
-        new PoissonSampler[InternalRow](upperBound - lowerBound, useGapSamplingIfPossible = false)
-      } else {
-        new BernoulliCellSampler[InternalRow](lowerBound, upperBound)
-      }
-    sampler.setSeed(seed)
-    iterator = sampler.sample(child.asIterator)
-  }
-  override def next(): Boolean = {
-    if (iterator.hasNext) {
-      currentRow = iterator.next()
-      true
-    } else {
-      false
-    }
-  }
-  override def fetch(): InternalRow = currentRow
-  override def close(): Unit = child.close()
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/local/SeqScanNode.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/local/SeqScanNode.scala
deleted file mode 100644
index 8ebfe3a68b3a3104b894a1f341da3c054c0852d5..0000000000000000000000000000000000000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/local/SeqScanNode.scala
+++ /dev/null
@@ -1,51 +0,0 @@
-* Licensed to the Apache Software Foundation (ASF) under one or more
-* contributor license agreements.  See the NOTICE file distributed with
-* this work for additional information regarding copyright ownership.
-* The ASF licenses this file to You under the Apache License, Version 2.0
-* (the "License"); you may not use this file except in compliance with
-* the License.  You may obtain a copy of the License at
-*    http://www.apache.org/licenses/LICENSE-2.0
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* See the License for the specific language governing permissions and
-* limitations under the License.
-package org.apache.spark.sql.execution.local
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.Attribute
-import org.apache.spark.sql.internal.SQLConf
- * An operator that scans some local data collection in the form of Scala Seq.
- */
-case class SeqScanNode(conf: SQLConf, output: Seq[Attribute], data: Seq[InternalRow])
-  extends LeafLocalNode(conf) {
-  private[this] var iterator: Iterator[InternalRow] = _
-  private[this] var currentRow: InternalRow = _
-  override def open(): Unit = {
-    iterator = data.iterator
-  }
-  override def next(): Boolean = {
-    if (iterator.hasNext) {
-      currentRow = iterator.next()
-      true
-    } else {
-      false
-    }
-  }
-  override def fetch(): InternalRow = currentRow
-  override def close(): Unit = {
-    // Do nothing
-  }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/local/TakeOrderedAndProjectNode.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/local/TakeOrderedAndProjectNode.scala
deleted file mode 100644
index f52f5f7bb59b7a9c79cf35f8d78e71d52e892e4c..0000000000000000000000000000000000000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/local/TakeOrderedAndProjectNode.scala
+++ /dev/null
@@ -1,74 +0,0 @@
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.sql.execution.local
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.expressions.codegen.GenerateOrdering
-import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.util.BoundedPriorityQueue
-case class TakeOrderedAndProjectNode(
-    conf: SQLConf,
-    limit: Int,
-    sortOrder: Seq[SortOrder],
-    projectList: Option[Seq[NamedExpression]],
-    child: LocalNode) extends UnaryLocalNode(conf) {
-  private[this] var projection: Option[Projection] = _
-  private[this] var ord: Ordering[InternalRow] = _
-  private[this] var iterator: Iterator[InternalRow] = _
-  private[this] var currentRow: InternalRow = _
-  override def output: Seq[Attribute] = {
-    val projectOutput = projectList.map(_.map(_.toAttribute))
-    projectOutput.getOrElse(child.output)
-  }
-  override def open(): Unit = {
-    child.open()
-    projection = projectList.map(UnsafeProjection.create(_, child.output))
-    ord = GenerateOrdering.generate(sortOrder, child.output)
-    // Priority keeps the largest elements, so let's reverse the ordering.
-    val queue = new BoundedPriorityQueue[InternalRow](limit)(ord.reverse)
-    while (child.next()) {
-      queue += child.fetch()
-    }
-    // Close it eagerly since we don't need it.
-    child.close()
-    iterator = queue.toArray.sorted(ord).iterator
-  }
-  override def next(): Boolean = {
-    if (iterator.hasNext) {
-      val _currentRow = iterator.next()
-      currentRow = projection match {
-        case Some(p) => p(_currentRow)
-        case None => _currentRow
-      }
-      true
-    } else {
-      false
-    }
-  }
-  override def fetch(): InternalRow = currentRow
-  override def close(): Unit = child.close()
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/local/UnionNode.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/local/UnionNode.scala
deleted file mode 100644
index e53bc220d8d3475d3b32b37c602ba656a3316492..0000000000000000000000000000000000000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/local/UnionNode.scala
+++ /dev/null
@@ -1,73 +0,0 @@
-* Licensed to the Apache Software Foundation (ASF) under one or more
-* contributor license agreements.  See the NOTICE file distributed with
-* this work for additional information regarding copyright ownership.
-* The ASF licenses this file to You under the Apache License, Version 2.0
-* (the "License"); you may not use this file except in compliance with
-* the License.  You may obtain a copy of the License at
-*    http://www.apache.org/licenses/LICENSE-2.0
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* See the License for the specific language governing permissions and
-* limitations under the License.
-package org.apache.spark.sql.execution.local
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.Attribute
-import org.apache.spark.sql.internal.SQLConf
-case class UnionNode(conf: SQLConf, children: Seq[LocalNode]) extends LocalNode(conf) {
-  override def output: Seq[Attribute] = children.head.output
-  private[this] var currentChild: LocalNode = _
-  private[this] var nextChildIndex: Int = _
-  override def open(): Unit = {
-    currentChild = children.head
-    currentChild.open()
-    nextChildIndex = 1
-  }
-  private def advanceToNextChild(): Boolean = {
-    var found = false
-    var exit = false
-    while (!exit && !found) {
-      if (currentChild != null) {
-        currentChild.close()
-      }
-      if (nextChildIndex >= children.size) {
-        found = false
-        exit = true
-      } else {
-        currentChild = children(nextChildIndex)
-        nextChildIndex += 1
-        currentChild.open()
-        found = currentChild.next()
-      }
-    }
-    found
-  }
-  override def close(): Unit = {
-    if (currentChild != null) {
-      currentChild.close()
-    }
-  }
-  override def fetch(): InternalRow = currentChild.fetch()
-  override def next(): Boolean = {
-    if (currentChild.next()) {
-      true
-    } else {
-      advanceToNextChild()
-    }
-  }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/local/DummyNode.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/local/DummyNode.scala
deleted file mode 100644
index cd9277d3bcf1ad4cb9d61e3382424756897bd0fb..0000000000000000000000000000000000000000
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/local/DummyNode.scala
+++ /dev/null
@@ -1,68 +0,0 @@
-* Licensed to the Apache Software Foundation (ASF) under one or more
-* contributor license agreements.  See the NOTICE file distributed with
-* this work for additional information regarding copyright ownership.
-* The ASF licenses this file to You under the Apache License, Version 2.0
-* (the "License"); you may not use this file except in compliance with
-* the License.  You may obtain a copy of the License at
-*    http://www.apache.org/licenses/LICENSE-2.0
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* See the License for the specific language governing permissions and
-* limitations under the License.
-package org.apache.spark.sql.execution.local
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.Attribute
-import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
-import org.apache.spark.sql.internal.SQLConf
- * A dummy [[LocalNode]] that just returns rows from a [[LocalRelation]].
- */
-private[local] case class DummyNode(
-    output: Seq[Attribute],
-    relation: LocalRelation,
-    conf: SQLConf)
-  extends LocalNode(conf) {
-  import DummyNode._
-  private var index: Int = CLOSED
-  private val input: Seq[InternalRow] = relation.data
-  def this(output: Seq[Attribute], data: Seq[Product], conf: SQLConf = new SQLConf) {
-    this(output, LocalRelation.fromProduct(output, data), conf)
-  }
-  def isOpen: Boolean = index != CLOSED
-  override def children: Seq[LocalNode] = Seq.empty
-  override def open(): Unit = {
-    index = -1
-  }
-  override def next(): Boolean = {
-    index += 1
-    index < input.size
-  }
-  override def fetch(): InternalRow = {
-    assert(index >= 0 && index < input.size)
-    input(index)
-  }
-  override def close(): Unit = {
-    index = CLOSED
-  }
-private object DummyNode {
-  val CLOSED: Int = Int.MinValue
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/local/ExpandNodeSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/local/ExpandNodeSuite.scala
deleted file mode 100644
index bbd94d8da2d11ddf91b45f88c5c4f663295e1f31..0000000000000000000000000000000000000000
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/local/ExpandNodeSuite.scala
+++ /dev/null
@@ -1,49 +0,0 @@
-* Licensed to the Apache Software Foundation (ASF) under one or more
-* contributor license agreements.  See the NOTICE file distributed with
-* this work for additional information regarding copyright ownership.
-* The ASF licenses this file to You under the Apache License, Version 2.0
-* (the "License"); you may not use this file except in compliance with
-* the License.  You may obtain a copy of the License at
-*    http://www.apache.org/licenses/LICENSE-2.0
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* See the License for the specific language governing permissions and
-* limitations under the License.
-package org.apache.spark.sql.execution.local
-import org.apache.spark.sql.catalyst.dsl.expressions._
-class ExpandNodeSuite extends LocalNodeTest {
-  private def testExpand(inputData: Array[(Int, Int)] = Array.empty): Unit = {
-    val inputNode = new DummyNode(kvIntAttributes, inputData)
-    val projections = Seq(Seq('k + 'v, 'k - 'v), Seq('k * 'v, 'k / 'v))
-    val expandNode = new ExpandNode(conf, projections, inputNode.output, inputNode)
-    val resolvedNode = resolveExpressions(expandNode)
-    val expectedOutput = {
-      val firstHalf = inputData.map { case (k, v) => (k + v, k - v) }
-      val secondHalf = inputData.map { case (k, v) => (k * v, k / v) }
-      firstHalf ++ secondHalf
-    }
-    val actualOutput = resolvedNode.collect().map { case row =>
-      (row.getInt(0), row.getInt(1))
-    }
-    assert(actualOutput.toSet === expectedOutput.toSet)
-  }
-  test("empty") {
-    testExpand()
-  }
-  test("basic") {
-    testExpand((1 to 100).map { i => (i, i * 1000) }.toArray)
-  }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/local/FilterNodeSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/local/FilterNodeSuite.scala
deleted file mode 100644
index 4eadce646d379501b8c0d5bb862f065c858d3486..0000000000000000000000000000000000000000
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/local/FilterNodeSuite.scala
+++ /dev/null
@@ -1,45 +0,0 @@
-* Licensed to the Apache Software Foundation (ASF) under one or more
-* contributor license agreements.  See the NOTICE file distributed with
-* this work for additional information regarding copyright ownership.
-* The ASF licenses this file to You under the Apache License, Version 2.0
-* (the "License"); you may not use this file except in compliance with
-* the License.  You may obtain a copy of the License at
-*    http://www.apache.org/licenses/LICENSE-2.0
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* See the License for the specific language governing permissions and
-* limitations under the License.
-package org.apache.spark.sql.execution.local
-import org.apache.spark.sql.catalyst.dsl.expressions._
-class FilterNodeSuite extends LocalNodeTest {
-  private def testFilter(inputData: Array[(Int, Int)] = Array.empty): Unit = {
-    val cond = 'k % 2 === 0
-    val inputNode = new DummyNode(kvIntAttributes, inputData)
-    val filterNode = new FilterNode(conf, cond, inputNode)
-    val resolvedNode = resolveExpressions(filterNode)
-    val expectedOutput = inputData.filter { case (k, _) => k % 2 == 0 }
-    val actualOutput = resolvedNode.collect().map { case row =>
-      (row.getInt(0), row.getInt(1))
-    }
-    assert(actualOutput === expectedOutput)
-  }
-  test("empty") {
-    testFilter()
-  }
-  test("basic") {
-    testFilter((1 to 100).map { i => (i, i) }.toArray)
-  }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/local/HashJoinNodeSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/local/HashJoinNodeSuite.scala
deleted file mode 100644
index 74142ea598d9deabe3a62a175db595adbf51967b..0000000000000000000000000000000000000000
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/local/HashJoinNodeSuite.scala
+++ /dev/null
@@ -1,141 +0,0 @@
-* Licensed to the Apache Software Foundation (ASF) under one or more
-* contributor license agreements.  See the NOTICE file distributed with
-* this work for additional information regarding copyright ownership.
-* The ASF licenses this file to You under the Apache License, Version 2.0
-* (the "License"); you may not use this file except in compliance with
-* the License.  You may obtain a copy of the License at
-*    http://www.apache.org/licenses/LICENSE-2.0
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* See the License for the specific language governing permissions and
-* limitations under the License.
-package org.apache.spark.sql.execution.local
-import org.mockito.Mockito.{mock, when}
-import org.apache.spark.broadcast.TorrentBroadcast
-import org.apache.spark.sql.catalyst.dsl.expressions._
-import org.apache.spark.sql.catalyst.expressions.{Expression, UnsafeProjection}
-import org.apache.spark.sql.execution.joins.{BuildLeft, BuildRight, BuildSide, HashedRelation}
-import org.apache.spark.sql.internal.SQLConf
-class HashJoinNodeSuite extends LocalNodeTest {
-  // Test all combinations of the two dimensions: with/out unsafe and build sides
-  private val buildSides = Seq(BuildLeft, BuildRight)
-  buildSides.foreach { buildSide =>
-    testJoin(buildSide)
-  }
-  /**
-   * Builds a [[HashedRelation]] based on a resolved `buildKeys`
-   * and a resolved `buildNode`.
-   */
-  private def buildHashedRelation(
-      conf: SQLConf,
-      buildKeys: Seq[Expression],
-      buildNode: LocalNode): HashedRelation = {
-    val buildSideKeyGenerator = UnsafeProjection.create(buildKeys, buildNode.output)
-    buildNode.prepare()
-    buildNode.open()
-    val hashedRelation = HashedRelation(buildNode, buildSideKeyGenerator)
-    buildNode.close()
-    hashedRelation
-  }
-  /**
-   * Test inner hash join with varying degrees of matches.
-   */
-  private def testJoin(buildSide: BuildSide): Unit = {
-    val testNamePrefix = buildSide
-    val someData = (1 to 100).map { i => (i, "burger" + i) }.toArray
-    val conf = new SQLConf
-    // Actual test body
-    def runTest(leftInput: Array[(Int, String)], rightInput: Array[(Int, String)]): Unit = {
-      val rightInputMap = rightInput.toMap
-      val leftNode = new DummyNode(joinNameAttributes, leftInput)
-      val rightNode = new DummyNode(joinNicknameAttributes, rightInput)
-      val makeBinaryHashJoinNode = (node1: LocalNode, node2: LocalNode) => {
-        val binaryHashJoinNode =
-          BinaryHashJoinNode(conf, Seq('id1), Seq('id2), buildSide, node1, node2)
-        resolveExpressions(binaryHashJoinNode)
-      }
-      val makeBroadcastJoinNode = (node1: LocalNode, node2: LocalNode) => {
-        val leftKeys = Seq('id1.attr)
-        val rightKeys = Seq('id2.attr)
-        // Figure out the build side and stream side.
-        val (buildNode, buildKeys, streamedNode, streamedKeys) = buildSide match {
-          case BuildLeft => (node1, leftKeys, node2, rightKeys)
-          case BuildRight => (node2, rightKeys, node1, leftKeys)
-        }
-        // Resolve the expressions of the build side and then create a HashedRelation.
-        val resolvedBuildNode = resolveExpressions(buildNode)
-        val resolvedBuildKeys = resolveExpressions(buildKeys, resolvedBuildNode)
-        val hashedRelation = buildHashedRelation(conf, resolvedBuildKeys, resolvedBuildNode)
-        val broadcastHashedRelation = mock(classOf[TorrentBroadcast[HashedRelation]])
-        when(broadcastHashedRelation.value).thenReturn(hashedRelation)
-        val hashJoinNode =
-          BroadcastHashJoinNode(
-            conf,
-            streamedKeys,
-            streamedNode,
-            buildSide,
-            resolvedBuildNode.output,
-            broadcastHashedRelation)
-        resolveExpressions(hashJoinNode)
-      }
-      val expectedOutput = leftInput
-        .filter { case (k, _) => rightInputMap.contains(k) }
-        .map { case (k, v) => (k, v, k, rightInputMap(k)) }
-      Seq(makeBinaryHashJoinNode, makeBroadcastJoinNode).foreach { makeNode =>
-        val makeUnsafeNode = wrapForUnsafe(makeNode)
-        val hashJoinNode = makeUnsafeNode(leftNode, rightNode)
-        val actualOutput = hashJoinNode.collect().map { row =>
-          // (id, name, id, nickname)
-          (row.getInt(0), row.getString(1), row.getInt(2), row.getString(3))
-        }
-        assert(actualOutput === expectedOutput)
-      }
-    }
-    test(s"$testNamePrefix: empty") {
-      runTest(Array.empty, Array.empty)
-      runTest(someData, Array.empty)
-      runTest(Array.empty, someData)
-    }
-    test(s"$testNamePrefix: no matches") {
-      val someIrrelevantData = (10000 to 100100).map { i => (i, "piper" + i) }.toArray
-      runTest(someData, Array.empty)
-      runTest(Array.empty, someData)
-      runTest(someData, someIrrelevantData)
-      runTest(someIrrelevantData, someData)
-    }
-    test(s"$testNamePrefix: partial matches") {
-      val someOtherData = (50 to 150).map { i => (i, "finnegan" + i) }.toArray
-      runTest(someData, someOtherData)
-      runTest(someOtherData, someData)
-    }
-    test(s"$testNamePrefix: full matches") {
-      val someSuperRelevantData = someData.map { case (k, v) => (k, "cooper" + v) }.toArray
-      runTest(someData, someSuperRelevantData)
-      runTest(someSuperRelevantData, someData)
-    }
-  }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/local/IntersectNodeSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/local/IntersectNodeSuite.scala
deleted file mode 100644
index c0ad2021b204a1ec666bd07b14ae81e7694b00d8..0000000000000000000000000000000000000000
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/local/IntersectNodeSuite.scala
+++ /dev/null
@@ -1,37 +0,0 @@
-* Licensed to the Apache Software Foundation (ASF) under one or more
-* contributor license agreements.  See the NOTICE file distributed with
-* this work for additional information regarding copyright ownership.
-* The ASF licenses this file to You under the Apache License, Version 2.0
-* (the "License"); you may not use this file except in compliance with
-* the License.  You may obtain a copy of the License at
-*    http://www.apache.org/licenses/LICENSE-2.0
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* See the License for the specific language governing permissions and
-* limitations under the License.
-package org.apache.spark.sql.execution.local
-class IntersectNodeSuite extends LocalNodeTest {
-  test("basic") {
-    val n = 100
-    val leftData = (1 to n).filter { i => i % 2 == 0 }.map { i => (i, i) }.toArray
-    val rightData = (1 to n).filter { i => i % 3 == 0 }.map { i => (i, i) }.toArray
-    val leftNode = new DummyNode(kvIntAttributes, leftData)
-    val rightNode = new DummyNode(kvIntAttributes, rightData)
-    val intersectNode = new IntersectNode(conf, leftNode, rightNode)
-    val expectedOutput = leftData.intersect(rightData)
-    val actualOutput = intersectNode.collect().map { case row =>
-      (row.getInt(0), row.getInt(1))
-    }
-    assert(actualOutput === expectedOutput)
-  }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/local/LimitNodeSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/local/LimitNodeSuite.scala
deleted file mode 100644
index fb790636a368948ddd92ecead3e64b76053cba16..0000000000000000000000000000000000000000
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/local/LimitNodeSuite.scala
+++ /dev/null
@@ -1,41 +0,0 @@
-* Licensed to the Apache Software Foundation (ASF) under one or more
-* contributor license agreements.  See the NOTICE file distributed with
-* this work for additional information regarding copyright ownership.
-* The ASF licenses this file to You under the Apache License, Version 2.0
-* (the "License"); you may not use this file except in compliance with
-* the License.  You may obtain a copy of the License at
-*    http://www.apache.org/licenses/LICENSE-2.0
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* See the License for the specific language governing permissions and
-* limitations under the License.
-package org.apache.spark.sql.execution.local
-class LimitNodeSuite extends LocalNodeTest {
-  private def testLimit(inputData: Array[(Int, Int)] = Array.empty, limit: Int = 10): Unit = {
-    val inputNode = new DummyNode(kvIntAttributes, inputData)
-    val limitNode = new LimitNode(conf, limit, inputNode)
-    val expectedOutput = inputData.take(limit)
-    val actualOutput = limitNode.collect().map { case row =>
-      (row.getInt(0), row.getInt(1))
-    }
-    assert(actualOutput === expectedOutput)
-  }
-  test("empty") {
-    testLimit()
-  }
-  test("basic") {
-    testLimit((1 to 100).map { i => (i, i) }.toArray, 20)
-  }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/local/LocalNodeSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/local/LocalNodeSuite.scala
deleted file mode 100644
index 0d1ed99eec6cd7b85d7908b4370c5d9d6f357972..0000000000000000000000000000000000000000
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/local/LocalNodeSuite.scala
+++ /dev/null
@@ -1,73 +0,0 @@
-* Licensed to the Apache Software Foundation (ASF) under one or more
-* contributor license agreements.  See the NOTICE file distributed with
-* this work for additional information regarding copyright ownership.
-* The ASF licenses this file to You under the Apache License, Version 2.0
-* (the "License"); you may not use this file except in compliance with
-* the License.  You may obtain a copy of the License at
-*    http://www.apache.org/licenses/LICENSE-2.0
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* See the License for the specific language governing permissions and
-* limitations under the License.
-package org.apache.spark.sql.execution.local
-class LocalNodeSuite extends LocalNodeTest {
-  private val data = (1 to 100).map { i => (i, i) }.toArray
-  test("basic open, next, fetch, close") {
-    val node = new DummyNode(kvIntAttributes, data)
-    assert(!node.isOpen)
-    node.open()
-    assert(node.isOpen)
-    data.foreach { case (k, v) =>
-      assert(node.next())
-      // fetch should be idempotent
-      val fetched = node.fetch()
-      assert(node.fetch() === fetched)
-      assert(node.fetch() === fetched)
-      assert(node.fetch().numFields === 2)
-      assert(node.fetch().getInt(0) === k)
-      assert(node.fetch().getInt(1) === v)
-    }
-    assert(!node.next())
-    node.close()
-    assert(!node.isOpen)
-  }
-  test("asIterator") {
-    val node = new DummyNode(kvIntAttributes, data)
-    val iter = node.asIterator
-    node.open()
-    data.foreach { case (k, v) =>
-      // hasNext should be idempotent
-      assert(iter.hasNext)
-      assert(iter.hasNext)
-      val item = iter.next()
-      assert(item.numFields === 2)
-      assert(item.getInt(0) === k)
-      assert(item.getInt(1) === v)
-    }
-    intercept[NoSuchElementException] {
-      iter.next()
-    }
-    node.close()
-  }
-  test("collect") {
-    val node = new DummyNode(kvIntAttributes, data)
-    node.open()
-    val collected = node.collect()
-    assert(collected.size === data.size)
-    assert(collected.forall(_.size === 2))
-    assert(collected.map { case row => (row.getInt(0), row.getInt(0)) } === data)
-    node.close()
-  }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/local/LocalNodeTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/local/LocalNodeTest.scala
deleted file mode 100644
index cd67a66ebf57620b364f72ff81f1de26609dc6c7..0000000000000000000000000000000000000000
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/local/LocalNodeTest.scala
+++ /dev/null
@@ -1,87 +0,0 @@
-* Licensed to the Apache Software Foundation (ASF) under one or more
-* contributor license agreements.  See the NOTICE file distributed with
-* this work for additional information regarding copyright ownership.
-* The ASF licenses this file to You under the Apache License, Version 2.0
-* (the "License"); you may not use this file except in compliance with
-* the License.  You may obtain a copy of the License at
-*    http://www.apache.org/licenses/LICENSE-2.0
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* See the License for the specific language governing permissions and
-* limitations under the License.
-package org.apache.spark.sql.execution.local
-import org.apache.spark.SparkFunSuite
-import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
-import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression}
-import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.types.{IntegerType, StringType}
-class LocalNodeTest extends SparkFunSuite {
-  protected val conf: SQLConf = new SQLConf
-  protected val kvIntAttributes = Seq(
-    AttributeReference("k", IntegerType)(),
-    AttributeReference("v", IntegerType)())
-  protected val joinNameAttributes = Seq(
-    AttributeReference("id1", IntegerType)(),
-    AttributeReference("name", StringType)())
-  protected val joinNicknameAttributes = Seq(
-    AttributeReference("id2", IntegerType)(),
-    AttributeReference("nickname", StringType)())
-  /**
-   * Wrap a function processing two [[LocalNode]]s such that:
-   *   (1) all input rows are automatically converted to unsafe rows
-   *   (2) all output rows are automatically converted back to safe rows
-   */
-  protected def wrapForUnsafe(
-      f: (LocalNode, LocalNode) => LocalNode): (LocalNode, LocalNode) => LocalNode = {
-    (left: LocalNode, right: LocalNode) => {
-      val _left = ConvertToUnsafeNode(conf, left)
-      val _right = ConvertToUnsafeNode(conf, right)
-      val r = f(_left, _right)
-      ConvertToSafeNode(conf, r)
-    }
-  }
-  /**
-   * Recursively resolve all expressions in a [[LocalNode]] using the node's attributes.
-   */
-  protected def resolveExpressions(outputNode: LocalNode): LocalNode = {
-    outputNode transform {
-      case node: LocalNode =>
-        val inputMap = node.output.map { a => (a.name, a) }.toMap
-        node transformExpressions {
-          case UnresolvedAttribute(Seq(u)) =>
-            inputMap.getOrElse(u,
-              sys.error(s"Invalid Test: Cannot resolve $u given input $inputMap"))
-        }
-    }
-  }
-  /**
-   * Resolve all expressions in `expressions` based on the `output` of `localNode`.
-   * It assumes that all expressions in the `localNode` are resolved.
-   */
-  protected def resolveExpressions(
-      expressions: Seq[Expression],
-      localNode: LocalNode): Seq[Expression] = {
-    require(localNode.expressions.forall(_.resolved))
-    val inputMap = localNode.output.map { a => (a.name, a) }.toMap
-    expressions.map { expression =>
-      expression.transformUp {
-        case UnresolvedAttribute(Seq(u)) =>
-          inputMap.getOrElse(u,
-            sys.error(s"Invalid Test: Cannot resolve $u given input $inputMap"))
-      }
-    }
-  }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/local/NestedLoopJoinNodeSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/local/NestedLoopJoinNodeSuite.scala
deleted file mode 100644
index bcc87a917551731b6d635ff66c98c5e5f2626fb4..0000000000000000000000000000000000000000
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/local/NestedLoopJoinNodeSuite.scala
+++ /dev/null
@@ -1,142 +0,0 @@
-* Licensed to the Apache Software Foundation (ASF) under one or more
-* contributor license agreements.  See the NOTICE file distributed with
-* this work for additional information regarding copyright ownership.
-* The ASF licenses this file to You under the Apache License, Version 2.0
-* (the "License"); you may not use this file except in compliance with
-* the License.  You may obtain a copy of the License at
-*    http://www.apache.org/licenses/LICENSE-2.0
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* See the License for the specific language governing permissions and
-* limitations under the License.
-package org.apache.spark.sql.execution.local
-import org.apache.spark.sql.catalyst.dsl.expressions._
-import org.apache.spark.sql.catalyst.plans.{FullOuter, JoinType, LeftOuter, RightOuter}
-import org.apache.spark.sql.execution.joins.{BuildLeft, BuildRight, BuildSide}
-import org.apache.spark.sql.internal.SQLConf
-class NestedLoopJoinNodeSuite extends LocalNodeTest {
-  // Test all combinations of the three dimensions: with/out unsafe, build sides, and join types
-  private val buildSides = Seq(BuildLeft, BuildRight)
-  private val joinTypes = Seq(LeftOuter, RightOuter, FullOuter)
-  buildSides.foreach { buildSide =>
-    joinTypes.foreach { joinType =>
-      testJoin(buildSide, joinType)
-    }
-  }
-  /**
-   * Test outer nested loop joins with varying degrees of matches.
-   */
-  private def testJoin(buildSide: BuildSide, joinType: JoinType): Unit = {
-    val testNamePrefix = s"$buildSide / $joinType"
-    val someData = (1 to 100).map { i => (i, "burger" + i) }.toArray
-    val conf = new SQLConf
-    // Actual test body
-    def runTest(
-        joinType: JoinType,
-        leftInput: Array[(Int, String)],
-        rightInput: Array[(Int, String)]): Unit = {
-      val leftNode = new DummyNode(joinNameAttributes, leftInput)
-      val rightNode = new DummyNode(joinNicknameAttributes, rightInput)
-      val cond = 'id1 === 'id2
-      val makeNode = (node1: LocalNode, node2: LocalNode) => {
-        resolveExpressions(
-          new NestedLoopJoinNode(conf, node1, node2, buildSide, joinType, Some(cond)))
-      }
-      val makeUnsafeNode = wrapForUnsafe(makeNode)
-      val hashJoinNode = makeUnsafeNode(leftNode, rightNode)
-      val expectedOutput = generateExpectedOutput(leftInput, rightInput, joinType)
-      val actualOutput = hashJoinNode.collect().map { row =>
-        // (
-        //   id, name,
-        //   id, nickname
-        // )
-        (
-          Option(row.get(0)).map(_.asInstanceOf[Int]), Option(row.getString(1)),
-          Option(row.get(2)).map(_.asInstanceOf[Int]), Option(row.getString(3))
-        )
-      }
-      assert(actualOutput.toSet === expectedOutput.toSet)
-    }
-    test(s"$testNamePrefix: empty") {
-      runTest(joinType, Array.empty, Array.empty)
-    }
-    test(s"$testNamePrefix: no matches") {
-      val someIrrelevantData = (10000 to 10100).map { i => (i, "piper" + i) }.toArray
-      runTest(joinType, someData, Array.empty)
-      runTest(joinType, Array.empty, someData)
-      runTest(joinType, someData, someIrrelevantData)
-      runTest(joinType, someIrrelevantData, someData)
-    }
-    test(s"$testNamePrefix: partial matches") {
-      val someOtherData = (50 to 150).map { i => (i, "finnegan" + i) }.toArray
-      runTest(joinType, someData, someOtherData)
-      runTest(joinType, someOtherData, someData)
-    }
-    test(s"$testNamePrefix: full matches") {
-      val someSuperRelevantData = someData.map { case (k, v) => (k, "cooper" + v) }
-      runTest(joinType, someData, someSuperRelevantData)
-      runTest(joinType, someSuperRelevantData, someData)
-    }
-  }
-  /**
-   * Helper method to generate the expected output of a test based on the join type.
-   */
-  private def generateExpectedOutput(
-      leftInput: Array[(Int, String)],
-      rightInput: Array[(Int, String)],
-      joinType: JoinType): Array[(Option[Int], Option[String], Option[Int], Option[String])] = {
-    joinType match {
-      case LeftOuter =>
-        val rightInputMap = rightInput.toMap
-        leftInput.map { case (k, v) =>
-          val rightKey = rightInputMap.get(k).map { _ => k }
-          val rightValue = rightInputMap.get(k)
-          (Some(k), Some(v), rightKey, rightValue)
-        }
-      case RightOuter =>
-        val leftInputMap = leftInput.toMap
-        rightInput.map { case (k, v) =>
-          val leftKey = leftInputMap.get(k).map { _ => k }
-          val leftValue = leftInputMap.get(k)
-          (leftKey, leftValue, Some(k), Some(v))
-        }
-      case FullOuter =>
-        val leftInputMap = leftInput.toMap
-        val rightInputMap = rightInput.toMap
-        val leftOutput = leftInput.map { case (k, v) =>
-          val rightKey = rightInputMap.get(k).map { _ => k }
-          val rightValue = rightInputMap.get(k)
-          (Some(k), Some(v), rightKey, rightValue)
-        }
-        val rightOutput = rightInput.map { case (k, v) =>
-          val leftKey = leftInputMap.get(k).map { _ => k }
-          val leftValue = leftInputMap.get(k)
-          (leftKey, leftValue, Some(k), Some(v))
-        }
-        (leftOutput ++ rightOutput).distinct
-      case other =>
-        throw new IllegalArgumentException(s"Join type $other is not applicable")
-    }
-  }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/local/ProjectNodeSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/local/ProjectNodeSuite.scala
deleted file mode 100644
index 02ecb23d34b2f2fcd625d98e0842666eaf3fb941..0000000000000000000000000000000000000000
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/local/ProjectNodeSuite.scala
+++ /dev/null
@@ -1,49 +0,0 @@
-* Licensed to the Apache Software Foundation (ASF) under one or more
-* contributor license agreements.  See the NOTICE file distributed with
-* this work for additional information regarding copyright ownership.
-* The ASF licenses this file to You under the Apache License, Version 2.0
-* (the "License"); you may not use this file except in compliance with
-* the License.  You may obtain a copy of the License at
-*    http://www.apache.org/licenses/LICENSE-2.0
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* See the License for the specific language governing permissions and
-* limitations under the License.
-package org.apache.spark.sql.execution.local
-import org.apache.spark.sql.catalyst.expressions.{AttributeReference, NamedExpression}
-import org.apache.spark.sql.types.{IntegerType, StringType}
-class ProjectNodeSuite extends LocalNodeTest {
-  private val pieAttributes = Seq(
-    AttributeReference("id", IntegerType)(),
-    AttributeReference("age", IntegerType)(),
-    AttributeReference("name", StringType)())
-  private def testProject(inputData: Array[(Int, Int, String)] = Array.empty): Unit = {
-    val inputNode = new DummyNode(pieAttributes, inputData)
-    val columns = Seq[NamedExpression](inputNode.output(0), inputNode.output(2))
-    val projectNode = new ProjectNode(conf, columns, inputNode)
-    val expectedOutput = inputData.map { case (id, age, name) => (id, name) }
-    val actualOutput = projectNode.collect().map { case row =>
-      (row.getInt(0), row.getString(1))
-    }
-    assert(actualOutput === expectedOutput)
-  }
-  test("empty") {
-    testProject()
-  }
-  test("basic") {
-    testProject((1 to 100).map { i => (i, i + 1, "pie" + i) }.toArray)
-  }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/local/SampleNodeSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/local/SampleNodeSuite.scala
deleted file mode 100644
index a3e83bbd514576c23652f40aef8fa5c643c211aa..0000000000000000000000000000000000000000
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/local/SampleNodeSuite.scala
+++ /dev/null
@@ -1,51 +0,0 @@
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.sql.execution.local
-import org.apache.spark.util.random.{BernoulliCellSampler, PoissonSampler}
-class SampleNodeSuite extends LocalNodeTest {
-  private def testSample(withReplacement: Boolean): Unit = {
-    val seed = 0L
-    val lowerb = 0.0
-    val upperb = 0.3
-    val maybeOut = if (withReplacement) "" else "out"
-    test(s"with$maybeOut replacement") {
-      val inputData = (1 to 1000).map { i => (i, i) }.toArray
-      val inputNode = new DummyNode(kvIntAttributes, inputData)
-      val sampleNode = new SampleNode(conf, lowerb, upperb, withReplacement, seed, inputNode)
-      val sampler =
-        if (withReplacement) {
-          new PoissonSampler[(Int, Int)](upperb - lowerb, useGapSamplingIfPossible = false)
-        } else {
-          new BernoulliCellSampler[(Int, Int)](lowerb, upperb)
-        }
-      sampler.setSeed(seed)
-      val expectedOutput = sampler.sample(inputData.iterator).toArray
-      val actualOutput = sampleNode.collect().map { case row =>
-        (row.getInt(0), row.getInt(1))
-      }
-      assert(actualOutput === expectedOutput)
-    }
-  }
-  testSample(withReplacement = true)
-  testSample(withReplacement = false)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/local/TakeOrderedAndProjectNodeSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/local/TakeOrderedAndProjectNodeSuite.scala
deleted file mode 100644
index 42ebc7bfcaadceeab37733531d585f882b981b75..0000000000000000000000000000000000000000
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/local/TakeOrderedAndProjectNodeSuite.scala
+++ /dev/null
@@ -1,50 +0,0 @@
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.sql.execution.local
-import scala.util.Random
-import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.expressions.SortOrder
-class TakeOrderedAndProjectNodeSuite extends LocalNodeTest {
-  private def testTakeOrderedAndProject(desc: Boolean): Unit = {
-    val limit = 10
-    val ascOrDesc = if (desc) "desc" else "asc"
-    test(ascOrDesc) {
-      val inputData = Random.shuffle((1 to 100).toList).map { i => (i, i) }.toArray
-      val inputNode = new DummyNode(kvIntAttributes, inputData)
-      val firstColumn = inputNode.output(0)
-      val sortDirection = if (desc) Descending else Ascending
-      val sortOrder = SortOrder(firstColumn, sortDirection)
-      val takeOrderAndProjectNode = new TakeOrderedAndProjectNode(
-        conf, limit, Seq(sortOrder), Some(Seq(firstColumn)), inputNode)
-      val expectedOutput = inputData
-        .map { case (k, _) => k }
-        .sortBy { k => k * (if (desc) -1 else 1) }
-        .take(limit)
-      val actualOutput = takeOrderAndProjectNode.collect().map { row => row.getInt(0) }
-      assert(actualOutput === expectedOutput)
-    }
-  }
-  testTakeOrderedAndProject(desc = false)
-  testTakeOrderedAndProject(desc = true)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/local/UnionNodeSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/local/UnionNodeSuite.scala
deleted file mode 100644
index 666b0235c061d987d78674816bbfd54e01a7ba27..0000000000000000000000000000000000000000
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/local/UnionNodeSuite.scala
+++ /dev/null
@@ -1,55 +0,0 @@
-* Licensed to the Apache Software Foundation (ASF) under one or more
-* contributor license agreements.  See the NOTICE file distributed with
-* this work for additional information regarding copyright ownership.
-* The ASF licenses this file to You under the Apache License, Version 2.0
-* (the "License"); you may not use this file except in compliance with
-* the License.  You may obtain a copy of the License at
-*    http://www.apache.org/licenses/LICENSE-2.0
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* See the License for the specific language governing permissions and
-* limitations under the License.
-package org.apache.spark.sql.execution.local
-class UnionNodeSuite extends LocalNodeTest {
-  private def testUnion(inputData: Seq[Array[(Int, Int)]]): Unit = {
-    val inputNodes = inputData.map { data =>
-      new DummyNode(kvIntAttributes, data)
-    }
-    val unionNode = new UnionNode(conf, inputNodes)
-    val expectedOutput = inputData.flatten
-    val actualOutput = unionNode.collect().map { case row =>
-      (row.getInt(0), row.getInt(1))
-    }
-    assert(actualOutput === expectedOutput)
-  }
-  test("empty") {
-    testUnion(Seq(Array.empty))
-    testUnion(Seq(Array.empty, Array.empty))
-  }
-  test("self") {
-    val data = (1 to 100).map { i => (i, i) }.toArray
-    testUnion(Seq(data))
-    testUnion(Seq(data, data))
-    testUnion(Seq(data, data, data))
-  }
-  test("basic") {
-    val zero = Array.empty[(Int, Int)]
-    val one = (1 to 100).map { i => (i, i) }.toArray
-    val two = (50 to 150).map { i => (i, i) }.toArray
-    val three = (800 to 900).map { i => (i, i) }.toArray
-    testUnion(Seq(zero, one, two, three))
-  }