From ac96d9657c9a9f89a455a1b671c059d896012d41 Mon Sep 17 00:00:00 2001
From: Cheng Lian <lian.cs.zju@gmail.com>
Date: Fri, 13 Jun 2014 12:59:48 -0700
Subject: [PATCH] [SPARK-2094][SQL] "Exactly once" semantics for DDL and
 command statements

## Related JIRA issues

- Main issue:

  - [SPARK-2094](https://issues.apache.org/jira/browse/SPARK-2094): Ensure exactly once semantics for DDL/Commands

- Issues resolved as dependencies:

  - [SPARK-2081](https://issues.apache.org/jira/browse/SPARK-2081): Undefine output() from the abstract class Command and implement it in concrete subclasses
  - [SPARK-2128](https://issues.apache.org/jira/browse/SPARK-2128): No plan for DESCRIBE
  - [SPARK-1852](https://issues.apache.org/jira/browse/SPARK-1852): SparkSQL Queries with Sorts run before the user asks them to

- Other related issue:

  - [SPARK-2129](https://issues.apache.org/jira/browse/SPARK-2129): NPE thrown while lookup a view

    Two test cases, `join_view` and `mergejoin_mixed`, within the `HiveCompatibilitySuite` are removed from the whitelist to workaround this issue.

## PR Overview

This PR defines physical plans for DDL statements and commands and wraps their side effects in a lazy field `PhysicalCommand.sideEffectResult`, so that they are executed eagerly and exactly once.  Also, as a positive side effect, now DDL statements and commands can be turned into proper `SchemaRDD`s and let user query the execution results.

This PR defines schemas for the following DDL/commands:

- EXPLAIN command

  - `plan`: String, the plan explanation

- SET command

  - `key`: String, the key(s) of the propert(y/ies) being set or queried
  - `value`: String, the value(s) of the propert(y/ies) being queried

- Other Hive native command

  - `result`: String, execution result returned by Hive

  **NOTE**: We should refine schemas for different native commands by defining physical plans for them in the future.

## Examples

### EXPLAIN command

Take the "EXPLAIN" command as an example, we first execute the command and obtain a `SchemaRDD` at the same time, then query the `plan` field with the schema DSL:

```
scala> loadTestTable("src")
...

scala> val q0 = hql("EXPLAIN SELECT key, COUNT(*) FROM src GROUP BY key")
...
q0: org.apache.spark.sql.SchemaRDD =
SchemaRDD[0] at RDD at SchemaRDD.scala:98
== Query Plan ==
ExplainCommandPhysical [plan#11:0]
 Aggregate false, [key#4], [key#4,SUM(PartialCount#6L) AS c_1#2L]
  Exchange (HashPartitioning [key#4:0], 200)
   Exchange (HashPartitioning [key#4:0], 200)
    Aggregate true, [key#4], [key#4,COUNT(1) AS PartialCount#6L]
     HiveTableScan [key#4], (MetastoreRelation default, src, None), None

scala> q0.select('plan).collect()
...
[ExplainCommandPhysical [plan#24:0]
 Aggregate false, [key#17], [key#17,SUM(PartialCount#19L) AS c_1#2L]
  Exchange (HashPartitioning [key#17:0], 200)
   Exchange (HashPartitioning [key#17:0], 200)
    Aggregate true, [key#17], [key#17,COUNT(1) AS PartialCount#19L]
     HiveTableScan [key#17], (MetastoreRelation default, src, None), None]

scala>
```

### SET command

In this example we query all the properties set in `SQLConf`, register the result as a table, and then query the table with HiveQL:

```
scala> val q1 = hql("SET")
...
q1: org.apache.spark.sql.SchemaRDD =
SchemaRDD[7] at RDD at SchemaRDD.scala:98
== Query Plan ==
<SET command: executed by Hive, and noted by SQLContext>

scala> q1.registerAsTable("properties")

scala> hql("SELECT key, value FROM properties ORDER BY key LIMIT 10").foreach(println)
...
== Query Plan ==
TakeOrdered 10, [key#51:0 ASC]
 Project [key#51:0,value#52:1]
  SetCommandPhysical None, None, [key#55:0,value#56:1]), which has no missing parents
14/06/12 12:19:27 INFO scheduler.DAGScheduler: Submitting 1 missing tasks from Stage 5 (SchemaRDD[21] at RDD at SchemaRDD.scala:98
== Query Plan ==
TakeOrdered 10, [key#51:0 ASC]
 Project [key#51:0,value#52:1]
  SetCommandPhysical None, None, [key#55:0,value#56:1])
...
[datanucleus.autoCreateSchema,true]
[datanucleus.autoStartMechanismMode,checked]
[datanucleus.cache.level2,false]
[datanucleus.cache.level2.type,none]
[datanucleus.connectionPoolingType,BONECP]
[datanucleus.fixedDatastore,false]
[datanucleus.identifierFactory,datanucleus1]
[datanucleus.plugin.pluginRegistryBundleCheck,LOG]
[datanucleus.rdbms.useLegacyNativeValueStrategy,true]
[datanucleus.storeManagerType,rdbms]

scala>
```

### "Exactly once" semantics

At last, an example of the "exactly once" semantics:

```
scala> val q2 = hql("CREATE TABLE t1(key INT, value STRING)")
...
q2: org.apache.spark.sql.SchemaRDD =
SchemaRDD[28] at RDD at SchemaRDD.scala:98
== Query Plan ==
<Native command: executed by Hive>

scala> table("t1")
...
res9: org.apache.spark.sql.SchemaRDD =
SchemaRDD[32] at RDD at SchemaRDD.scala:98
== Query Plan ==
HiveTableScan [key#58,value#59], (MetastoreRelation default, t1, None), None

scala> q2.collect()
...
res10: Array[org.apache.spark.sql.Row] = Array([])

scala>
```

As we can see, the "CREATE TABLE" command is executed eagerly right after the `SchemaRDD` is created, and referencing the `SchemaRDD` again won't trigger a duplicated execution.

Author: Cheng Lian <lian.cs.zju@gmail.com>

Closes #1071 from liancheng/exactlyOnceCommand and squashes the following commits:

d005b03 [Cheng Lian] Made "SET key=value" returns the newly set key value pair
f6c7715 [Cheng Lian] Added test cases for DDL/command statement RDDs
1d00937 [Cheng Lian] Makes SchemaRDD DSLs work for DDL/command statement RDDs
5c7e680 [Cheng Lian] Bug fix: wrong type used in pattern matching
48aa2e5 [Cheng Lian] Refined SQLContext.emptyResult as an empty RDD[Row]
cc64f32 [Cheng Lian] Renamed physical plan classes for DDL/commands
74789c1 [Cheng Lian] Fixed failing test cases
0ad343a [Cheng Lian] Added physical plan for DDL and commands to ensure the "exactly once" semantics
---
 .../sql/catalyst/plans/logical/commands.scala |  18 +--
 .../optimizer/FilterPushdownSuite.scala       |   2 +-
 .../org/apache/spark/sql/SQLContext.scala     |  39 +-----
 .../org/apache/spark/sql/SchemaRDD.scala      |   2 +-
 .../org/apache/spark/sql/SchemaRDDLike.scala  |  15 ++-
 .../spark/sql/api/java/JavaSchemaRDD.scala    |   2 +-
 .../spark/sql/execution/SparkStrategies.scala |  11 +-
 .../apache/spark/sql/execution/commands.scala |  81 ++++++++----
 .../org/apache/spark/sql/SQLQuerySuite.scala  |  16 +--
 .../apache/spark/sql/hive/HiveContext.scala   |  55 +++-----
 .../spark/sql/hive/HiveStrategies.scala       |   8 ++
 .../sql/hive/execution/hiveOperators.scala    |  32 ++++-
 .../hive/execution/HiveComparisonTest.scala   |   3 +-
 .../execution/HiveCompatibilitySuite.scala    |   9 +-
 .../sql/hive/execution/HiveQuerySuite.scala   | 125 +++++++++++++-----
 15 files changed, 251 insertions(+), 167 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/commands.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/commands.scala
index d05c965275..3299e86b85 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/commands.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/commands.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.catalyst.plans.logical
 
-import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Attribute}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, BoundReference}
 import org.apache.spark.sql.catalyst.types.StringType
 
 /**
@@ -26,23 +26,25 @@ import org.apache.spark.sql.catalyst.types.StringType
  */
 abstract class Command extends LeafNode {
   self: Product =>
-  def output: Seq[Attribute] = Seq.empty  // TODO: SPARK-2081 should fix this
+  def output: Seq[Attribute] = Seq.empty
 }
 
 /**
  * Returned for commands supported by a given parser, but not catalyst.  In general these are DDL
  * commands that are passed directly to another system.
  */
-case class NativeCommand(cmd: String) extends Command
+case class NativeCommand(cmd: String) extends Command {
+  override def output =
+    Seq(BoundReference(0, AttributeReference("result", StringType, nullable = false)()))
+}
 
 /**
  * Commands of the form "SET (key) (= value)".
  */
 case class SetCommand(key: Option[String], value: Option[String]) extends Command {
   override def output = Seq(
-    AttributeReference("key", StringType, nullable = false)(),
-    AttributeReference("value", StringType, nullable = false)()
-  )
+    BoundReference(0, AttributeReference("key", StringType, nullable = false)()),
+    BoundReference(1, AttributeReference("value", StringType, nullable = false)()))
 }
 
 /**
@@ -50,11 +52,11 @@ case class SetCommand(key: Option[String], value: Option[String]) extends Comman
  * actually performing the execution.
  */
 case class ExplainCommand(plan: LogicalPlan) extends Command {
-  override def output = Seq(AttributeReference("plan", StringType, nullable = false)())
+  override def output =
+    Seq(BoundReference(0, AttributeReference("plan", StringType, nullable = false)()))
 }
 
 /**
  * Returned for the "CACHE TABLE tableName" and "UNCACHE TABLE tableName" command.
  */
 case class CacheCommand(tableName: String, doCache: Boolean) extends Command
-
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
index 0cada785b6..1f67c80e54 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
@@ -161,7 +161,7 @@ class FilterPushdownSuite extends OptimizerTest {
 
     comparePlans(optimized, correctAnswer)
   }
-  
+
   test("joins: push down left outer join #1") {
     val x = testRelation.subquery('x)
     val y = testRelation.subquery('y)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index b6a2f1b9d1..378ff54531 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -31,7 +31,7 @@ import org.apache.spark.sql.catalyst.{ScalaReflection, dsl}
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.types._
 import org.apache.spark.sql.catalyst.optimizer.Optimizer
-import org.apache.spark.sql.catalyst.plans.logical.{SetCommand, LogicalPlan}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.rules.RuleExecutor
 
 import org.apache.spark.sql.columnar.InMemoryRelation
@@ -147,14 +147,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
    *
    * @group userf
    */
-  def sql(sqlText: String): SchemaRDD = {
-    val result = new SchemaRDD(this, parseSql(sqlText))
-    // We force query optimization to happen right away instead of letting it happen lazily like
-    // when using the query DSL.  This is so DDL commands behave as expected.  This is only
-    // generates the RDD lineage for DML queries, but do not perform any execution.
-    result.queryExecution.toRdd
-    result
-  }
+  def sql(sqlText: String): SchemaRDD = new SchemaRDD(this, parseSql(sqlText))
 
   /** Returns the specified table as a SchemaRDD */
   def table(tableName: String): SchemaRDD =
@@ -259,8 +252,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
   protected[sql] val planner = new SparkPlanner
 
   @transient
-  protected[sql] lazy val emptyResult =
-    sparkContext.parallelize(Seq(new GenericRow(Array[Any]()): Row), 1)
+  protected[sql] lazy val emptyResult = sparkContext.parallelize(Seq.empty[Row], 1)
 
   /**
    * Prepares a planned SparkPlan for execution by binding references to specific ordinals, and
@@ -280,22 +272,6 @@ class SQLContext(@transient val sparkContext: SparkContext)
   protected abstract class QueryExecution {
     def logical: LogicalPlan
 
-    def eagerlyProcess(plan: LogicalPlan): RDD[Row] = plan match {
-      case SetCommand(key, value) =>
-        // Only this case needs to be executed eagerly. The other cases will
-        // be taken care of when the actual results are being extracted.
-        // In the case of HiveContext, sqlConf is overridden to also pass the
-        // pair into its HiveConf.
-        if (key.isDefined && value.isDefined) {
-          set(key.get, value.get)
-        }
-        // It doesn't matter what we return here, since this is only used
-        // to force the evaluation to happen eagerly.  To query the results,
-        // one must use SchemaRDD operations to extract them.
-        emptyResult
-      case _ => executedPlan.execute()
-    }
-
     lazy val analyzed = analyzer(logical)
     lazy val optimizedPlan = optimizer(analyzed)
     // TODO: Don't just pick the first one...
@@ -303,12 +279,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
     lazy val executedPlan: SparkPlan = prepareForExecution(sparkPlan)
 
     /** Internal version of the RDD. Avoids copies and has no schema */
-    lazy val toRdd: RDD[Row] = {
-      logical match {
-        case s: SetCommand => eagerlyProcess(s)
-        case _ => executedPlan.execute()
-      }
-    }
+    lazy val toRdd: RDD[Row] = executedPlan.execute()
 
     protected def stringOrError[A](f: => A): String =
       try f.toString catch { case e: Throwable => e.toString }
@@ -330,7 +301,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
    * TODO: We only support primitive types, add support for nested types.
    */
   private[sql] def inferSchema(rdd: RDD[Map[String, _]]): SchemaRDD = {
-    val schema = rdd.first.map { case (fieldName, obj) =>
+    val schema = rdd.first().map { case (fieldName, obj) =>
       val dataType = obj.getClass match {
         case c: Class[_] if c == classOf[java.lang.String] => StringType
         case c: Class[_] if c == classOf[java.lang.Integer] => IntegerType
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala
index 7ad8edf5a5..821ac850ac 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala
@@ -97,7 +97,7 @@ import java.util.{Map => JMap}
 @AlphaComponent
 class SchemaRDD(
     @transient val sqlContext: SQLContext,
-    @transient protected[spark] val logicalPlan: LogicalPlan)
+    @transient val baseLogicalPlan: LogicalPlan)
   extends RDD[Row](sqlContext.sparkContext, Nil) with SchemaRDDLike {
 
   def baseSchemaRDD = this
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDDLike.scala b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDDLike.scala
index 3a895e15a4..656be965a8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDDLike.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDDLike.scala
@@ -20,13 +20,14 @@ package org.apache.spark.sql
 import org.apache.spark.annotation.{DeveloperApi, Experimental}
 import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
 import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.execution.SparkLogicalPlan
 
 /**
  * Contains functions that are shared between all SchemaRDD types (i.e., Scala, Java)
  */
 private[sql] trait SchemaRDDLike {
   @transient val sqlContext: SQLContext
-  @transient protected[spark] val logicalPlan: LogicalPlan
+  @transient val baseLogicalPlan: LogicalPlan
 
   private[sql] def baseSchemaRDD: SchemaRDD
 
@@ -48,7 +49,17 @@ private[sql] trait SchemaRDDLike {
    */
   @transient
   @DeveloperApi
-  lazy val queryExecution = sqlContext.executePlan(logicalPlan)
+  lazy val queryExecution = sqlContext.executePlan(baseLogicalPlan)
+
+  @transient protected[spark] val logicalPlan: LogicalPlan = baseLogicalPlan match {
+    // For various commands (like DDL) and queries with side effects, we force query optimization to
+    // happen right away to let these side effects take place eagerly.
+    case _: Command | _: InsertIntoTable | _: InsertIntoCreatedTable | _: WriteToFile =>
+      queryExecution.toRdd
+      SparkLogicalPlan(queryExecution.executedPlan)
+    case _ =>
+      baseLogicalPlan
+  }
 
   override def toString =
     s"""${super.toString}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSchemaRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSchemaRDD.scala
index 22f57b758d..aff6ffe9f3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSchemaRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSchemaRDD.scala
@@ -37,7 +37,7 @@ import org.apache.spark.storage.StorageLevel
  */
 class JavaSchemaRDD(
      @transient val sqlContext: SQLContext,
-     @transient protected[spark] val logicalPlan: LogicalPlan)
+     @transient val baseLogicalPlan: LogicalPlan)
   extends JavaRDDLike[Row, JavaRDD[Row]]
   with SchemaRDDLike {
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index a657911afe..2233216a6e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.execution
 
-import org.apache.spark.sql.{SQLConf, SQLContext, execution}
+import org.apache.spark.sql.{SQLContext, execution}
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.planning._
 import org.apache.spark.sql.catalyst.plans._
@@ -157,7 +157,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
         InsertIntoParquetTable(relation, planLater(child), overwrite=true)(sparkContext) :: Nil
       case logical.InsertIntoTable(table: ParquetRelation, partition, child, overwrite) =>
         InsertIntoParquetTable(table, planLater(child), overwrite)(sparkContext) :: Nil
-      case PhysicalOperation(projectList, filters: Seq[Expression], relation: ParquetRelation) => {
+      case PhysicalOperation(projectList, filters: Seq[Expression], relation: ParquetRelation) =>
         val prunePushedDownFilters =
           if (sparkContext.conf.getBoolean(ParquetFilters.PARQUET_FILTER_PUSHDOWN_ENABLED, true)) {
             (filters: Seq[Expression]) => {
@@ -186,7 +186,6 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
           filters,
           prunePushedDownFilters,
           ParquetTableScan(_, relation, filters)(sparkContext)) :: Nil
-      }
 
       case _ => Nil
     }
@@ -250,12 +249,12 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
   case class CommandStrategy(context: SQLContext) extends Strategy {
     def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
       case logical.SetCommand(key, value) =>
-        Seq(execution.SetCommandPhysical(key, value, plan.output)(context))
+        Seq(execution.SetCommand(key, value, plan.output)(context))
       case logical.ExplainCommand(child) =>
         val executedPlan = context.executePlan(child).executedPlan
-        Seq(execution.ExplainCommandPhysical(executedPlan, plan.output)(context))
+        Seq(execution.ExplainCommand(executedPlan, plan.output)(context))
       case logical.CacheCommand(tableName, cache) =>
-        Seq(execution.CacheCommandPhysical(tableName, cache)(context))
+        Seq(execution.CacheCommand(tableName, cache)(context))
       case _ => Nil
     }
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
index be26d19e66..0377290af5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
@@ -22,45 +22,69 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{SQLContext, Row}
 import org.apache.spark.sql.catalyst.expressions.{GenericRow, Attribute}
 
+trait Command {
+  /**
+   * A concrete command should override this lazy field to wrap up any side effects caused by the
+   * command or any other computation that should be evaluated exactly once. The value of this field
+   * can be used as the contents of the corresponding RDD generated from the physical plan of this
+   * command.
+   *
+   * The `execute()` method of all the physical command classes should reference `sideEffectResult`
+   * so that the command can be executed eagerly right after the command query is created.
+   */
+  protected[sql] lazy val sideEffectResult: Seq[Any] = Seq.empty[Any]
+}
+
 /**
  * :: DeveloperApi ::
  */
 @DeveloperApi
-case class SetCommandPhysical(key: Option[String], value: Option[String], output: Seq[Attribute])
-                             (@transient context: SQLContext) extends LeafNode {
-  def execute(): RDD[Row] = (key, value) match {
-    // Set value for key k; the action itself would
-    // have been performed in QueryExecution eagerly.
-    case (Some(k), Some(v)) => context.emptyResult
+case class SetCommand(
+    key: Option[String], value: Option[String], output: Seq[Attribute])(
+    @transient context: SQLContext)
+  extends LeafNode with Command {
+
+  override protected[sql] lazy val sideEffectResult: Seq[(String, String)] = (key, value) match {
+    // Set value for key k.
+    case (Some(k), Some(v)) =>
+      context.set(k, v)
+      Array(k -> v)
+
     // Query the value bound to key k.
-    case (Some(k), None) =>
-      val resultString = context.getOption(k) match {
-        case Some(v) => s"$k=$v"
-        case None => s"$k is undefined"
-      }
-      context.sparkContext.parallelize(Seq(new GenericRow(Array[Any](resultString))), 1)
+    case (Some(k), _) =>
+      Array(k -> context.getOption(k).getOrElse("<undefined>"))
+
     // Query all key-value pairs that are set in the SQLConf of the context.
     case (None, None) =>
-      val pairs = context.getAll
-      val rows = pairs.map { case (k, v) =>
-        new GenericRow(Array[Any](s"$k=$v"))
-      }.toSeq
-      // Assume config parameters can fit into one split (machine) ;)
-      context.sparkContext.parallelize(rows, 1)
-    // The only other case is invalid semantics and is impossible.
-    case _ => context.emptyResult
+      context.getAll
+
+    case _ =>
+      throw new IllegalArgumentException()
   }
+
+  def execute(): RDD[Row] = {
+    val rows = sideEffectResult.map { case (k, v) => new GenericRow(Array[Any](k, v)) }
+    context.sparkContext.parallelize(rows, 1)
+  }
+
+  override def otherCopyArgs = context :: Nil
 }
 
 /**
  * :: DeveloperApi ::
  */
 @DeveloperApi
-case class ExplainCommandPhysical(child: SparkPlan, output: Seq[Attribute])
-                                 (@transient context: SQLContext) extends UnaryNode {
+case class ExplainCommand(
+    child: SparkPlan, output: Seq[Attribute])(
+    @transient context: SQLContext)
+  extends UnaryNode with Command {
+
+  // Actually "EXPLAIN" command doesn't cause any side effect.
+  override protected[sql] lazy val sideEffectResult: Seq[String] = this.toString.split("\n")
+
   def execute(): RDD[Row] = {
-    val planString = new GenericRow(Array[Any](child.toString))
-    context.sparkContext.parallelize(Seq(planString))
+    val explanation = sideEffectResult.mkString("\n")
+    context.sparkContext.parallelize(Seq(new GenericRow(Array[Any](explanation))), 1)
   }
 
   override def otherCopyArgs = context :: Nil
@@ -70,19 +94,20 @@ case class ExplainCommandPhysical(child: SparkPlan, output: Seq[Attribute])
  * :: DeveloperApi ::
  */
 @DeveloperApi
-case class CacheCommandPhysical(tableName: String, doCache: Boolean)(@transient context: SQLContext)
-  extends LeafNode {
+case class CacheCommand(tableName: String, doCache: Boolean)(@transient context: SQLContext)
+  extends LeafNode with Command {
 
-  lazy val commandSideEffect = {
+  override protected[sql] lazy val sideEffectResult = {
     if (doCache) {
       context.cacheTable(tableName)
     } else {
       context.uncacheTable(tableName)
     }
+    Seq.empty[Any]
   }
 
   override def execute(): RDD[Row] = {
-    commandSideEffect
+    sideEffectResult
     context.emptyResult
   }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index c1fc99f077..e9360b0fc7 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -141,7 +141,7 @@ class SQLQuerySuite extends QueryTest {
       sql("SELECT AVG(a),b FROM largeAndSmallInts group by b"),
       Seq((2147483645.0,1),(2.0,2)))
   }
-  
+
   test("count") {
     checkAnswer(
       sql("SELECT COUNT(*) FROM testData2"),
@@ -332,7 +332,7 @@ class SQLQuerySuite extends QueryTest {
         (3, "C"),
         (4, "D")))
   }
-  
+
   test("system function upper()") {
     checkAnswer(
       sql("SELECT n,UPPER(l) FROM lowerCaseData"),
@@ -349,7 +349,7 @@ class SQLQuerySuite extends QueryTest {
         (2, "ABC"),
         (3, null)))
   }
-    
+
   test("system function lower()") {
     checkAnswer(
       sql("SELECT N,LOWER(L) FROM upperCaseData"),
@@ -382,25 +382,25 @@ class SQLQuerySuite extends QueryTest {
     sql(s"SET $testKey=$testVal")
     checkAnswer(
       sql("SET"),
-      Seq(Seq(s"$testKey=$testVal"))
+      Seq(Seq(testKey, testVal))
     )
 
     sql(s"SET ${testKey + testKey}=${testVal + testVal}")
     checkAnswer(
       sql("set"),
       Seq(
-        Seq(s"$testKey=$testVal"),
-        Seq(s"${testKey + testKey}=${testVal + testVal}"))
+        Seq(testKey, testVal),
+        Seq(testKey + testKey, testVal + testVal))
     )
 
     // "set key"
     checkAnswer(
       sql(s"SET $testKey"),
-      Seq(Seq(s"$testKey=$testVal"))
+      Seq(Seq(testKey, testVal))
     )
     checkAnswer(
       sql(s"SET $nonexistentKey"),
-      Seq(Seq(s"$nonexistentKey is undefined"))
+      Seq(Seq(nonexistentKey, "<undefined>"))
     )
     clear()
   }
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
index 9cd13f6ae0..96e0ec5136 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -15,8 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.spark.sql
-package hive
+package org.apache.spark.sql.hive
 
 import java.io.{BufferedReader, File, InputStreamReader, PrintStream}
 import java.util.{ArrayList => JArrayList}
@@ -32,12 +31,13 @@ import org.apache.hadoop.hive.ql.session.SessionState
 
 import org.apache.spark.SparkContext
 import org.apache.spark.rdd.RDD
+import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.ScalaReflection
 import org.apache.spark.sql.catalyst.analysis.{Analyzer, OverrideCatalog}
-import org.apache.spark.sql.catalyst.expressions.GenericRow
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.types._
-import org.apache.spark.sql.execution._
+import org.apache.spark.sql.execution.QueryExecutionException
+import org.apache.spark.sql.execution.{Command => PhysicalCommand}
 
 /**
  * Starts up an instance of hive where metadata is stored locally. An in-process metadata data is
@@ -71,14 +71,7 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
   /**
    * Executes a query expressed in HiveQL using Spark, returning the result as a SchemaRDD.
    */
-  def hiveql(hqlQuery: String): SchemaRDD = {
-    val result = new SchemaRDD(this, HiveQl.parseSql(hqlQuery))
-    // We force query optimization to happen right away instead of letting it happen lazily like
-    // when using the query DSL.  This is so DDL commands behave as expected.  This is only
-    // generates the RDD lineage for DML queries, but does not perform any execution.
-    result.queryExecution.toRdd
-    result
-  }
+  def hiveql(hqlQuery: String): SchemaRDD = new SchemaRDD(this, HiveQl.parseSql(hqlQuery))
 
   /** An alias for `hiveql`. */
   def hql(hqlQuery: String): SchemaRDD = hiveql(hqlQuery)
@@ -164,7 +157,7 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
   /**
    * Runs the specified SQL query using Hive.
    */
-  protected def runSqlHive(sql: String): Seq[String] = {
+  protected[sql] def runSqlHive(sql: String): Seq[String] = {
     val maxResults = 100000
     val results = runHive(sql, 100000)
     // It is very confusing when you only get back some of the results...
@@ -228,6 +221,7 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
 
     override val strategies: Seq[Strategy] = Seq(
       CommandStrategy(self),
+      HiveCommandStrategy(self),
       TakeOrdered,
       ParquetOperations,
       InMemoryScans,
@@ -252,25 +246,7 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
     override lazy val optimizedPlan =
       optimizer(catalog.PreInsertionCasts(catalog.CreateTables(analyzed)))
 
-    override lazy val toRdd: RDD[Row] = {
-      def processCmd(cmd: String): RDD[Row] = {
-        val output = runSqlHive(cmd)
-        if (output.size == 0) {
-          emptyResult
-        } else {
-          val asRows = output.map(r => new GenericRow(r.split("\t").asInstanceOf[Array[Any]]))
-          sparkContext.parallelize(asRows, 1)
-        }
-      }
-
-      logical match {
-        case s: SetCommand => eagerlyProcess(s)
-        case _ => analyzed match {
-          case NativeCommand(cmd) => processCmd(cmd)
-          case _ => executedPlan.execute().map(_.copy())
-        }
-      }
-    }
+    override lazy val toRdd: RDD[Row] = executedPlan.execute().map(_.copy())
 
     protected val primitiveTypes =
       Seq(StringType, IntegerType, LongType, DoubleType, FloatType, BooleanType, ByteType,
@@ -298,7 +274,7 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
         struct.zip(fields).map {
           case (v, t) => s""""${t.name}":${toHiveStructString(v, t.dataType)}"""
         }.mkString("{", ",", "}")
-      case (seq: Seq[_], ArrayType(typ))=>
+      case (seq: Seq[_], ArrayType(typ)) =>
         seq.map(v => (v, typ)).map(toHiveStructString).mkString("[", ",", "]")
       case (map: Map[_,_], MapType(kType, vType)) =>
         map.map {
@@ -314,10 +290,11 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
      * Returns the result as a hive compatible sequence of strings.  For native commands, the
      * execution is simply passed back to Hive.
      */
-    def stringResult(): Seq[String] = analyzed match {
-      case NativeCommand(cmd) => runSqlHive(cmd)
-      case ExplainCommand(plan) => executePlan(plan).toString.split("\n")
-      case query =>
+    def stringResult(): Seq[String] = executedPlan match {
+      case command: PhysicalCommand =>
+        command.sideEffectResult.map(_.toString)
+
+      case other =>
         val result: Seq[Seq[Any]] = toRdd.collect().toSeq
         // We need the types so we can output struct field names
         val types = analyzed.output.map(_.dataType)
@@ -328,8 +305,8 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
 
     override def simpleString: String =
       logical match {
-        case _: NativeCommand => "<Executed by Hive>"
-        case _: SetCommand => "<Set Command: Executed by Hive, and noted by SQLContext>"
+        case _: NativeCommand => "<Native command: executed by Hive>"
+        case _: SetCommand => "<SET command: executed by Hive, and noted by SQLContext>"
         case _ => executedPlan.toString
       }
   }
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index ed6cd5a11d..0ac0ee9071 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -76,4 +76,12 @@ private[hive] trait HiveStrategies {
         Nil
     }
   }
+
+  case class HiveCommandStrategy(context: HiveContext) extends Strategy {
+    def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
+      case logical.NativeCommand(sql) =>
+        NativeCommand(sql, plan.output)(context) :: Nil
+      case _ => Nil
+    }
+  }
 }
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/hiveOperators.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/hiveOperators.scala
index 29b4b9b006..a839231449 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/hiveOperators.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/hiveOperators.scala
@@ -32,14 +32,15 @@ import org.apache.hadoop.hive.serde2.{ColumnProjectionUtils, Serializer}
 import org.apache.hadoop.io.Writable
 import org.apache.hadoop.mapred._
 
+import org.apache.spark
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.types.{BooleanType, DataType}
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.hive._
-import org.apache.spark.{TaskContext, SparkException}
 import org.apache.spark.util.MutablePair
+import org.apache.spark.{TaskContext, SparkException}
 
 /* Implicits */
 import scala.collection.JavaConversions._
@@ -57,7 +58,7 @@ case class HiveTableScan(
     attributes: Seq[Attribute],
     relation: MetastoreRelation,
     partitionPruningPred: Option[Expression])(
-    @transient val sc: HiveContext)
+    @transient val context: HiveContext)
   extends LeafNode
   with HiveInspectors {
 
@@ -75,7 +76,7 @@ case class HiveTableScan(
   }
 
   @transient
-  val hadoopReader = new HadoopTableReader(relation.tableDesc, sc)
+  val hadoopReader = new HadoopTableReader(relation.tableDesc, context)
 
   /**
    * The hive object inspector for this table, which can be used to extract values from the
@@ -156,7 +157,7 @@ case class HiveTableScan(
     hiveConf.set(serdeConstants.LIST_COLUMNS, columnInternalNames)
   }
 
-  addColumnMetadataToConf(sc.hiveconf)
+  addColumnMetadataToConf(context.hiveconf)
 
   @transient
   def inputRdd = if (!relation.hiveQlTable.isPartitioned) {
@@ -428,3 +429,26 @@ case class InsertIntoHiveTable(
     sc.sparkContext.makeRDD(Nil, 1)
   }
 }
+
+/**
+ * :: DeveloperApi ::
+ */
+@DeveloperApi
+case class NativeCommand(
+    sql: String, output: Seq[Attribute])(
+    @transient context: HiveContext)
+  extends LeafNode with Command {
+
+  override protected[sql] lazy val sideEffectResult: Seq[String] = context.runSqlHive(sql)
+
+  override def execute(): RDD[spark.sql.Row] = {
+    if (sideEffectResult.size == 0) {
+      context.emptyResult
+    } else {
+      val rows = sideEffectResult.map(r => new GenericRow(Array[Any](r)))
+      context.sparkContext.parallelize(rows, 1)
+    }
+  }
+
+  override def otherCopyArgs = context :: Nil
+}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
index 357c7e654b..24c929ff74 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
@@ -24,6 +24,7 @@ import org.scalatest.{BeforeAndAfterAll, FunSuite, GivenWhenThen}
 import org.apache.spark.sql.Logging
 import org.apache.spark.sql.catalyst.planning.PhysicalOperation
 import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.plans.logical.{NativeCommand => LogicalNativeCommand}
 import org.apache.spark.sql.catalyst.util._
 import org.apache.spark.sql.hive.test.TestHive
 
@@ -141,7 +142,7 @@ abstract class HiveComparisonTest
       // Hack: Hive simply prints the result of a SET command to screen,
       // and does not return it as a query answer.
       case _: SetCommand => Seq("0")
-      case _: NativeCommand => answer.filterNot(nonDeterministicLine).filterNot(_ == "")
+      case _: LogicalNativeCommand => answer.filterNot(nonDeterministicLine).filterNot(_ == "")
       case _: ExplainCommand => answer
       case plan => if (isSorted(plan)) answer else answer.sorted
     }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
index 3581617c26..ee194dbcb7 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
@@ -172,7 +172,12 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter {
     "case_sensitivity",
 
     // Flaky test, Hive sometimes returns different set of 10 rows.
-    "lateral_view_outer"
+    "lateral_view_outer",
+
+    // After stop taking the `stringOrError` route, exceptions are thrown from these cases.
+    // See SPARK-2129 for details.
+    "join_view",
+    "mergejoins_mixed"
   )
 
   /**
@@ -476,7 +481,6 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter {
     "join_reorder3",
     "join_reorder4",
     "join_star",
-    "join_view",
     "lateral_view",
     "lateral_view_cp",
     "lateral_view_ppd",
@@ -507,7 +511,6 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter {
     "merge1",
     "merge2",
     "mergejoins",
-    "mergejoins_mixed",
     "multigroupby_singlemr",
     "multi_insert_gby",
     "multi_insert_gby3",
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
index 6c239b02ed..0d656c5569 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
@@ -17,9 +17,11 @@
 
 package org.apache.spark.sql.hive.execution
 
-import org.apache.spark.sql.Row
-import org.apache.spark.sql.hive.test.TestHive._
+import scala.util.Try
+
 import org.apache.spark.sql.hive.test.TestHive
+import org.apache.spark.sql.hive.test.TestHive._
+import org.apache.spark.sql.{SchemaRDD, execution, Row}
 
 /**
  * A set of test cases expressed in Hive QL that are not covered by the tests included in the hive distribution.
@@ -162,16 +164,60 @@ class HiveQuerySuite extends HiveComparisonTest {
     hql("SELECT * FROM src").toString
   }
 
+  private val explainCommandClassName =
+    classOf[execution.ExplainCommand].getSimpleName.stripSuffix("$")
+
+  def isExplanation(result: SchemaRDD) = {
+    val explanation = result.select('plan).collect().map { case Row(plan: String) => plan }
+    explanation.size == 1 && explanation.head.startsWith(explainCommandClassName)
+  }
+
   test("SPARK-1704: Explain commands as a SchemaRDD") {
     hql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
+
     val rdd = hql("explain select key, count(value) from src group by key")
-    assert(rdd.collect().size == 1)
-    assert(rdd.toString.contains("ExplainCommand"))
-    assert(rdd.filter(row => row.toString.contains("ExplainCommand")).collect().size == 0,
-      "actual contents of the result should be the plans of the query to be explained")
+    assert(isExplanation(rdd))
+
     TestHive.reset()
   }
 
+  test("Query Hive native command execution result") {
+    val tableName = "test_native_commands"
+
+    val q0 = hql(s"DROP TABLE IF EXISTS $tableName")
+    assert(q0.count() == 0)
+
+    val q1 = hql(s"CREATE TABLE $tableName(key INT, value STRING)")
+    assert(q1.count() == 0)
+
+    val q2 = hql("SHOW TABLES")
+    val tables = q2.select('result).collect().map { case Row(table: String) => table }
+    assert(tables.contains(tableName))
+
+    val q3 = hql(s"DESCRIBE $tableName")
+    assertResult(Array(Array("key", "int", "None"), Array("value", "string", "None"))) {
+      q3.select('result).collect().map { case Row(fieldDesc: String) =>
+        fieldDesc.split("\t").map(_.trim)
+      }
+    }
+
+    val q4 = hql(s"EXPLAIN SELECT key, COUNT(*) FROM $tableName GROUP BY key")
+    assert(isExplanation(q4))
+
+    TestHive.reset()
+  }
+
+  test("Exactly once semantics for DDL and command statements") {
+    val tableName = "test_exactly_once"
+    val q0 = hql(s"CREATE TABLE $tableName(key INT, value STRING)")
+
+    // If the table was not created, the following assertion would fail
+    assert(Try(table(tableName)).isSuccess)
+
+    // If the CREATE TABLE command got executed again, the following assertion would fail
+    assert(Try(q0.count()).isSuccess)
+  }
+
   test("parse HQL set commands") {
     // Adapted from its SQL counterpart.
     val testKey = "spark.sql.key.usedfortestonly"
@@ -195,52 +241,69 @@ class HiveQuerySuite extends HiveComparisonTest {
   test("SET commands semantics for a HiveContext") {
     // Adapted from its SQL counterpart.
     val testKey = "spark.sql.key.usedfortestonly"
-    var testVal = "test.val.0"
+    val testVal = "test.val.0"
     val nonexistentKey = "nonexistent"
-    def fromRows(row: Array[Row]): Array[String] = row.map(_.getString(0))
+    def rowsToPairs(rows: Array[Row]) = rows.map { case Row(key: String, value: String) =>
+      key -> value
+    }
 
     clear()
 
     // "set" itself returns all config variables currently specified in SQLConf.
-    assert(hql("set").collect().size == 0)
+    assert(hql("SET").collect().size == 0)
+
+    assertResult(Array(testKey -> testVal)) {
+      rowsToPairs(hql(s"SET $testKey=$testVal").collect())
+    }
 
-    // "set key=val"
-    hql(s"SET $testKey=$testVal")
-    assert(fromRows(hql("SET").collect()) sameElements Array(s"$testKey=$testVal"))
     assert(hiveconf.get(testKey, "") == testVal)
+    assertResult(Array(testKey -> testVal)) {
+      rowsToPairs(hql("SET").collect())
+    }
 
     hql(s"SET ${testKey + testKey}=${testVal + testVal}")
-    assert(fromRows(hql("SET").collect()) sameElements
-      Array(
-        s"$testKey=$testVal",
-        s"${testKey + testKey}=${testVal + testVal}"))
     assert(hiveconf.get(testKey + testKey, "") == testVal + testVal)
+    assertResult(Array(testKey -> testVal, (testKey + testKey) -> (testVal + testVal))) {
+      rowsToPairs(hql("SET").collect())
+    }
 
     // "set key"
-    assert(fromRows(hql(s"SET $testKey").collect()) sameElements
-      Array(s"$testKey=$testVal"))
-    assert(fromRows(hql(s"SET $nonexistentKey").collect()) sameElements
-      Array(s"$nonexistentKey is undefined"))
+    assertResult(Array(testKey -> testVal)) {
+      rowsToPairs(hql(s"SET $testKey").collect())
+    }
+
+    assertResult(Array(nonexistentKey -> "<undefined>")) {
+      rowsToPairs(hql(s"SET $nonexistentKey").collect())
+    }
 
     // Assert that sql() should have the same effects as hql() by repeating the above using sql().
     clear()
-    assert(sql("set").collect().size == 0)
+    assert(sql("SET").collect().size == 0)
+
+    assertResult(Array(testKey -> testVal)) {
+      rowsToPairs(sql(s"SET $testKey=$testVal").collect())
+    }
 
-    sql(s"SET $testKey=$testVal")
-    assert(fromRows(sql("SET").collect()) sameElements Array(s"$testKey=$testVal"))
     assert(hiveconf.get(testKey, "") == testVal)
+    assertResult(Array(testKey -> testVal)) {
+      rowsToPairs(sql("SET").collect())
+    }
 
     sql(s"SET ${testKey + testKey}=${testVal + testVal}")
-    assert(fromRows(sql("SET").collect()) sameElements
-      Array(
-        s"$testKey=$testVal",
-        s"${testKey + testKey}=${testVal + testVal}"))
     assert(hiveconf.get(testKey + testKey, "") == testVal + testVal)
+    assertResult(Array(testKey -> testVal, (testKey + testKey) -> (testVal + testVal))) {
+      rowsToPairs(sql("SET").collect())
+    }
 
-    assert(fromRows(sql(s"SET $testKey").collect()) sameElements
-      Array(s"$testKey=$testVal"))
-    assert(fromRows(sql(s"SET $nonexistentKey").collect()) sameElements
-      Array(s"$nonexistentKey is undefined"))
+    assertResult(Array(testKey -> testVal)) {
+      rowsToPairs(sql(s"SET $testKey").collect())
+    }
+
+    assertResult(Array(nonexistentKey -> "<undefined>")) {
+      rowsToPairs(sql(s"SET $nonexistentKey").collect())
+    }
+
+    clear()
   }
 
   // Put tests that depend on specific Hive settings before these last two test,
-- 
GitLab