diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/local/FilterNode.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/local/FilterNode.scala
new file mode 100644
index 0000000000000000000000000000000000000000..a485a1a1d7ae4521a42dfdb8c9b3ee3a48a2b3c2
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/local/FilterNode.scala
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.local
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
+import org.apache.spark.sql.catalyst.expressions.codegen.GeneratePredicate
+
+
+case class FilterNode(condition: Expression, child: LocalNode) extends UnaryLocalNode {
+
+  private[this] var predicate: (InternalRow) => Boolean = _
+
+  override def output: Seq[Attribute] = child.output
+
+  override def open(): Unit = {
+    child.open()
+    predicate = GeneratePredicate.generate(condition, child.output)
+  }
+
+  override def next(): Boolean = {
+    var found = false
+    while (!found && child.next()) {
+      found = predicate.apply(child.get())
+    }
+    found
+  }
+
+  override def get(): InternalRow = child.get()
+
+  override def close(): Unit = child.close()
+}
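
A minimal sketch of how this operator could be exercised together with the `SeqScanNode` added later in this patch. The column name, test data and expected result are assumptions made for illustration, not part of the change:

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, GreaterThan, Literal}
import org.apache.spark.sql.execution.local.{FilterNode, SeqScanNode}
import org.apache.spark.sql.types.IntegerType

// Single illustrative column "value" with the rows 1..10.
val value = AttributeReference("value", IntegerType)()
val scan = SeqScanNode(Seq(value), (1 to 10).map(i => InternalRow(i)))

// Keep only rows where value > 5; the predicate is compiled by GeneratePredicate in open().
val filter = FilterNode(GreaterThan(value, Literal(5)), scan)

// collect() runs the open()/next()/get()/close() lifecycle and converts back to external rows.
assert(filter.collect().map(_.getInt(0)) == (6 to 10))

Since the predicate is only generated inside `open()`, constructing the node itself does no code generation until execution actually starts.
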
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/local/LocalNode.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/local/LocalNode.scala
new file mode 100644
index 0000000000000000000000000000000000000000..341c81438e6d637c01519676e3f43a263de74faf
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/local/LocalNode.scala
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.local
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.trees.TreeNode
+import org.apache.spark.sql.types.StructType
+
+/**
+ * A local physical operator, in the form of an iterator.
+ *
+ * Before consuming the iterator, the `open()` function must be called.
+ * After consuming the iterator, the `close()` function must be called.
+ */
+abstract class LocalNode extends TreeNode[LocalNode] {
+
+  def output: Seq[Attribute]
+
+  /**
+   * Initializes the iterator state. Must be called before calling `next()`.
+   *
+   * Implementations must also call the `open()` function of their children.
+   */
+  def open(): Unit
+
+  /**
+   * Advances the iterator to the next tuple. Returns true if there is at least one more tuple.
+   */
+  def next(): Boolean
+
+  /**
+   * Returns the current tuple.
+   */
+  def get(): InternalRow
+
+  /**
+   * Closes the iterator and releases all resources.
+   *
+   * Implementations must also call the `close()` function of their children.
+   */
+  def close(): Unit
+
+  /**
+   * Returns the entire content of the iterator as a Scala Seq.
+   */
+  def collect(): Seq[Row] = {
+    val converter = CatalystTypeConverters.createToScalaConverter(StructType.fromAttributes(output))
+    val result = new scala.collection.mutable.ArrayBuffer[Row]
+    open()
+    while (next()) {
+      result += converter.apply(get()).asInstanceOf[Row]
+    }
+    close()
+    result
+  }
+}
+
+
+abstract class LeafLocalNode extends LocalNode {
+  override def children: Seq[LocalNode] = Seq.empty
+}
+
+
+abstract class UnaryLocalNode extends LocalNode {
+
+  def child: LocalNode
+
+  override def children: Seq[LocalNode] = Seq(child)
+}
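
To make the contract concrete, here is a hypothetical leaf operator written against this API. `RangeScanNode`, its column name and its semantics are invented for this sketch and are not part of the patch:

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
import org.apache.spark.sql.execution.local.LeafLocalNode
import org.apache.spark.sql.types.IntegerType

// Hypothetical leaf operator emitting the integers 0 until n, for illustration only.
case class RangeScanNode(n: Int) extends LeafLocalNode {

  private[this] var i: Int = _

  // One illustrative integer column named "i".
  override val output: Seq[Attribute] = Seq(AttributeReference("i", IntegerType)())

  // A leaf has no children to open; just reset the cursor.
  override def open(): Unit = { i = -1 }

  // Advance and report whether a current tuple now exists.
  override def next(): Boolean = { i += 1; i < n }

  // Only valid after next() has returned true.
  override def get(): InternalRow = InternalRow(i)

  // Nothing to release.
  override def close(): Unit = {}
}

`RangeScanNode(3).collect()` would then return rows containing 0, 1 and 2, with `collect()` driving the lifecycle exactly as documented above.
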
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/local/ProjectNode.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/local/ProjectNode.scala
new file mode 100644
index 0000000000000000000000000000000000000000..e574d1473cdcb7e7d6a8d25a47aff1a7ab818e28
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/local/ProjectNode.scala
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.local
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{Attribute, NamedExpression, UnsafeProjection}
+
+
+case class ProjectNode(projectList: Seq[NamedExpression], child: LocalNode) extends UnaryLocalNode {
+
+  private[this] var project: UnsafeProjection = _
+
+  override def output: Seq[Attribute] = projectList.map(_.toAttribute)
+
+  override def open(): Unit = {
+    project = UnsafeProjection.create(projectList, child.output)
+    child.open()
+  }
+
+  override def next(): Boolean = child.next()
+
+  override def get(): InternalRow = {
+    project.apply(child.get())
+  }
+
+  override def close(): Unit = child.close()
+}
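
A short sketch of this operator evaluating an expression on top of `SeqScanNode`. The attribute, alias name and data are illustrative assumptions:

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Add, Alias, AttributeReference, Literal}
import org.apache.spark.sql.execution.local.{ProjectNode, SeqScanNode}
import org.apache.spark.sql.types.IntegerType

// Illustrative input column and rows.
val value = AttributeReference("value", IntegerType)()
val scan = SeqScanNode(Seq(value), Seq(InternalRow(1), InternalRow(2)))

// Compute "value + 1" under a new name; the UnsafeProjection is generated in open(),
// so it is bound against child.output exactly once per execution.
val project = ProjectNode(Seq(Alias(Add(value, Literal(1)), "valuePlusOne")()), scan)

// Expected external result: rows containing 2 and 3.
assert(project.collect().map(_.getInt(0)) == Seq(2, 3))
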
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/local/SeqScanNode.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/local/SeqScanNode.scala
new file mode 100644
index 0000000000000000000000000000000000000000..994de8afa9a02fb8ecf2ab0ab3854b11ad82bbe1
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/local/SeqScanNode.scala
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.local
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Attribute
+
+/**
+ * An operator that scans a local data collection in the form of a Scala Seq.
+ */
+case class SeqScanNode(output: Seq[Attribute], data: Seq[InternalRow]) extends LeafLocalNode {
+
+  private[this] var iterator: Iterator[InternalRow] = _
+  private[this] var currentRow: InternalRow = _
+
+  override def open(): Unit = {
+    iterator = data.iterator
+  }
+
+  override def next(): Boolean = {
+    if (iterator.hasNext) {
+      currentRow = iterator.next()
+      true
+    } else {
+      false
+    }
+  }
+
+  override def get(): InternalRow = currentRow
+
+  override def close(): Unit = {
+    // Do nothing
+  }
+}
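
Finally, a sketch that chains the three operators in this patch into one pull-based pipeline. The schema, rows and expected output are assumptions made for the example, not part of the change:

import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, GreaterThan, Literal}
import org.apache.spark.sql.execution.local.{FilterNode, ProjectNode, SeqScanNode}
import org.apache.spark.sql.types.{IntegerType, StringType, StructType}

// Assumed two-column schema and test rows.
val id = AttributeReference("id", IntegerType)()
val name = AttributeReference("name", StringType)()
val schema = StructType.fromAttributes(Seq(id, name))

// SeqScanNode consumes InternalRows, so external Rows are converted first.
val toCatalyst = CatalystTypeConverters.createToCatalystConverter(schema)
val data = Seq(Row(1, "a"), Row(2, "b"), Row(3, "c"))
  .map(r => toCatalyst(r).asInstanceOf[InternalRow])

// scan -> filter (id > 1) -> project (name only), pulled tuple by tuple via next()/get().
val plan = ProjectNode(Seq(name),
  FilterNode(GreaterThan(id, Literal(1)), SeqScanNode(Seq(id, name), data)))

// Expected external result: rows containing "b" and "c".
plan.collect().foreach(println)
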