diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/commands.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/commands.scala
index 1d513d77897636e49e38afa0092e1a18697ce1d4..5a1863953eae942a8726e468e7aa6c7dcc9ec2f4 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/commands.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/commands.scala
@@ -29,23 +29,6 @@ abstract class Command extends LeafNode {
   def output: Seq[Attribute] = Seq.empty
 }
 
-/**
- * Returned for commands supported by a given parser, but not catalyst.  In general these are DDL
- * commands that are passed directly to another system.
- */
-case class NativeCommand(cmd: String) extends Command {
-  override def output =
-    Seq(AttributeReference("result", StringType, nullable = false)())
-}
-
-/**
- * Commands of the form "SET [key [= value] ]".
- */
-case class DFSCommand(kv: Option[(String, Option[String])]) extends Command {
-  override def output = Seq(
-    AttributeReference("DFS output", StringType, nullable = false)())
-}
-
 /**
  *
  * Commands of the form "SET [key [= value] ]".
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index ebd4cc920b1adf6eee6d3b13683a40952ca1366b..7a13302229012bcf9e4d946b35a9677219959000 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -329,7 +329,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
 
     def strategies: Seq[Strategy] =
       extraStrategies ++ (
-      CommandStrategy(self) ::
+      CommandStrategy ::
       DataSourceStrategy ::
       TakeOrdered ::
       HashAggregation ::
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index 6e04f26c84259f7ff09e88551b018f00020edd00..2954d4ce7d2d87007372584eb8c8416c534d7a5a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -304,17 +304,20 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
     }
   }
 
-  case class CommandStrategy(context: SQLContext) extends Strategy {
+  case object CommandStrategy extends Strategy {
     def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
       case r: RunnableCommand => ExecutedCommand(r) :: Nil
       case logical.SetCommand(kv) =>
-        Seq(execution.SetCommand(kv, plan.output)(context))
+        Seq(ExecutedCommand(execution.SetCommand(kv, plan.output)))
       case logical.ExplainCommand(logicalPlan, extended) =>
-        Seq(execution.ExplainCommand(logicalPlan, plan.output, extended)(context))
+        Seq(ExecutedCommand(
+          execution.ExplainCommand(logicalPlan, plan.output, extended)))
       case logical.CacheTableCommand(tableName, optPlan, isLazy) =>
-        Seq(execution.CacheTableCommand(tableName, optPlan, isLazy))
+        Seq(ExecutedCommand(
+          execution.CacheTableCommand(tableName, optPlan, isLazy)))
       case logical.UncacheTableCommand(tableName) =>
-        Seq(execution.UncacheTableCommand(tableName))
+        Seq(ExecutedCommand(
+          execution.UncacheTableCommand(tableName)))
       case _ => Nil
     }
   }
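
With execution-side commands no longer holding a SQLContext in their constructors, CommandStrategy can become a stateless case object whose only job is to wrap each logical command in ExecutedCommand. A rough usage sketch of the resulting flow (the SET statement and the printed row are illustrative, assuming a SQLContext named `sqlContext`):

```scala
// The statement plans to ExecutedCommand(SetCommand(...)); collecting it invokes
// RunnableCommand.run exactly once and returns the key=value result row.
val rows = sqlContext.sql("SET spark.sql.shuffle.partitions=10").collect()
rows.foreach(println)  // expected: [spark.sql.shuffle.partitions=10]
```
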
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
index afe3f3f07440cd42a15115cc54518a19b1aea335..b8fa4b019953e0c7285f71dfefa3e0268f373200 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
@@ -26,34 +26,20 @@ import org.apache.spark.sql.catalyst.plans.logical
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.{SQLConf, SQLContext}
 
-// TODO: DELETE ME...
-trait Command {
-  this: SparkPlan =>
-
-  /**
-   * A concrete command should override this lazy field to wrap up any side effects caused by the
-   * command or any other computation that should be evaluated exactly once. The value of this field
-   * can be used as the contents of the corresponding RDD generated from the physical plan of this
-   * command.
-   *
-   * The `execute()` method of all the physical command classes should reference `sideEffectResult`
-   * so that the command can be executed eagerly right after the command query is created.
-   */
-  protected lazy val sideEffectResult: Seq[Row] = Seq.empty[Row]
-
-  override def executeCollect(): Array[Row] = sideEffectResult.toArray
-
-  override def execute(): RDD[Row] = sqlContext.sparkContext.parallelize(sideEffectResult, 1)
-}
-
-// TODO: Replace command with runnable command.
+/**
+ * A logical command that is executed for its side effects.  `RunnableCommand`s are
+ * wrapped in `ExecutedCommand` during execution.
+ */
 trait RunnableCommand extends logical.Command {
   self: Product =>
 
-  def output: Seq[Attribute]
   def run(sqlContext: SQLContext): Seq[Row]
 }
 
+/**
+ * A physical operator that executes the run method of a `RunnableCommand` and
+ * saves the result to prevent multiple executions.
+ */
 case class ExecutedCommand(cmd: RunnableCommand) extends SparkPlan {
   /**
    * A concrete command should override this lazy field to wrap up any side effects caused by the
@@ -79,43 +65,41 @@ case class ExecutedCommand(cmd: RunnableCommand) extends SparkPlan {
  * :: DeveloperApi ::
  */
 @DeveloperApi
-case class SetCommand(kv: Option[(String, Option[String])], output: Seq[Attribute])(
-    @transient context: SQLContext)
-  extends LeafNode with Command with Logging {
+case class SetCommand(
+    kv: Option[(String, Option[String])],
+    override val output: Seq[Attribute]) extends RunnableCommand with Logging {
 
-  override protected lazy val sideEffectResult: Seq[Row] = kv match {
+  override def run(sqlContext: SQLContext) = kv match {
     // Configures the deprecated "mapred.reduce.tasks" property.
     case Some((SQLConf.Deprecated.MAPRED_REDUCE_TASKS, Some(value))) =>
       logWarning(
         s"Property ${SQLConf.Deprecated.MAPRED_REDUCE_TASKS} is deprecated, " +
           s"automatically converted to ${SQLConf.SHUFFLE_PARTITIONS} instead.")
-      context.setConf(SQLConf.SHUFFLE_PARTITIONS, value)
+      sqlContext.setConf(SQLConf.SHUFFLE_PARTITIONS, value)
       Seq(Row(s"${SQLConf.SHUFFLE_PARTITIONS}=$value"))
 
     // Configures a single property.
     case Some((key, Some(value))) =>
-      context.setConf(key, value)
+      sqlContext.setConf(key, value)
       Seq(Row(s"$key=$value"))
 
-    // Queries all key-value pairs that are set in the SQLConf of the context. Notice that different
-    // from Hive, here "SET -v" is an alias of "SET". (In Hive, "SET" returns all changed properties
-    // while "SET -v" returns all properties.)
+    // Queries all key-value pairs that are set in the SQLConf of the sqlContext.
+    // Note that, unlike Hive, "SET -v" here is an alias of "SET".
+    // (In Hive, "SET" returns all changed properties while "SET -v" returns all properties.)
     case Some(("-v", None)) | None =>
-      context.getAllConfs.map { case (k, v) => Row(s"$k=$v") }.toSeq
+      sqlContext.getAllConfs.map { case (k, v) => Row(s"$k=$v") }.toSeq
 
     // Queries the deprecated "mapred.reduce.tasks" property.
     case Some((SQLConf.Deprecated.MAPRED_REDUCE_TASKS, None)) =>
       logWarning(
         s"Property ${SQLConf.Deprecated.MAPRED_REDUCE_TASKS} is deprecated, " +
           s"showing ${SQLConf.SHUFFLE_PARTITIONS} instead.")
-      Seq(Row(s"${SQLConf.SHUFFLE_PARTITIONS}=${context.numShufflePartitions}"))
+      Seq(Row(s"${SQLConf.SHUFFLE_PARTITIONS}=${sqlContext.numShufflePartitions}"))
 
     // Queries a single property.
     case Some((key, None)) =>
-      Seq(Row(s"$key=${context.getConf(key, "<undefined>")}"))
+      Seq(Row(s"$key=${sqlContext.getConf(key, "<undefined>")}"))
   }
-
-  override def otherCopyArgs = context :: Nil
 }
 
 /**
@@ -128,22 +112,19 @@ case class SetCommand(kv: Option[(String, Option[String])], output: Seq[Attribut
  */
 @DeveloperApi
 case class ExplainCommand(
-    logicalPlan: LogicalPlan, output: Seq[Attribute], extended: Boolean)(
-    @transient context: SQLContext)
-  extends LeafNode with Command {
+    logicalPlan: LogicalPlan,
+    override val output: Seq[Attribute], extended: Boolean) extends RunnableCommand {
 
   // Run through the optimizer to generate the physical plan.
-  override protected lazy val sideEffectResult: Seq[Row] = try {
+  override def run(sqlContext: SQLContext) = try {
     // TODO in Hive, the "extended" ExplainCommand prints the AST as well, and detailed properties.
-    val queryExecution = context.executePlan(logicalPlan)
+    val queryExecution = sqlContext.executePlan(logicalPlan)
     val outputString = if (extended) queryExecution.toString else queryExecution.simpleString
 
     outputString.split("\n").map(Row(_))
   } catch { case cause: TreeNodeException[_] =>
     ("Error occurred during query planning: \n" + cause.getMessage).split("\n").map(Row(_))
   }
-
-  override def otherCopyArgs = context :: Nil
 }
 
 /**
@@ -153,10 +134,9 @@ case class ExplainCommand(
 case class CacheTableCommand(
     tableName: String,
     plan: Option[LogicalPlan],
-    isLazy: Boolean)
-  extends LeafNode with Command {
+    isLazy: Boolean) extends RunnableCommand {
 
-  override protected lazy val sideEffectResult = {
+  override def run(sqlContext: SQLContext) = {
     import sqlContext._
 
     plan.foreach(_.registerTempTable(tableName))
@@ -178,8 +158,9 @@ case class CacheTableCommand(
  * :: DeveloperApi ::
  */
 @DeveloperApi
-case class UncacheTableCommand(tableName: String) extends LeafNode with Command {
-  override protected lazy val sideEffectResult: Seq[Row] = {
+case class UncacheTableCommand(tableName: String) extends RunnableCommand {
+
+  override def run(sqlContext: SQLContext) = {
     sqlContext.table(tableName).unpersist()
     Seq.empty[Row]
   }
@@ -191,11 +172,11 @@ case class UncacheTableCommand(tableName: String) extends LeafNode with Command
  * :: DeveloperApi ::
  */
 @DeveloperApi
-case class DescribeCommand(child: SparkPlan, output: Seq[Attribute])(
-    @transient context: SQLContext)
-  extends LeafNode with Command {
+case class DescribeCommand(
+    child: SparkPlan,
+    override val output: Seq[Attribute]) extends RunnableCommand {
 
-  override protected lazy val sideEffectResult: Seq[Row] = {
+  override def run(sqlContext: SQLContext) = {
     Row("# Registered as a temporary table", null, null) +:
       child.output.map(field => Row(field.name, field.dataType.toString, null))
   }
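
The core of the refactoring is above: commands are now logical RunnableCommands whose side effects live entirely in run(sqlContext), while the single physical ExecutedCommand wrapper handles execution and caches the result. A minimal sketch of the new contract, using an invented ShowVersionCommand that is not part of this patch:

```scala
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Row}
import org.apache.spark.sql.catalyst.types.StringType
import org.apache.spark.sql.execution.RunnableCommand

// A command declares its schema through `output` and performs all side effects in `run`;
// no SQLContext is captured at construction time, so the node is a plain case class.
case class ShowVersionCommand(version: String) extends RunnableCommand {

  override def output: Seq[Attribute] =
    Seq(AttributeReference("version", StringType, nullable = false)())

  // CommandStrategy wraps this node in ExecutedCommand, which caches the returned rows so
  // the command runs exactly once even if the plan is collected repeatedly.
  override def run(sqlContext: SQLContext): Seq[Row] = Seq(Row(version))
}
```
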
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/ExtendedHiveQlParser.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/ExtendedHiveQlParser.scala
index 430ffb29989eadbbf981b3a530861036e82afb9a..ebf7003ff9e574da2db0b80d855084c6666a0e50 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/ExtendedHiveQlParser.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/ExtendedHiveQlParser.scala
@@ -21,6 +21,7 @@ import scala.language.implicitConversions
 
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.{AbstractSparkSQLParser, SqlLexical}
+import org.apache.spark.sql.hive.execution.{AddJar, AddFile, HiveNativeCommand}
 
 /**
  * A parser that recognizes all HiveQL constructs together with Spark SQL specific extensions.
@@ -52,7 +53,7 @@ private[hive] class ExtendedHiveQlParser extends AbstractSparkSQLParser {
 
   protected lazy val dfs: Parser[LogicalPlan] =
     DFS ~> wholeInput ^^ {
-      case command => NativeCommand(command.trim)
+      case command => HiveNativeCommand(command.trim)
     }
 
   private lazy val addFile: Parser[LogicalPlan] =
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
index 7de440773023fd947edd4398b2468c7a6463b8f5..56fe27a77b8380715b120b3c56199d8898ebb675 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -39,8 +39,8 @@ import org.apache.spark.sql.catalyst.analysis.{Analyzer, EliminateAnalysisOperat
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.types.DecimalType
 import org.apache.spark.sql.catalyst.types.decimal.Decimal
-import org.apache.spark.sql.execution.{ExtractPythonUdfs, QueryExecutionException, Command => PhysicalCommand}
-import org.apache.spark.sql.hive.execution.DescribeHiveTableCommand
+import org.apache.spark.sql.execution.{SparkPlan, ExecutedCommand, ExtractPythonUdfs, QueryExecutionException}
+import org.apache.spark.sql.hive.execution.{HiveNativeCommand, DescribeHiveTableCommand}
 import org.apache.spark.sql.sources.DataSourceStrategy
 
 /**
@@ -340,7 +340,7 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
 
     override def strategies: Seq[Strategy] = extraStrategies ++ Seq(
       DataSourceStrategy,
-      CommandStrategy(self),
+      CommandStrategy,
       HiveCommandStrategy(self),
       TakeOrdered,
       ParquetOperations,
@@ -369,11 +369,17 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
      * execution is simply passed back to Hive.
      */
     def stringResult(): Seq[String] = executedPlan match {
-      case describeHiveTableCommand: DescribeHiveTableCommand =>
+      case ExecutedCommand(desc: DescribeHiveTableCommand) =>
         // If it is a describe command for a Hive table, we want to have the output format
         // be similar with Hive.
-        describeHiveTableCommand.hiveString
-      case command: PhysicalCommand =>
+        desc.run(self).map {
+          case Row(name: String, dataType: String, comment) =>
+            Seq(name, dataType,
+              Option(comment.asInstanceOf[String]).getOrElse(""))
+              .map(s => String.format(s"%-20s", s))
+              .mkString("\t")
+        }
+      case command: ExecutedCommand =>
         command.executeCollect().map(_.head.toString)
 
       case other =>
@@ -386,7 +392,7 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
 
     override def simpleString: String =
       logical match {
-        case _: NativeCommand => "<Native command: executed by Hive>"
+        case _: HiveNativeCommand => "<Native command: executed by Hive>"
         case _: SetCommand => "<SET command: executed by Hive, and noted by SQLContext>"
         case _ => super.simpleString
       }
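
Because the hiveString helper disappears from DescribeHiveTableCommand (see that file further down), its Hive-style formatting is inlined into stringResult, which now runs the describe command directly. For reference, the padding behaviour being reproduced, with invented values:

```scala
// Each of name, dataType and comment is left-padded to 20 characters and the three fields
// are joined with tabs, mimicking Hive's DESCRIBE output.
val line = Seq("key", "int", "").map(s => String.format(s"%-20s", s)).mkString("\t")
```
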
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index d8b10b78c6c595e56394734deb89c87fa1bb9fd9..b31a3ec25096b39eef33936d4276eba59da04b7c 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -20,6 +20,8 @@ package org.apache.spark.sql.hive
 import java.io.IOException
 import java.util.{List => JList}
 
+import org.apache.spark.sql.execution.SparkPlan
+
 import scala.util.parsing.combinator.RegexParsers
 
 import org.apache.hadoop.util.ReflectionUtils
@@ -286,14 +288,24 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
           Some(sa.getQB().getTableDesc)
         }
 
-        CreateTableAsSelect(Some(databaseName), tblName, child, allowExisting, desc)
+        execution.CreateTableAsSelect(
+          databaseName,
+          tableName,
+          child,
+          allowExisting,
+          desc)
 
       case p: LogicalPlan if p.resolved => p
 
       case p @ CreateTableAsSelect(db, tableName, child, allowExisting, None) =>
         val (dbName, tblName) = processDatabaseAndTableName(db, tableName)
         val databaseName = dbName.getOrElse(hive.sessionState.getCurrentDatabase)
-        CreateTableAsSelect(Some(databaseName), tblName, child, allowExisting, None)
+        execution.CreateTableAsSelect(
+          databaseName,
+          tableName,
+          child,
+          allowExisting,
+          None)
     }
   }
 
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
index 5939276f6d65ee04c56153fc06f5e8dc35b76ecb..3f3d9e7cd4fbebac6ef370687c36a106081b828b 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
@@ -33,6 +33,7 @@ import org.apache.spark.sql.catalyst.plans.logical
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.types._
 import org.apache.spark.sql.catalyst.types.decimal.Decimal
+import org.apache.spark.sql.hive.execution.{HiveNativeCommand, DropTable, AnalyzeTable}
 
 /* Implicit conversions */
 import scala.collection.JavaConversions._
@@ -44,14 +45,6 @@ import scala.collection.JavaConversions._
  */
 private[hive] case object NativePlaceholder extends Command
 
-private[hive] case class AddFile(filePath: String) extends Command
-
-private[hive] case class AddJar(path: String) extends Command
-
-private[hive] case class DropTable(tableName: String, ifExists: Boolean) extends Command
-
-private[hive] case class AnalyzeTable(tableName: String) extends Command
-
 /** Provides a mapping from HiveQL statements to catalyst logical plans and expression trees. */
 private[hive] object HiveQl {
   protected val nativeCommands = Seq(
@@ -239,10 +232,10 @@ private[hive] object HiveQl {
     try {
       val tree = getAst(sql)
       if (nativeCommands contains tree.getText) {
-        NativeCommand(sql)
+        HiveNativeCommand(sql)
       } else {
         nodeToPlan(tree) match {
-          case NativePlaceholder => NativeCommand(sql)
+          case NativePlaceholder => HiveNativeCommand(sql)
           case other => other
         }
       }
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index 4ebd59db835154433fe41b6baf1b6e7c5a042067..d3f6381b69a4d1433bca59d267cbee2066a16994 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -17,9 +17,6 @@
 
 package org.apache.spark.sql.hive
 
-import org.apache.hadoop.hive.ql.parse.ASTNode
-import org.apache.hadoop.hive.ql.plan.CreateTableDesc
-
 import org.apache.spark.annotation.Experimental
 import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
 import org.apache.spark.sql.catalyst.expressions._
@@ -28,7 +25,7 @@ import org.apache.spark.sql.catalyst.planning._
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.types.StringType
-import org.apache.spark.sql.execution.{DescribeCommand, OutputFaker, SparkPlan, PhysicalRDD}
+import org.apache.spark.sql.execution._
 import org.apache.spark.sql.hive
 import org.apache.spark.sql.hive.execution._
 import org.apache.spark.sql.parquet.ParquetRelation
@@ -177,25 +174,10 @@ private[hive] trait HiveStrategies {
     def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
       case logical.InsertIntoTable(table: MetastoreRelation, partition, child, overwrite) =>
         execution.InsertIntoHiveTable(
-          table, partition, planLater(child), overwrite)(hiveContext) :: Nil
+          table, partition, planLater(child), overwrite) :: Nil
       case hive.InsertIntoHiveTable(table: MetastoreRelation, partition, child, overwrite) =>
         execution.InsertIntoHiveTable(
-          table, partition, planLater(child), overwrite)(hiveContext) :: Nil
-      case logical.CreateTableAsSelect(
-             Some(database), tableName, child, allowExisting, Some(desc: CreateTableDesc)) =>
-        execution.CreateTableAsSelect(
-          database,
-          tableName,
-          child,
-          allowExisting,
-          Some(desc)) :: Nil
-      case logical.CreateTableAsSelect(Some(database), tableName, child, allowExisting, None) =>
-        execution.CreateTableAsSelect(
-          database,
-          tableName,
-          child,
-          allowExisting,
-          None) :: Nil
+          table, partition, planLater(child), overwrite) :: Nil
       case _ => Nil
     }
   }
@@ -227,23 +209,14 @@ private[hive] trait HiveStrategies {
 
   case class HiveCommandStrategy(context: HiveContext) extends Strategy {
     def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
-      case logical.NativeCommand(sql) => NativeCommand(sql, plan.output)(context) :: Nil
-
-      case hive.DropTable(tableName, ifExists) => execution.DropTable(tableName, ifExists) :: Nil
-
-      case hive.AddJar(path) => execution.AddJar(path) :: Nil
-
-      case hive.AddFile(path) => execution.AddFile(path) :: Nil
-
-      case hive.AnalyzeTable(tableName) => execution.AnalyzeTable(tableName) :: Nil
-
       case describe: logical.DescribeCommand =>
         val resolvedTable = context.executePlan(describe.table).analyzed
         resolvedTable match {
           case t: MetastoreRelation =>
-            Seq(DescribeHiveTableCommand(t, describe.output, describe.isExtended)(context))
+            ExecutedCommand(
+              DescribeHiveTableCommand(t, describe.output, describe.isExtended)) :: Nil
           case o: LogicalPlan =>
-            Seq(DescribeCommand(planLater(o), describe.output)(context))
+            ExecutedCommand(DescribeCommand(planLater(o), describe.output)) :: Nil
         }
 
       case _ => Nil
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala
index eedb57de52ba90ae806b9e7951202ff1cbb833c4..b2149bd95a3366a231712bf88bae1ac287afd7dd 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala
@@ -20,9 +20,6 @@ package org.apache.spark.sql.hive.test
 import java.io.File
 import java.util.{Set => JavaSet}
 
-import org.apache.hadoop.hive.conf.HiveConf
-import org.apache.hadoop.hive.ql.session.SessionState
-
 import scala.collection.mutable
 import scala.language.implicitConversions
 
@@ -37,10 +34,11 @@ import org.apache.hadoop.hive.serde2.avro.AvroSerDe
 import org.apache.spark.{SparkConf, SparkContext}
 import org.apache.spark.util.Utils
 import org.apache.spark.sql.catalyst.analysis._
-import org.apache.spark.sql.catalyst.plans.logical.{CacheTableCommand, LogicalPlan, NativeCommand}
+import org.apache.spark.sql.catalyst.plans.logical.{CacheTableCommand, LogicalPlan}
 import org.apache.spark.sql.catalyst.util._
 import org.apache.spark.sql.hive._
 import org.apache.spark.sql.SQLConf
+import org.apache.spark.sql.hive.execution.HiveNativeCommand
 
 /* Implicit conversions */
 import scala.collection.JavaConversions._
@@ -161,7 +159,7 @@ class TestHiveContext(sc: SparkContext) extends HiveContext(sc) {
   abstract class QueryExecution extends super.QueryExecution {
     override lazy val analyzed = {
       val describedTables = logical match {
-        case NativeCommand(describedTable(tbl)) => tbl :: Nil
+        case HiveNativeCommand(describedTable(tbl)) => tbl :: Nil
         case CacheTableCommand(tbl, _, _) => tbl :: Nil
         case _ => Nil
       }
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala
index b83689ceabb841cc115aebe4eb91eb4ee51a3a0c..fe21454e7fb383a98532a4c0e72d4d58b989c65f 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala
@@ -20,10 +20,10 @@ package org.apache.spark.sql.hive.execution
 import org.apache.hadoop.hive.ql.plan.CreateTableDesc
 
 import org.apache.spark.annotation.Experimental
-import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.SQLContext
 import org.apache.spark.sql.catalyst.expressions.Row
 import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan}
-import org.apache.spark.sql.execution.{SparkPlan, Command, LeafNode}
+import org.apache.spark.sql.execution.RunnableCommand
 import org.apache.spark.sql.hive.HiveContext
 import org.apache.spark.sql.hive.MetastoreRelation
 
@@ -44,28 +44,23 @@ case class CreateTableAsSelect(
     tableName: String,
     query: LogicalPlan,
     allowExisting: Boolean,
-    desc: Option[CreateTableDesc]) extends LeafNode with Command {
+    desc: Option[CreateTableDesc]) extends RunnableCommand {
 
-  def output = Seq.empty
+  override def run(sqlContext: SQLContext) = {
+    val hiveContext = sqlContext.asInstanceOf[HiveContext]
+    lazy val metastoreRelation: MetastoreRelation = {
+      // Create Hive Table
+      hiveContext.catalog.createTable(database, tableName, query.output, allowExisting, desc)
 
-  private[this] def sc = sqlContext.asInstanceOf[HiveContext]
-
-  // A lazy computing of the metastoreRelation
-  private[this] lazy val metastoreRelation: MetastoreRelation = {
-    // Create Hive Table
-    sc.catalog.createTable(database, tableName, query.output, allowExisting, desc)
-
-    // Get the Metastore Relation
-    sc.catalog.lookupRelation(Some(database), tableName, None) match {
-      case r: MetastoreRelation => r
+      // Get the Metastore Relation
+      hiveContext.catalog.lookupRelation(Some(database), tableName, None) match {
+        case r: MetastoreRelation => r
+      }
     }
-  }
-
-  override protected[sql] lazy val sideEffectResult: Seq[Row] = {
     // TODO ideally, we should get the output data ready first and then
     // add the relation into catalog, just in case of failure occurs while data
     // processing.
-    if (sc.catalog.tableExists(Some(database), tableName)) {
+    if (hiveContext.catalog.tableExists(Some(database), tableName)) {
       if (allowExisting) {
         // table already exists, will do nothing, to keep consistent with Hive
       } else {
@@ -73,17 +68,12 @@ case class CreateTableAsSelect(
           new org.apache.hadoop.hive.metastore.api.AlreadyExistsException(s"$database.$tableName")
       }
     } else {
-      sc.executePlan(InsertIntoTable(metastoreRelation, Map(), query, true)).toRdd
+      hiveContext.executePlan(InsertIntoTable(metastoreRelation, Map(), query, true)).toRdd
     }
 
     Seq.empty[Row]
   }
 
-  override def execute(): RDD[Row] = {
-    sideEffectResult
-    sparkContext.emptyRDD[Row]
-  }
-
   override def argString: String = {
     s"[Database:$database, TableName: $tableName, InsertIntoHiveTable]\n" + query.toString
   }
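
CreateTableAsSelect now folds table creation, the metastore lookup, and the insert into a single run method; the CTAS cases removed from HiveStrategies are compensated by HiveMetastoreCatalog producing this command at analysis time, so the generic CommandStrategy plans it like any other RunnableCommand. A hedged usage sketch, assuming a HiveContext named `hiveContext` and an existing source table `src`:

```scala
// The analyzed plan of this query is already execution.CreateTableAsSelect, which is then
// wrapped in ExecutedCommand rather than handled by a Hive-specific strategy case.
hiveContext.sql("CREATE TABLE ctas_example AS SELECT key, value FROM src")
```
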
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/DescribeHiveTableCommand.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/DescribeHiveTableCommand.scala
index 5d98834c6fb33b97b740d5b45bd415926651322c..bfacc51ef57ab0d823257a6adc1351218f408761 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/DescribeHiveTableCommand.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/DescribeHiveTableCommand.scala
@@ -22,11 +22,11 @@ import scala.collection.JavaConversions._
 import org.apache.hadoop.hive.metastore.api.FieldSchema
 
 import org.apache.spark.annotation.DeveloperApi
-import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.expressions.{Attribute, Row}
-import org.apache.spark.sql.execution.{Command, LeafNode}
+import org.apache.spark.sql.execution.{SparkPlan, RunnableCommand}
 import org.apache.spark.sql.hive.{HiveContext, MetastoreRelation}
 import org.apache.spark.sql.hive.HiveShim
+import org.apache.spark.sql.SQLContext
 
 /**
  * Implementation for "describe [extended] table".
@@ -36,21 +36,10 @@ import org.apache.spark.sql.hive.HiveShim
 @DeveloperApi
 case class DescribeHiveTableCommand(
     table: MetastoreRelation,
-    output: Seq[Attribute],
-    isExtended: Boolean)(
-    @transient context: HiveContext)
-  extends LeafNode with Command {
+    override val output: Seq[Attribute],
+    isExtended: Boolean) extends RunnableCommand {
 
-  // Strings with the format like Hive. It is used for result comparison in our unit tests.
-  lazy val hiveString: Seq[String] = sideEffectResult.map {
-    case Row(name: String, dataType: String, comment) =>
-      Seq(name, dataType,
-        Option(comment.asInstanceOf[String]).getOrElse(""))
-        .map(s => String.format(s"%-20s", s))
-        .mkString("\t")
-  }
-
-  override protected lazy val sideEffectResult: Seq[Row] = {
+  override def run(sqlContext: SQLContext) = {
     // Trying to mimic the format of Hive's output. But not exactly the same.
     var results: Seq[(String, String, String)] = Nil
 
@@ -75,6 +64,4 @@ case class DescribeHiveTableCommand(
       Row(name, dataType, comment)
     }
   }
-
-  override def otherCopyArgs = context :: Nil
 }
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/NativeCommand.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveNativeCommand.scala
similarity index 64%
rename from sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/NativeCommand.scala
rename to sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveNativeCommand.scala
index 6930c2babd117fdf759314c791945007fd2ff67a..8ba818af5f9d0f1df826386128d246d7f1b075e0 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/NativeCommand.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveNativeCommand.scala
@@ -1,38 +1,38 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.hive.execution
-
-import org.apache.spark.annotation.DeveloperApi
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.catalyst.expressions.{Attribute, GenericRow, Row}
-import org.apache.spark.sql.execution.{Command, LeafNode}
-import org.apache.spark.sql.hive.HiveContext
-
-/**
- * :: DeveloperApi ::
- */
-@DeveloperApi
-case class NativeCommand(
-    sql: String, output: Seq[Attribute])(
-    @transient context: HiveContext)
-  extends LeafNode with Command {
-
-  override protected lazy val sideEffectResult: Seq[Row] = context.runSqlHive(sql).map(Row(_))
-
-  override def otherCopyArgs = context :: Nil
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.execution
+
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Row}
+import org.apache.spark.sql.execution.RunnableCommand
+import org.apache.spark.sql.hive.HiveContext
+import org.apache.spark.sql.SQLContext
+import org.apache.spark.sql.catalyst.types.StringType
+
+/**
+ * :: DeveloperApi ::
+ */
+@DeveloperApi
+case class HiveNativeCommand(sql: String) extends RunnableCommand {
+
+  override def output =
+    Seq(AttributeReference("result", StringType, nullable = false)())
+
+  override def run(sqlContext: SQLContext) =
+    sqlContext.asInstanceOf[HiveContext].runSqlHive(sql).map(Row(_))
+}
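
HiveNativeCommand replaces both the catalyst-side logical NativeCommand and the Hive-side physical NativeCommand with one RunnableCommand that hands the raw SQL string to runSqlHive. A small usage sketch, assuming a HiveContext named `hiveContext` (SHOW TABLES is one of the statements on HiveQl's native-command whitelist in this version):

```scala
// The statement parses to HiveNativeCommand("SHOW TABLES"); run() delegates to Hive, and each
// line of Hive's output comes back as a single-column Row of StringType.
hiveContext.sql("SHOW TABLES").collect().foreach(println)
```
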
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
index 81390f626726c0bf3e520e220de7527bd7693e62..ca0ec1513917ff1611938ae79f1f6ab39f29d897 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
@@ -21,7 +21,6 @@ import java.util
 
 import scala.collection.JavaConversions._
 
-import org.apache.hadoop.hive.common.`type`.HiveVarchar
 import org.apache.hadoop.hive.conf.HiveConf
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars
 import org.apache.hadoop.hive.metastore.MetaStoreUtils
@@ -31,14 +30,12 @@ import org.apache.hadoop.hive.ql.{Context, ErrorMsg}
 import org.apache.hadoop.hive.serde2.Serializer
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption
 import org.apache.hadoop.hive.serde2.objectinspector._
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.{JavaHiveDecimalObjectInspector, JavaHiveVarcharObjectInspector}
 import org.apache.hadoop.mapred.{FileOutputCommitter, FileOutputFormat, JobConf}
 
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.expressions.Row
-import org.apache.spark.sql.catalyst.types.decimal.Decimal
-import org.apache.spark.sql.execution.{Command, SparkPlan, UnaryNode}
+import org.apache.spark.sql.execution.{UnaryNode, SparkPlan}
 import org.apache.spark.sql.hive._
 import org.apache.spark.sql.hive.{ ShimFileSinkDesc => FileSinkDesc}
 import org.apache.spark.sql.hive.HiveShim._
@@ -52,10 +49,9 @@ case class InsertIntoHiveTable(
     table: MetastoreRelation,
     partition: Map[String, Option[String]],
     child: SparkPlan,
-    overwrite: Boolean)
-    (@transient sc: HiveContext)
-  extends UnaryNode with Command with HiveInspectors {
+    overwrite: Boolean) extends UnaryNode with HiveInspectors {
 
+  @transient val sc: HiveContext = sqlContext.asInstanceOf[HiveContext]
   @transient lazy val outputClass = newSerializer(table.tableDesc).getSerializedClass
   @transient private lazy val hiveContext = new Context(sc.hiveconf)
   @transient private lazy val db = Hive.get(sc.hiveconf)
@@ -66,8 +62,6 @@ case class InsertIntoHiveTable(
     serializer
   }
 
-  override def otherCopyArgs = sc :: Nil
-
   def output = child.output
 
   def saveAsHiveFile(
@@ -134,7 +128,7 @@ case class InsertIntoHiveTable(
    *
    * Note: this is run once and then kept to avoid double insertions.
    */
-  override protected[sql] lazy val sideEffectResult: Seq[Row] = {
+  protected[sql] lazy val sideEffectResult: Seq[Row] = {
     // Have to pass the TableDesc object to RDD.mapPartitions and then instantiate new serializer
     // instances within the closure, since Serializer is not serializable while TableDesc is.
     val tableDesc = table.tableDesc
@@ -256,4 +250,8 @@ case class InsertIntoHiveTable(
     // TODO: implement hive compatibility as rules.
     Seq.empty[Row]
   }
+
+  override def executeCollect(): Array[Row] = sideEffectResult.toArray
+
+  override def execute(): RDD[Row] = sqlContext.sparkContext.parallelize(sideEffectResult, 1)
 }
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
index 903075edf7e04b0261c3faaed5b567af3f663557..6fc4153f6a5df713dd71c682f0449a1decc7d69a 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
@@ -18,10 +18,10 @@
 package org.apache.spark.sql.hive.execution
 
 import org.apache.spark.annotation.DeveloperApi
-import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.expressions.Row
-import org.apache.spark.sql.execution.{Command, LeafNode}
+import org.apache.spark.sql.execution.RunnableCommand
 import org.apache.spark.sql.hive.HiveContext
+import org.apache.spark.sql.SQLContext
 
 /**
  * :: DeveloperApi ::
@@ -32,13 +32,10 @@ import org.apache.spark.sql.hive.HiveContext
  * in the Hive metastore.
  */
 @DeveloperApi
-case class AnalyzeTable(tableName: String) extends LeafNode with Command {
-  def hiveContext = sqlContext.asInstanceOf[HiveContext]
+case class AnalyzeTable(tableName: String) extends RunnableCommand {
 
-  def output = Seq.empty
-
-  override protected lazy val sideEffectResult: Seq[Row] = {
-    hiveContext.analyze(tableName)
+  override def run(sqlContext: SQLContext) = {
+    sqlContext.asInstanceOf[HiveContext].analyze(tableName)
     Seq.empty[Row]
   }
 }
@@ -48,12 +45,12 @@ case class AnalyzeTable(tableName: String) extends LeafNode with Command {
  * Drops a table from the metastore and removes it if it is cached.
  */
 @DeveloperApi
-case class DropTable(tableName: String, ifExists: Boolean) extends LeafNode with Command {
-  def hiveContext = sqlContext.asInstanceOf[HiveContext]
-
-  def output = Seq.empty
+case class DropTable(
+    tableName: String,
+    ifExists: Boolean) extends RunnableCommand {
 
-  override protected lazy val sideEffectResult: Seq[Row] = {
+  override def run(sqlContext: SQLContext) = {
+    val hiveContext = sqlContext.asInstanceOf[HiveContext]
     val ifExistsClause = if (ifExists) "IF EXISTS " else ""
     hiveContext.runSqlHive(s"DROP TABLE $ifExistsClause$tableName")
     hiveContext.catalog.unregisterTable(None, tableName)
@@ -65,12 +62,10 @@ case class DropTable(tableName: String, ifExists: Boolean) extends LeafNode with
  * :: DeveloperApi ::
  */
 @DeveloperApi
-case class AddJar(path: String) extends LeafNode with Command {
-  def hiveContext = sqlContext.asInstanceOf[HiveContext]
+case class AddJar(path: String) extends RunnableCommand {
 
-  override def output = Seq.empty
-
-  override protected lazy val sideEffectResult: Seq[Row] = {
+  override def run(sqlContext: SQLContext) = {
+    val hiveContext = sqlContext.asInstanceOf[HiveContext]
     hiveContext.runSqlHive(s"ADD JAR $path")
     hiveContext.sparkContext.addJar(path)
     Seq.empty[Row]
@@ -81,12 +76,10 @@ case class AddJar(path: String) extends LeafNode with Command {
  * :: DeveloperApi ::
  */
 @DeveloperApi
-case class AddFile(path: String) extends LeafNode with Command {
-  def hiveContext = sqlContext.asInstanceOf[HiveContext]
-
-  override def output = Seq.empty
+case class AddFile(path: String) extends RunnableCommand {
 
-  override protected lazy val sideEffectResult: Seq[Row] = {
+  override def run(sqlContext: SQLContext) = {
+    val hiveContext = sqlContext.asInstanceOf[HiveContext]
     hiveContext.runSqlHive(s"ADD FILE $path")
     hiveContext.sparkContext.addFile(path)
     Seq.empty[Row]
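
The remaining Hive-side commands (AnalyzeTable, DropTable, AddJar, AddFile) follow the same pattern: the HiveContext is obtained by downcasting the run argument instead of reading the plan's sqlContext field. Their SQL surface is unchanged; a quick sketch with hypothetical paths, assuming a HiveContext named `hiveContext`:

```scala
// ADD JAR / ADD FILE plan to ExecutedCommand(AddJar(...)) and ExecutedCommand(AddFile(...));
// run() registers the resource with Hive and with the SparkContext.
hiveContext.sql("ADD JAR /tmp/my-udfs.jar")
hiveContext.sql("ADD FILE /tmp/lookup.txt")
```
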
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
index a90fc023e67d8843dc517e3acf0aff0444060c77..ff4071d8e2f109448bc18c4d2f15a0bbd9630018 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
@@ -22,10 +22,10 @@ import org.scalatest.BeforeAndAfterAll
 import scala.reflect.ClassTag
 
 import org.apache.spark.sql.{SQLConf, QueryTest}
-import org.apache.spark.sql.catalyst.plans.logical.NativeCommand
 import org.apache.spark.sql.execution.joins.{BroadcastHashJoin, ShuffledHashJoin}
 import org.apache.spark.sql.hive.test.TestHive
 import org.apache.spark.sql.hive.test.TestHive._
+import org.apache.spark.sql.hive.execution._
 
 class StatisticsSuite extends QueryTest with BeforeAndAfterAll {
   TestHive.reset()
@@ -51,19 +51,19 @@ class StatisticsSuite extends QueryTest with BeforeAndAfterAll {
 
     assertAnalyzeCommand(
       "ANALYZE TABLE Table1 COMPUTE STATISTICS",
-      classOf[NativeCommand])
+      classOf[HiveNativeCommand])
     assertAnalyzeCommand(
       "ANALYZE TABLE Table1 PARTITION(ds='2008-04-09', hr=11) COMPUTE STATISTICS",
-      classOf[NativeCommand])
+      classOf[HiveNativeCommand])
     assertAnalyzeCommand(
       "ANALYZE TABLE Table1 PARTITION(ds='2008-04-09', hr=11) COMPUTE STATISTICS noscan",
-      classOf[NativeCommand])
+      classOf[HiveNativeCommand])
     assertAnalyzeCommand(
       "ANALYZE TABLE Table1 PARTITION(ds, hr) COMPUTE STATISTICS",
-      classOf[NativeCommand])
+      classOf[HiveNativeCommand])
     assertAnalyzeCommand(
       "ANALYZE TABLE Table1 PARTITION(ds, hr) COMPUTE STATISTICS noscan",
-      classOf[NativeCommand])
+      classOf[HiveNativeCommand])
 
     assertAnalyzeCommand(
       "ANALYZE TABLE Table1 COMPUTE STATISTICS nOscAn",
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
index 44eb4cfa593357187c3895d8238e6430a5167e42..8011f9b8773b3c5650c0aff08a291e7fba3eeb0d 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
@@ -24,7 +24,6 @@ import org.scalatest.{BeforeAndAfterAll, FunSuite, GivenWhenThen}
 import org.apache.spark.Logging
 import org.apache.spark.sql.catalyst.planning.PhysicalOperation
 import org.apache.spark.sql.catalyst.plans.logical._
-import org.apache.spark.sql.catalyst.plans.logical.{NativeCommand => LogicalNativeCommand}
 import org.apache.spark.sql.catalyst.util._
 import org.apache.spark.sql.hive.test.TestHive
 
@@ -142,14 +141,14 @@ abstract class HiveComparisonTest
       // Hack: Hive simply prints the result of a SET command to screen,
       // and does not return it as a query answer.
       case _: SetCommand => Seq("0")
-      case LogicalNativeCommand(c) if c.toLowerCase.contains("desc") =>
+      case HiveNativeCommand(c) if c.toLowerCase.contains("desc") =>
         answer
           .filterNot(nonDeterministicLine)
           .map(_.replaceAll("from deserializer", ""))
           .map(_.replaceAll("None", ""))
           .map(_.trim)
           .filterNot(_ == "")
-      case _: LogicalNativeCommand => answer.filterNot(nonDeterministicLine).filterNot(_ == "")
+      case _: HiveNativeCommand => answer.filterNot(nonDeterministicLine).filterNot(_ == "")
       case _: ExplainCommand => answer
       case _: DescribeCommand =>
         // Filter out non-deterministic lines and lines which do not have actual results but