diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala
index 16da7fd92bffea787899676595443ccb62f7f5c4..91500416eefaafa73693b9f34b084abcf48710e7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala
@@ -99,7 +99,7 @@ class SchemaRDD(
   def baseSchemaRDD = this
 
   // =========================================================================================
-  // RDD functions: Copy the interal row representation so we present immutable data to users.
+  // RDD functions: Copy the internal row representation so we present immutable data to users.
   // =========================================================================================
 
   override def compute(split: Partition, context: TaskContext): Iterator[Row] =
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/NullableColumnAccessor.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/NullableColumnAccessor.scala
index 7d49ab07f7a53bb8ae44b70af96bc5b230088099..b7f8826861a2cc5727ab5ae98a71b904ec0c9bee 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/NullableColumnAccessor.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/NullableColumnAccessor.scala
@@ -54,4 +54,6 @@ private[sql] trait NullableColumnAccessor extends ColumnAccessor {
 
     pos += 1
   }
+
+  abstract override def hasNext = seenNulls < nullCount || super.hasNext
 }
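Note on the added `hasNext`: this is the stackable-trait pattern. The base accessor's `hasNext` only sees the non-null data buffer, so a column whose remaining values are all nulls would look exhausted too early; the `abstract override` layers the null check on top of `super.hasNext`. A minimal sketch with hypothetical, simplified traits (not the actual Spark classes; only the `hasNext` stacking mirrors the change above):

    trait BaseAccessor { def hasNext: Boolean }

    trait NullAware extends BaseAccessor {
      protected def nullCount: Int   // nulls recorded in the column header
      protected var seenNulls = 0    // nulls already handed back to callers
      // Exhausted only when both non-null data and pending nulls run out.
      abstract override def hasNext = seenNulls < nullCount || super.hasNext
    }

    object NullAwareDemo extends App {
      // Data buffer drained, but one null still pending: hasNext stays true.
      class Drained extends BaseAccessor { def hasNext = false }
      val accessor = new Drained with NullAware { val nullCount = 1 }
      assert(accessor.hasNext)
    }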
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/CompressibleColumnBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/CompressibleColumnBuilder.scala
index fd3b1adf9687a5c8db99d1220dc4df5260b98311..0f808f68f2eec633f6674b3a683c91c3834e9c7d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/CompressibleColumnBuilder.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/CompressibleColumnBuilder.scala
@@ -65,7 +65,9 @@ private[sql] trait CompressibleColumnBuilder[T <: NativeType]
 
   abstract override def appendFrom(row: Row, ordinal: Int) {
     super.appendFrom(row, ordinal)
-    gatherCompressibilityStats(row, ordinal)
+    if (!row.isNullAt(ordinal)) {
+      gatherCompressibilityStats(row, ordinal)
+    }
   }
 
   abstract override def build() = {
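Note on the added null guard: null positions are already recorded by the null-handling builder layer, while the compression encoders' stats gatherers read the native value at `ordinal`, which is meaningless for a null slot. A toy run-length style estimator (purely illustrative, not Spark's encoders) shows how letting nulls through would skew the compressibility estimate:

    class RunStats[T] {
      private var prev: Option[T] = None
      private var runs = 0
      def gather(value: T): Unit = {
        if (prev != Some(value)) runs += 1   // a new run starts here
        prev = Some(value)
      }
      def estimatedRuns: Int = runs
    }

    object NullSkewDemo extends App {
      val clean = new RunStats[Any]
      Seq("a", "a", "a").foreach(clean.gather)
      println(clean.estimatedRuns)    // 1 run
      val skewed = new RunStats[Any]
      Seq("a", null, "a").foreach(skewed.gather)
      println(skewed.estimatedRuns)   // 3 runs: the null slot splits the run
    }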
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/CompressionScheme.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/CompressionScheme.scala
index c605a8e4434e3254db5792b4757029cea7c71a98..ba1810dd2ae66c69c574456cafae222e697ff640 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/CompressionScheme.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/CompressionScheme.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.columnar.compression
 
-import java.nio.ByteBuffer
+import java.nio.{ByteOrder, ByteBuffer}
 
 import org.apache.spark.sql.catalyst.types.NativeType
 import org.apache.spark.sql.columnar.{ColumnType, NativeColumnType}
@@ -84,7 +84,7 @@ private[sql] object CompressionScheme {
   }
 
   def columnHeaderSize(columnBuffer: ByteBuffer): Int = {
-    val header = columnBuffer.duplicate()
+    val header = columnBuffer.duplicate().order(ByteOrder.nativeOrder)
     val nullCount = header.getInt(4)
     // Column type ID + null count + null positions
     4 + 4 + 4 * nullCount
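Note on the added `.order(ByteOrder.nativeOrder)`: `ByteBuffer.duplicate()` does not inherit the source buffer's byte order; the duplicate always starts out big-endian. Since the column buffers are written in native order, reading the null count through an unordered duplicate returns a wrong value on little-endian machines. A small standalone demonstration of the gotcha (not Spark code; it pins LITTLE_ENDIAN instead of nativeOrder so the output is deterministic):

    import java.nio.{ByteBuffer, ByteOrder}

    object DuplicateOrderDemo extends App {
      val buf = ByteBuffer.allocate(8).order(ByteOrder.LITTLE_ENDIAN)
      buf.putInt(0, 42)

      // duplicate() copies position/limit/content but resets the order to
      // BIG_ENDIAN, so the same four bytes decode to a different number.
      println(buf.duplicate().getInt(0))                                  // 704643072
      println(buf.duplicate().order(ByteOrder.LITTLE_ENDIAN).getInt(0))   // 42
    }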
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/compressionSchemes.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/compressionSchemes.scala
index e92cf5ac4f9df79bbfcc27b7fda8636bdcedcc9c..800009d3195e141057fb67c4b341a5e7dbb21c8b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/compressionSchemes.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/compressionSchemes.scala
@@ -397,26 +397,27 @@ private[sql] sealed abstract class IntegralDelta[I <: IntegralType] extends Comp
 
       if (initial) {
         initial = false
-        prev = value
         _compressedSize += 1 + columnType.defaultSize
       } else {
         val (smallEnough, _) = byteSizedDelta(value, prev)
         _compressedSize += (if (smallEnough) 1 else 1 + columnType.defaultSize)
       }
+
+      prev = value
     }
 
     override def compress(from: ByteBuffer, to: ByteBuffer, columnType: NativeColumnType[I]) = {
       to.putInt(typeId)
 
       if (from.hasRemaining) {
-        val prev = columnType.extract(from)
-
+        var prev = columnType.extract(from)
         to.put(Byte.MinValue)
         columnType.append(prev, to)
 
         while (from.hasRemaining) {
           val current = columnType.extract(from)
           val (smallEnough, delta) = byteSizedDelta(current, prev)
+          prev = current
 
           if (smallEnough) {
             to.put(delta)
@@ -443,13 +444,8 @@ private[sql] sealed abstract class IntegralDelta[I <: IntegralType] extends Comp
 
     override def next() = {
       val delta = buffer.get()
-
-      if (delta > Byte.MinValue) {
-        addDelta(prev, delta)
-      } else {
-        prev = columnType.extract(buffer)
-        prev
-      }
+      prev = if (delta > Byte.MinValue) addDelta(prev, delta) else columnType.extract(buffer)
+      prev
     }
 
     override def hasNext = buffer.hasRemaining
@@ -465,7 +461,7 @@ private[sql] case object IntDelta extends IntegralDelta[IntegerType.type] {
 
   override protected def byteSizedDelta(x: Int, y: Int): (Boolean, Byte) = {
     val delta = x - y
-    if (delta < Byte.MaxValue) (true, delta.toByte) else (false, 0: Byte)
+    if (math.abs(delta) <= Byte.MaxValue) (true, delta.toByte) else (false, 0: Byte)
   }
 }
 
@@ -478,6 +474,6 @@ private[sql] case object LongDelta extends IntegralDelta[LongType.type] {
 
   override protected def byteSizedDelta(x: Long, y: Long): (Boolean, Byte) = {
     val delta = x - y
-    if (delta < Byte.MaxValue) (true, delta.toByte) else (false, 0: Byte)
+    if (math.abs(delta) <= Byte.MaxValue) (true, delta.toByte) else (false, 0: Byte)
   }
 }
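Note on the `math.abs(delta) <= Byte.MaxValue` fix: the delta scheme stores each value either as a single signed delta byte or, when the delta does not fit, as the escape marker `Byte.MinValue` followed by the full-width value. Restricting single-byte deltas to [-127, 127] keeps the marker value -128 unambiguous, and it closes the old bug where a large negative delta passed `delta < Byte.MaxValue` and was then silently truncated by `toByte`. A standalone worked sketch (not the Spark encoder itself):

    object DeltaRuleDemo extends App {
      // The corrected rule used by IntDelta/LongDelta.
      def byteSizedDelta(x: Int, y: Int): (Boolean, Byte) = {
        val delta = x - y
        if (math.abs(delta) <= Byte.MaxValue) (true, delta.toByte) else (false, 0: Byte)
      }

      // Input 2, 1, 2, 130 (the new "simple case" below): deltas are -1, +1, +128.
      // The first two fit in one byte; +128 is escaped and stored full-width.
      println(Seq(2, 1, 2, 130).sliding(2).map { case Seq(a, b) => byteSizedDelta(b, a) }.toList)

      // The old test `delta < Byte.MaxValue` accepted -200 and then corrupted it:
      println((-200).toByte)   // 56, not -200
    }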
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
index 450c142c0baa4aefa15852b091308e95e03335ac..070557e47c4c7fafa8c3911c894db42084872e7f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
@@ -61,7 +61,14 @@ case class Exchange(newPartitioning: Partitioning, child: SparkPlan) extends Una
         shuffled.map(_._1)
 
       case SinglePartition =>
-        child.execute().coalesce(1, shuffle = true)
+        val rdd = child.execute().mapPartitions { iter =>
+          val mutablePair = new MutablePair[Null, Row]()
+          iter.map(r => mutablePair.update(null, r))
+        }
+        val partitioner = new HashPartitioner(1)
+        val shuffled = new ShuffledRDD[Null, Row, MutablePair[Null, Row]](rdd, partitioner)
+        shuffled.setSerializer(new SparkSqlSerializer(new SparkConf(false)))
+        shuffled.map(_._2)
 
       case _ => sys.error(s"Exchange not implemented for $newPartitioning")
       // TODO: Handle BroadcastPartitioning.
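Note on the SinglePartition branch: replacing `coalesce(1, shuffle = true)` with an explicit ShuffledRDD mirrors the other Exchange branches, so the shuffle goes through SparkSqlSerializer and rows are wrapped in a MutablePair that is reused for the whole partition (one allocation per partition instead of one pair per row). A minimal sketch of the MutablePair reuse, under the assumption that the consumer reads each pair before the next `update`:

    import org.apache.spark.util.MutablePair

    object MutablePairDemo extends App {
      // One pair object is reused for every row in a partition; update()
      // mutates it in place and returns the same instance.
      val pair = new MutablePair[Null, String]()
      val wrapped = Iterator("a", "b", "c").map(r => pair.update(null, r))
      wrapped.foreach(p => println(p._2))   // prints a, b, c
    }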
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
index daa423cb8ea1a6a9b332e34006361391ba83c6cf..5d89697db5f993aaf2f61e46963841f0c049af70 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
@@ -70,8 +70,8 @@ case class SparkLogicalPlan(alreadyPlanned: SparkPlan)
     SparkLogicalPlan(
       alreadyPlanned match {
         case ExistingRdd(output, rdd) => ExistingRdd(output.map(_.newInstance), rdd)
-        case InMemoryColumnarTableScan(output, child) =>
-          InMemoryColumnarTableScan(output.map(_.newInstance), child)
+        case scan @ InMemoryColumnarTableScan(output, child) =>
+          scan.copy(attributes = output.map(_.newInstance))
         case _ => sys.error("Multiple instance of the same relation detected.")
       }).asInstanceOf[this.type]
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
index 7c6a642278226fd40222b32a79e886acd7b467ca..0331f90272a9944c53eb934c367cb53d8c6f2299 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
@@ -17,11 +17,10 @@
 
 package org.apache.spark.sql
 
-import org.scalatest.FunSuite
 import org.apache.spark.sql.TestData._
-import org.apache.spark.sql.test.TestSQLContext
-import org.apache.spark.sql.execution.SparkLogicalPlan
 import org.apache.spark.sql.columnar.InMemoryColumnarTableScan
+import org.apache.spark.sql.execution.SparkLogicalPlan
+import org.apache.spark.sql.test.TestSQLContext
 
 class CachedTableSuite extends QueryTest {
   TestData // Load test tables.
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnarQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/columnar/InMemoryColumnarQuerySuite.scala
similarity index 79%
rename from sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnarQuerySuite.scala
rename to sql/core/src/test/scala/org/apache/spark/sql/columnar/InMemoryColumnarQuerySuite.scala
index 2ed4cf2170f9d0d954e106b5a6d00a6e07c1ef36..16a13b8a74960fc942809529fac3e03576f8e193 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnarQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/columnar/InMemoryColumnarQuerySuite.scala
@@ -18,10 +18,11 @@
 package org.apache.spark.sql.columnar
 
 import org.apache.spark.sql.{QueryTest, TestData}
+import org.apache.spark.sql.catalyst.expressions.Row
 import org.apache.spark.sql.execution.SparkLogicalPlan
 import org.apache.spark.sql.test.TestSQLContext
 
-class ColumnarQuerySuite extends QueryTest {
+class InMemoryColumnarQuerySuite extends QueryTest {
   import TestData._
   import TestSQLContext._
 
@@ -32,6 +33,15 @@ class ColumnarQuerySuite extends QueryTest {
     checkAnswer(scan, testData.collect().toSeq)
   }
 
+  test("projection") {
+    val plan = TestSQLContext.executePlan(testData.select('value, 'key).logicalPlan).executedPlan
+    val scan = SparkLogicalPlan(InMemoryColumnarTableScan(plan.output, plan))
+
+    checkAnswer(scan, testData.collect().map {
+      case Row(key: Int, value: String) => value -> key
+    }.toSeq)
+  }
+
   test("SPARK-1436 regression: in-memory columns must be able to be accessed multiple times") {
     val plan = TestSQLContext.executePlan(testData.logicalPlan).executedPlan
     val scan = SparkLogicalPlan(InMemoryColumnarTableScan(plan.output, plan))
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/columnar/NullableColumnAccessorSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/columnar/NullableColumnAccessorSuite.scala
index 4a21eb6201a69cc5d4320dae8cb79a52364db9d7..35ab14cbc353dc1015fdaf27963e443a8800250c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/columnar/NullableColumnAccessorSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/columnar/NullableColumnAccessorSuite.scala
@@ -68,12 +68,16 @@ class NullableColumnAccessorSuite extends FunSuite {
       val row = new GenericMutableRow(1)
 
       (0 until 4).foreach { _ =>
+        assert(accessor.hasNext)
         accessor.extractTo(row, 0)
         assert(row(0) === randomRow(0))
 
+        assert(accessor.hasNext)
         accessor.extractTo(row, 0)
         assert(row.isNullAt(0))
       }
+
+      assert(!accessor.hasNext)
     }
   }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/columnar/compression/IntegralDeltaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/columnar/compression/IntegralDeltaSuite.scala
index 1390e5eef61061189c51103f231ba71698cd9da7..ce419ca7269bac6516b0b22eaf62e6a8ecb61fe8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/columnar/compression/IntegralDeltaSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/columnar/compression/IntegralDeltaSuite.scala
@@ -22,6 +22,7 @@ import org.scalatest.FunSuite
 import org.apache.spark.sql.catalyst.expressions.GenericMutableRow
 import org.apache.spark.sql.catalyst.types.IntegralType
 import org.apache.spark.sql.columnar._
+import org.apache.spark.sql.columnar.ColumnarTestUtils._
 
 class IntegralDeltaSuite extends FunSuite {
   testIntegralDelta(new IntColumnStats,  INT,  IntDelta)
@@ -63,7 +64,7 @@ class IntegralDeltaSuite extends FunSuite {
       } else {
         val oneBoolean = columnType.defaultSize
         1 + oneBoolean + deltas.map {
-          d => if (math.abs(d) < Byte.MaxValue) 1 else 1 + oneBoolean
+          d => if (math.abs(d) <= Byte.MaxValue) 1 else 1 + oneBoolean
         }.sum
       })
 
@@ -78,7 +79,7 @@ class IntegralDeltaSuite extends FunSuite {
         expectResult(input.head, "The first value is wrong")(columnType.extract(buffer))
 
         (input.tail, deltas).zipped.foreach { (value, delta) =>
-          if (delta < Byte.MaxValue) {
+          if (math.abs(delta) <= Byte.MaxValue) {
             expectResult(delta, "Wrong delta")(buffer.get())
           } else {
             expectResult(Byte.MinValue, "Expecting escaping mark here")(buffer.get())
@@ -105,11 +106,17 @@ class IntegralDeltaSuite extends FunSuite {
 
     test(s"$scheme: simple case") {
       val input = columnType match {
-        case INT  => Seq(1: Int,  2: Int,  130: Int)
-        case LONG => Seq(1: Long, 2: Long, 130: Long)
+        case INT  => Seq(2: Int,  1: Int,  2: Int,  130: Int)
+        case LONG => Seq(2: Long, 1: Long, 2: Long, 130: Long)
       }
 
       skeleton(input.map(_.asInstanceOf[I#JvmType]))
     }
+
+    test(s"$scheme: long random series") {
+      // Have to work around with `Any` since no `ClassTag[I#JvmType]` is available here.
+      val input = Array.fill[Any](10000)(makeRandomValue(columnType))
+      skeleton(input.map(_.asInstanceOf[I#JvmType]))
+    }
   }
 }
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index fc053c56c052d2d7c4ecfaa986202dc24093a7fb..c36b5878cb007507c8c8cd42e6a4cf8a7c2b0aea 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -33,6 +33,8 @@ import org.apache.spark.sql.catalyst.plans.logical
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules._
 import org.apache.spark.sql.catalyst.types._
+import org.apache.spark.sql.execution.SparkLogicalPlan
+import org.apache.spark.sql.columnar.InMemoryColumnarTableScan
 
 /* Implicit conversions */
 import scala.collection.JavaConversions._
@@ -115,23 +117,31 @@ class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with Logging {
       case p: LogicalPlan if !p.childrenResolved => p
 
       case p @ InsertIntoTable(table: MetastoreRelation, _, child, _) =>
-        val childOutputDataTypes = child.output.map(_.dataType)
-        // Only check attributes, not partitionKeys since they are always strings.
-        // TODO: Fully support inserting into partitioned tables.
-        val tableOutputDataTypes = table.attributes.map(_.dataType)
-
-        if (childOutputDataTypes == tableOutputDataTypes) {
-          p
-        } else {
-          // Only do the casting when child output data types differ from table output data types.
-          val castedChildOutput = child.output.zip(table.output).map {
-            case (input, output) if input.dataType != output.dataType =>
-              Alias(Cast(input, output.dataType), input.name)()
-            case (input, _) => input
-          }
-
-          p.copy(child = logical.Project(castedChildOutput, child))
+        castChildOutput(p, table, child)
+
+      case p @ logical.InsertIntoTable(SparkLogicalPlan(InMemoryColumnarTableScan(
+        _, HiveTableScan(_, table, _))), _, child, _) =>
+        castChildOutput(p, table, child)
+    }
+
+    def castChildOutput(p: InsertIntoTable, table: MetastoreRelation, child: LogicalPlan) = {
+      val childOutputDataTypes = child.output.map(_.dataType)
+      // Only check attributes, not partitionKeys since they are always strings.
+      // TODO: Fully support inserting into partitioned tables.
+      val tableOutputDataTypes = table.attributes.map(_.dataType)
+
+      if (childOutputDataTypes == tableOutputDataTypes) {
+        p
+      } else {
+        // Only do the casting when child output data types differ from table output data types.
+        val castedChildOutput = child.output.zip(table.output).map {
+          case (input, output) if input.dataType != output.dataType =>
+            Alias(Cast(input, output.dataType), input.name)()
+          case (input, _) => input
         }
+
+        p.copy(child = logical.Project(castedChildOutput, child))
+      }
     }
   }
 
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index 3ca1d93c11fa913b773f1ba6528a03836ccf9a97..ac817b21a152ea566cbb797fe0e4b878d9e33227 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -23,6 +23,7 @@ import org.apache.spark.sql.catalyst.planning._
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution._
+import org.apache.spark.sql.columnar.InMemoryColumnarTableScan
 
 trait HiveStrategies {
   // Possibly being too clever with types here... or not clever enough.
@@ -42,6 +43,9 @@ trait HiveStrategies {
     def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
       case logical.InsertIntoTable(table: MetastoreRelation, partition, child, overwrite) =>
         InsertIntoHiveTable(table, partition, planLater(child), overwrite)(hiveContext) :: Nil
+      case logical.InsertIntoTable(SparkLogicalPlan(InMemoryColumnarTableScan(
+        _, HiveTableScan(_, table, _))), partition, child, overwrite) =>
+        InsertIntoHiveTable(table, partition, planLater(child), overwrite)(hiveContext) :: Nil
       case _ => Nil
     }
   }
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala
index 2fea9702954d72cf2182867680bb316104f2c7fe..465e5f146fe71750fdf8442c48a0f07cc3415427 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala
@@ -160,12 +160,6 @@ class TestHiveContext(sc: SparkContext) extends LocalHiveContext(sc) {
     TestTable("src1",
       "CREATE TABLE src1 (key INT, value STRING)".cmd,
       s"LOAD DATA LOCAL INPATH '${getHiveFile("data/files/kv3.txt")}' INTO TABLE src1".cmd),
-    TestTable("dest1",
-      "CREATE TABLE IF NOT EXISTS dest1 (key INT, value STRING)".cmd),
-    TestTable("dest2",
-      "CREATE TABLE IF NOT EXISTS dest2 (key INT, value STRING)".cmd),
-    TestTable("dest3",
-      "CREATE TABLE IF NOT EXISTS dest3 (key INT, value STRING)".cmd),
     TestTable("srcpart", () => {
       runSqlHive(
         "CREATE TABLE srcpart (key INT, value STRING) PARTITIONED BY (ds STRING, hr STRING)")
@@ -257,6 +251,7 @@ class TestHiveContext(sc: SparkContext) extends LocalHiveContext(sc) {
 
   private val loadedTables = new collection.mutable.HashSet[String]
 
+  var cacheTables: Boolean = false
   def loadTestTable(name: String) {
     if (!(loadedTables contains name)) {
       // Marks the table as loaded first to prevent infite mutually recursive table loading.
@@ -265,6 +260,9 @@ class TestHiveContext(sc: SparkContext) extends LocalHiveContext(sc) {
       val createCmds =
         testTables.get(name).map(_.commands).getOrElse(sys.error(s"Unknown test table $name"))
       createCmds.foreach(_())
+
+      if (cacheTables)
+        cacheTable(name)
     }
   }
 
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala
index f9b437d435eba98916e3603cd2988c9be19b0da9..55a4363af6c76868157054aab969a18ed82fbec7 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala
@@ -130,8 +130,7 @@ trait HiveFunctionFactory {
   }
 }
 
-abstract class HiveUdf
-    extends Expression with Logging with HiveFunctionFactory {
+abstract class HiveUdf extends Expression with Logging with HiveFunctionFactory {
   self: Product =>
 
   type UDFType
@@ -146,7 +145,7 @@ abstract class HiveUdf
   lazy val functionInfo = getFunctionInfo(name)
   lazy val function = createFunction[UDFType](name)
 
-  override def toString = s"${nodeName}#${functionInfo.getDisplayName}(${children.mkString(",")})"
+  override def toString = s"$nodeName#${functionInfo.getDisplayName}(${children.mkString(",")})"
 }
 
 case class HiveSimpleUdf(name: String, children: Seq[Expression]) extends HiveUdf {
@@ -202,10 +201,11 @@ case class HiveSimpleUdf(name: String, children: Seq[Expression]) extends HiveUd
   }
 }
 
-case class HiveGenericUdf(
-    name: String,
-    children: Seq[Expression]) extends HiveUdf with HiveInspectors {
+case class HiveGenericUdf(name: String, children: Seq[Expression])
+  extends HiveUdf with HiveInspectors {
+
   import org.apache.hadoop.hive.ql.udf.generic.GenericUDF._
+
   type UDFType = GenericUDF
 
   @transient
@@ -357,7 +357,7 @@ case class HiveGenericUdaf(
 
   override def toString = s"$nodeName#$name(${children.mkString(",")})"
 
-  def newInstance = new HiveUdafFunction(name, children, this)
+  def newInstance() = new HiveUdafFunction(name, children, this)
 }
 
 /**
@@ -435,7 +435,7 @@ case class HiveGenericUdtf(
     }
   }
 
-  override def toString() = s"$nodeName#$name(${children.mkString(",")})"
+  override def toString = s"$nodeName#$name(${children.mkString(",")})"
 }
 
 case class HiveUdafFunction(
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
index 3cc4562a88d669f501ccac16554f96c216a53f48..6c91f40d0f925ec0a6a1b44b2995fd6438ea908f 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
@@ -218,10 +218,7 @@ abstract class HiveComparisonTest
         val quotes = "\"\"\""
         queryList.zipWithIndex.map {
           case (query, i) =>
-            s"""
-              |val q$i = $quotes$query$quotes.q
-              |q$i.stringResult()
-            """.stripMargin
+            s"""val q$i = hql($quotes$query$quotes); q$i.collect()"""
         }.mkString("\n== Console version of this test ==\n", "\n", "\n")
       }
 
@@ -287,7 +284,6 @@ abstract class HiveComparisonTest
                         |Error: ${e.getMessage}
                         |${stackTraceToString(e)}
                         |$queryString
-                        |$consoleTestCase
                       """.stripMargin
                     stringToFile(
                       new File(hiveFailedDirectory, testCaseName),
@@ -304,7 +300,7 @@ abstract class HiveComparisonTest
         val catalystResults = queryList.zip(hiveResults).map { case (queryString, hive) =>
           val query = new TestHive.HiveQLQueryExecution(queryString)
           try { (query, prepareAnswer(query, query.stringResult())) } catch {
-            case e: Exception =>
+            case e: Throwable =>
               val errorMessage =
                 s"""
                   |Failed to execute query using catalyst:
@@ -313,8 +309,6 @@ abstract class HiveComparisonTest
                   |$query
                   |== HIVE - ${hive.size} row(s) ==
                   |${hive.mkString("\n")}
-                  |
-                  |$consoleTestCase
                 """.stripMargin
               stringToFile(new File(failedDirectory, testCaseName), errorMessage + consoleTestCase)
               fail(errorMessage)
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
index f76e16bc1afc56297b003f4372ed7e186318a463..c3cfa3d25a5c20c202cccf81929ee3c0dbd01b34 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
@@ -17,16 +17,26 @@
 
 package org.apache.spark.sql.hive.execution
 
+import org.scalatest.BeforeAndAfter
+
 import org.apache.spark.sql.hive.TestHive
 
 /**
  * Runs the test cases that are included in the hive distribution.
  */
-class HiveCompatibilitySuite extends HiveQueryFileTest {
+class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter {
   // TODO: bundle in jar files... get from classpath
   lazy val hiveQueryDir = TestHive.getHiveFile("ql/src/test/queries/clientpositive")
   def testCases = hiveQueryDir.listFiles.map(f => f.getName.stripSuffix(".q") -> f)
 
+  override def beforeAll() {
+    TestHive.cacheTables = true
+  }
+
+  override def afterAll() {
+    TestHive.cacheTables = false
+  }
+
   /** A list of tests deemed out of scope currently and thus completely disregarded. */
   override def blackList = Seq(
     // These tests use hooks that are not on the classpath and thus break all subsequent execution.