diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index 0fed9171a8bf3bca8ce7e443a5a7f9b70793a3b0..20578780283835f70424c72d5dd8656e8214eba8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -426,7 +426,7 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
    *
    * @param path input path
    * @since 1.5.0
-   * @note Currently, this method can only be used together with `HiveContext`.
+   * @note Currently, this method can only be used after enabling Hive support.
    */
   def orc(path: String): DataFrame = format("orc").load(path)
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index c7b887ecd4d3e8852838887fb3f406589946c4b8..f2ba2dfc086c22e51a2cbb1ce03b32324c2ead35 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -639,7 +639,7 @@ final class DataFrameWriter private[sql](df: DataFrame) {
    * This will overwrite `orc.compress`. </li>
    *
    * @since 1.5.0
-   * @note Currently, this method can only be used together with `HiveContext`.
+   * @note Currently, this method can only be used after enabling Hive support
    */
   def orc(path: String): Unit = {
     assertNotStreaming("orc() can only be called on non-continuous queries")
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index 0614747352c119a962dc0d854b102781655b70e7..e2dc4d86395ee85d67400ea5d313b363bd8331f5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -515,7 +515,7 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
     )
   }
 
-  test("callUDF in SQLContext") {
+  test("callUDF without Hive Support") {
     val df = Seq(("id1", 1), ("id2", 4), ("id3", 5)).toDF("id", "value")
     df.sparkSession.udf.register("simpleUDF", (v: Int) => v * v)
     checkAnswer(
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
index 32320a6435acb4e8c5a57f9a28cb1bd4eee0d6f9..2a6591653e6444b2f84a96cbe17c543f2f5a6bc9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
@@ -673,7 +673,7 @@ class DatasetSuite extends QueryTest with SharedSQLContext {
 
   test("runtime null check for RowEncoder") {
     val schema = new StructType().add("i", IntegerType, nullable = false)
-    val df = sqlContext.range(10).map(l => {
+    val df = spark.range(10).map(l => {
       if (l % 5 == 0) {
         Row(null)
       } else {
@@ -689,9 +689,9 @@ class DatasetSuite extends QueryTest with SharedSQLContext {
 
   test("row nullability mismatch") {
     val schema = new StructType().add("a", StringType, true).add("b", StringType, false)
-    val rdd = sqlContext.sparkContext.parallelize(Row(null, "123") :: Row("234", null) :: Nil)
+    val rdd = spark.sparkContext.parallelize(Row(null, "123") :: Row("234", null) :: Nil)
     val message = intercept[Exception] {
-      sqlContext.createDataFrame(rdd, schema).collect()
+      spark.createDataFrame(rdd, schema).collect()
     }.getMessage
     assert(message.contains("The 1th field 'b' of input row cannot be null"))
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/ListTablesSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/ListTablesSuite.scala
deleted file mode 100644
index b447006761f45638b5cb0127dcb12bb96cd4118a..0000000000000000000000000000000000000000
--- a/sql/core/src/test/scala/org/apache/spark/sql/ListTablesSuite.scala
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one or more
-* contributor license agreements.  See the NOTICE file distributed with
-* this work for additional information regarding copyright ownership.
-* The ASF licenses this file to You under the Apache License, Version 2.0
-* (the "License"); you may not use this file except in compliance with
-* the License.  You may obtain a copy of the License at
-*
-*    http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.spark.sql
-
-import org.scalatest.BeforeAndAfter
-
-import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.test.SharedSQLContext
-import org.apache.spark.sql.types.{BooleanType, StringType, StructField, StructType}
-
-class ListTablesSuite extends QueryTest with BeforeAndAfter with SharedSQLContext {
-  import testImplicits._
-
-  private lazy val df = (1 to 10).map(i => (i, s"str$i")).toDF("key", "value")
-
-  before {
-    df.createOrReplaceTempView("listtablessuitetable")
-  }
-
-  after {
-    spark.sessionState.catalog.dropTable(
-      TableIdentifier("listtablessuitetable"), ignoreIfNotExists = true)
-  }
-
-  test("get all tables") {
-    checkAnswer(
-      spark.sqlContext.tables().filter("tableName = 'listtablessuitetable'"),
-      Row("listtablessuitetable", true))
-
-    checkAnswer(
-      sql("SHOW tables").filter("tableName = 'listtablessuitetable'"),
-      Row("listtablessuitetable", true))
-
-    spark.sessionState.catalog.dropTable(
-      TableIdentifier("listtablessuitetable"), ignoreIfNotExists = true)
-    assert(spark.sqlContext.tables().filter("tableName = 'listtablessuitetable'").count() === 0)
-  }
-
-  test("getting all tables with a database name has no impact on returned table names") {
-    checkAnswer(
-      spark.sqlContext.tables("default").filter("tableName = 'listtablessuitetable'"),
-      Row("listtablessuitetable", true))
-
-    checkAnswer(
-      sql("show TABLES in default").filter("tableName = 'listtablessuitetable'"),
-      Row("listtablessuitetable", true))
-
-    spark.sessionState.catalog.dropTable(
-      TableIdentifier("listtablessuitetable"), ignoreIfNotExists = true)
-    assert(spark.sqlContext.tables().filter("tableName = 'listtablessuitetable'").count() === 0)
-  }
-
-  test("query the returned DataFrame of tables") {
-    val expectedSchema = StructType(
-      StructField("tableName", StringType, false) ::
-      StructField("isTemporary", BooleanType, false) :: Nil)
-
-    Seq(spark.sqlContext.tables(), sql("SHOW TABLes")).foreach {
-      case tableDF =>
-        assert(expectedSchema === tableDF.schema)
-
-        tableDF.createOrReplaceTempView("tables")
-        checkAnswer(
-          sql(
-            "SELECT isTemporary, tableName from tables WHERE tableName = 'listtablessuitetable'"),
-          Row(true, "listtablessuitetable")
-        )
-        checkAnswer(
-          spark.sqlContext.tables()
-            .filter("tableName = 'tables'").select("tableName", "isTemporary"),
-          Row("tables", true))
-        spark.catalog.dropTempView("tables")
-    }
-  }
-}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLContextSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLContextSuite.scala
index c9594a7e9ab2865370685c2f9b044e47e2ad798c..417d09e238c62552d8835469caeb8d4a6df385db 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLContextSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLContextSuite.scala
@@ -20,7 +20,9 @@ package org.apache.spark.sql
 import org.apache.spark.{SharedSparkContext, SparkFunSuite}
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.{BooleanType, StringType, StructField, StructType}
 
 class SQLContextSuite extends SparkFunSuite with SharedSparkContext {
 
@@ -79,4 +81,63 @@ class SQLContextSuite extends SparkFunSuite with SharedSparkContext {
     assert(sqlContext.sessionState.optimizer.batches.flatMap(_.rules).contains(DummyRule))
   }
 
+  test("get all tables") {
+    val sqlContext = SQLContext.getOrCreate(sc)
+    val df = sqlContext.range(10)
+    df.createOrReplaceTempView("listtablessuitetable")
+    assert(
+      sqlContext.tables().filter("tableName = 'listtablessuitetable'").collect().toSeq ==
+      Row("listtablessuitetable", true) :: Nil)
+
+    assert(
+      sqlContext.sql("SHOW tables").filter("tableName = 'listtablessuitetable'").collect().toSeq ==
+      Row("listtablessuitetable", true) :: Nil)
+
+    sqlContext.sessionState.catalog.dropTable(
+      TableIdentifier("listtablessuitetable"), ignoreIfNotExists = true)
+    assert(sqlContext.tables().filter("tableName = 'listtablessuitetable'").count() === 0)
+  }
+
+  test("getting all tables with a database name has no impact on returned table names") {
+    val sqlContext = SQLContext.getOrCreate(sc)
+    val df = sqlContext.range(10)
+    df.createOrReplaceTempView("listtablessuitetable")
+    assert(
+      sqlContext.tables("default").filter("tableName = 'listtablessuitetable'").collect().toSeq ==
+      Row("listtablessuitetable", true) :: Nil)
+
+    assert(
+      sqlContext.sql("show TABLES in default").filter("tableName = 'listtablessuitetable'")
+        .collect().toSeq == Row("listtablessuitetable", true) :: Nil)
+
+    sqlContext.sessionState.catalog.dropTable(
+      TableIdentifier("listtablessuitetable"), ignoreIfNotExists = true)
+    assert(sqlContext.tables().filter("tableName = 'listtablessuitetable'").count() === 0)
+  }
+
+  test("query the returned DataFrame of tables") {
+    val sqlContext = SQLContext.getOrCreate(sc)
+    val df = sqlContext.range(10)
+    df.createOrReplaceTempView("listtablessuitetable")
+
+    val expectedSchema = StructType(
+      StructField("tableName", StringType, false) ::
+        StructField("isTemporary", BooleanType, false) :: Nil)
+
+    Seq(sqlContext.tables(), sqlContext.sql("SHOW TABLes")).foreach {
+      case tableDF =>
+        assert(expectedSchema === tableDF.schema)
+
+        tableDF.createOrReplaceTempView("tables")
+        assert(
+          sqlContext.sql(
+            "SELECT isTemporary, tableName from tables WHERE tableName = 'listtablessuitetable'")
+            .collect().toSeq == Row(true, "listtablessuitetable") :: Nil)
+        assert(
+          sqlContext.tables().filter("tableName = 'tables'").select("tableName", "isTemporary")
+            .collect().toSeq == Row("tables", true) :: Nil)
+        sqlContext.dropTempTable("tables")
+    }
+  }
+
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index b1f848fdc89a32e7c7a4ec5fed00eee550fe863d..1ddb586d60fd9681b03dbfffe7c67cb95c804ee9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -1048,7 +1048,7 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
   }
 
   test("SET commands semantics using sql()") {
-    spark.sqlContext.conf.clear()
+    spark.sessionState.conf.clear()
     val testKey = "test.key.0"
     val testVal = "test.val.0"
     val nonexistentKey = "nonexistent"
@@ -1089,17 +1089,17 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
       sql(s"SET $nonexistentKey"),
       Row(nonexistentKey, "<undefined>")
     )
-    spark.sqlContext.conf.clear()
+    spark.sessionState.conf.clear()
   }
 
   test("SET commands with illegal or inappropriate argument") {
-    spark.sqlContext.conf.clear()
+    spark.sessionState.conf.clear()
     // Set negative mapred.reduce.tasks for automatically determining
     // the number of reducers is not supported
     intercept[IllegalArgumentException](sql(s"SET mapred.reduce.tasks=-1"))
     intercept[IllegalArgumentException](sql(s"SET mapred.reduce.tasks=-01"))
     intercept[IllegalArgumentException](sql(s"SET mapred.reduce.tasks=-2"))
-    spark.sqlContext.conf.clear()
+    spark.sessionState.conf.clear()
   }
 
   test("apply schema") {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ExchangeCoordinatorSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ExchangeCoordinatorSuite.scala
index 2f45db3925a00f9d96a8239d573886bf22ec2388..2803b624624176af86bf0814280e61fc3a7c0d16 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ExchangeCoordinatorSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ExchangeCoordinatorSuite.scala
@@ -27,12 +27,12 @@ import org.apache.spark.sql.internal.SQLConf
 
 class ExchangeCoordinatorSuite extends SparkFunSuite with BeforeAndAfterAll {
 
-  private var originalActiveSQLContext: Option[SparkSession] = _
-  private var originalInstantiatedSQLContext: Option[SparkSession] = _
+  private var originalActiveSparkSession: Option[SparkSession] = _
+  private var originalInstantiatedSparkSession: Option[SparkSession] = _
 
   override protected def beforeAll(): Unit = {
-    originalActiveSQLContext = SparkSession.getActiveSession
-    originalInstantiatedSQLContext = SparkSession.getDefaultSession
+    originalActiveSparkSession = SparkSession.getActiveSession
+    originalInstantiatedSparkSession = SparkSession.getDefaultSession
 
     SparkSession.clearActiveSession()
     SparkSession.clearDefaultSession()
@@ -40,8 +40,8 @@ class ExchangeCoordinatorSuite extends SparkFunSuite with BeforeAndAfterAll {
 
   override protected def afterAll(): Unit = {
     // Set these states back.
-    originalActiveSQLContext.foreach(ctx => SparkSession.setActiveSession(ctx))
-    originalInstantiatedSQLContext.foreach(ctx => SparkSession.setDefaultSession(ctx))
+    originalActiveSparkSession.foreach(ctx => SparkSession.setActiveSession(ctx))
+    originalInstantiatedSparkSession.foreach(ctx => SparkSession.setDefaultSession(ctx))
   }
 
   private def checkEstimation(
@@ -249,7 +249,7 @@ class ExchangeCoordinatorSuite extends SparkFunSuite with BeforeAndAfterAll {
     }
   }
 
-  def withSQLContext(
+  def withSparkSession(
       f: SparkSession => Unit,
       targetNumPostShufflePartitions: Int,
       minNumPostShufflePartitions: Option[Int]): Unit = {
@@ -322,7 +322,7 @@ class ExchangeCoordinatorSuite extends SparkFunSuite with BeforeAndAfterAll {
         }
       }
 
-      withSQLContext(test, 2000, minNumPostShufflePartitions)
+      withSparkSession(test, 2000, minNumPostShufflePartitions)
     }
 
     test(s"determining the number of reducers: join operator$testNameNote") {
@@ -373,7 +373,7 @@ class ExchangeCoordinatorSuite extends SparkFunSuite with BeforeAndAfterAll {
         }
       }
 
-      withSQLContext(test, 16384, minNumPostShufflePartitions)
+      withSparkSession(test, 16384, minNumPostShufflePartitions)
     }
 
     test(s"determining the number of reducers: complex query 1$testNameNote") {
@@ -425,7 +425,7 @@ class ExchangeCoordinatorSuite extends SparkFunSuite with BeforeAndAfterAll {
         }
       }
 
-      withSQLContext(test, 6644, minNumPostShufflePartitions)
+      withSparkSession(test, 6644, minNumPostShufflePartitions)
     }
 
     test(s"determining the number of reducers: complex query 2$testNameNote") {
@@ -477,7 +477,7 @@ class ExchangeCoordinatorSuite extends SparkFunSuite with BeforeAndAfterAll {
         }
       }
 
-      withSQLContext(test, 6144, minNumPostShufflePartitions)
+      withSparkSession(test, 6144, minNumPostShufflePartitions)
     }
   }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
index 8243470b19334de0c6d2c8f8e9a0450bed4824d4..c96239e68201809d34224ad41c0890a2819d069f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
@@ -155,7 +155,7 @@ class PlannerSuite extends SharedSQLContext {
       val path = file.getCanonicalPath
       testData.write.parquet(path)
       val df = spark.read.parquet(path)
-      spark.sqlContext.registerDataFrameAsTable(df, "testPushed")
+      df.createOrReplaceTempView("testPushed")
 
       withTempTable("testPushed") {
         val exp = sql("select * from testPushed where key = 15").queryExecution.sparkPlan
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SortSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SortSuite.scala
index ebeb39b690e6944ed3bcfd73a75db0129840a6b8..c3acf29c2d36f8a7c89c38f4ece86de387a6fabe 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SortSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SortSuite.scala
@@ -95,7 +95,7 @@ class SortSuite extends SparkPlanTest with SharedSQLContext {
   ) {
     test(s"sorting on $dataType with nullable=$nullable, sortOrder=$sortOrder") {
       val inputData = Seq.fill(1000)(randomDataGenerator())
-      val inputDf = sqlContext.createDataFrame(
+      val inputDf = spark.createDataFrame(
         sparkContext.parallelize(Random.shuffle(inputData).map(v => Row(v))),
         StructType(StructField("a", dataType, nullable = true) :: Nil)
       )
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/TakeOrderedAndProjectSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/TakeOrderedAndProjectSuite.scala
index fba04d0cb265375840e2f554f8ddea0e013ead2a..3217e34bd8ad3a892a4c639afede71408f98a627 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/TakeOrderedAndProjectSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/TakeOrderedAndProjectSuite.scala
@@ -42,7 +42,7 @@ class TakeOrderedAndProjectSuite extends SparkPlanTest with SharedSQLContext {
       .add("a", IntegerType, nullable = false)
       .add("b", IntegerType, nullable = false)
     val inputData = Seq.fill(10000)(Row(rand.nextInt(), rand.nextInt()))
-    sqlContext.createDataFrame(sparkContext.parallelize(Random.shuffle(inputData), 10), schema)
+    spark.createDataFrame(sparkContext.parallelize(Random.shuffle(inputData), 10), schema)
   }
 
   /**
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index ff56749387f5690b808eabfc298231940fd7c380..e32521aaafecd917a10fe389ed51b789d3524486 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -132,7 +132,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
         sql(s"CREATE DATABASE db2")
         val pathInCatalog = new Path(catalog.getDatabaseMetadata("db2").locationUri).toUri
         assert("file" === pathInCatalog.getScheme)
-        val expectedPath = appendTrailingSlash(sqlContext.conf.warehousePath) + "db2.db"
+        val expectedPath = appendTrailingSlash(spark.sessionState.conf.warehousePath) + "db2.db"
         assert(expectedPath === pathInCatalog.getPath)
       }
 
@@ -145,7 +145,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
     withTempDir { tmpDir =>
       val path = tmpDir.toString
       withSQLConf(SQLConf.WAREHOUSE_PATH.key -> path) {
-        val catalog = sqlContext.sessionState.catalog
+        val catalog = spark.sessionState.catalog
         val databaseNames = Seq("db1", "`database`")
 
         databaseNames.foreach { dbName =>
@@ -172,7 +172,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
   }
 
   test("Create/Drop Database - location") {
-    val catalog = sqlContext.sessionState.catalog
+    val catalog = spark.sessionState.catalog
     val databaseNames = Seq("db1", "`database`")
     withTempDir { tmpDir =>
       val path = tmpDir.toString
@@ -200,7 +200,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
     withTempDir { tmpDir =>
       val path = tmpDir.toString
       withSQLConf(SQLConf.WAREHOUSE_PATH.key -> path) {
-        val catalog = sqlContext.sessionState.catalog
+        val catalog = spark.sessionState.catalog
         val databaseNames = Seq("db1", "`database`")
 
         databaseNames.foreach { dbName =>
@@ -231,7 +231,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
     withTempDir { tmpDir =>
       val path = tmpDir.toString
       withSQLConf(SQLConf.WAREHOUSE_PATH.key -> path) {
-        val catalog = sqlContext.sessionState.catalog
+        val catalog = spark.sessionState.catalog
         val databaseNames = Seq("db1", "`database`")
 
         databaseNames.foreach { dbName =>
@@ -300,7 +300,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
   }
 
   test("drop non-empty database in restrict mode") {
-    val catalog = sqlContext.sessionState.catalog
+    val catalog = spark.sessionState.catalog
     val dbName = "db1"
     sql(s"CREATE DATABASE $dbName")
 
@@ -322,7 +322,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
   }
 
   test("drop non-empty database in cascade mode") {
-    val catalog = sqlContext.sessionState.catalog
+    val catalog = spark.sessionState.catalog
     val dbName = "db1"
     sql(s"CREATE DATABASE $dbName")
 
@@ -441,7 +441,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
         "RENAME TEMPORARY TABLE from '`tab1`' to '`default`.`tab2`': " +
           "cannot specify database name 'default' in the destination table"))
 
-      val catalog = sqlContext.sessionState.catalog
+      val catalog = spark.sessionState.catalog
       assert(catalog.listTables("default") == Seq(TableIdentifier("tab1")))
     }
   }
@@ -476,7 +476,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
       assert(e.getMessage.contains(
         "RENAME TEMPORARY TABLE from '`tab1`' to '`tab2`': destination table already exists"))
 
-      val catalog = sqlContext.sessionState.catalog
+      val catalog = spark.sessionState.catalog
       assert(catalog.listTables("default") == Seq(TableIdentifier("tab1"), TableIdentifier("tab2")))
     }
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileCatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileCatalogSuite.scala
index dab5c76200f0cdcad56a90c33d0efdd3726ba712..85c2e8ba5529dff654d0cc2b05e7b723a667f166 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileCatalogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileCatalogSuite.scala
@@ -32,7 +32,7 @@ class FileCatalogSuite extends SharedSQLContext {
       stringToFile(file, "text")
 
       val path = new Path(file.getCanonicalPath)
-      val catalog = new ListingFileCatalog(sqlContext.sparkSession, Seq(path), Map.empty, None) {
+      val catalog = new ListingFileCatalog(spark, Seq(path), Map.empty, None) {
         def leafFilePaths: Seq[Path] = leafFiles.keys.toSeq
         def leafDirPaths: Seq[Path] = leafDirToChildrenFiles.keys.toSeq
       }
@@ -56,11 +56,11 @@ class FileCatalogSuite extends SharedSQLContext {
       require(qualifiedFilePath.toString.startsWith("file:"))
 
       val catalog1 = new ListingFileCatalog(
-        sqlContext.sparkSession, Seq(unqualifiedDirPath), Map.empty, None)
+        spark, Seq(unqualifiedDirPath), Map.empty, None)
       assert(catalog1.allFiles.map(_.getPath) === Seq(qualifiedFilePath))
 
       val catalog2 = new ListingFileCatalog(
-        sqlContext.sparkSession, Seq(unqualifiedFilePath), Map.empty, None)
+        spark, Seq(unqualifiedFilePath), Map.empty, None)
       assert(catalog2.allFiles.map(_.getPath) === Seq(qualifiedFilePath))
 
     }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
index ad7c05c12eb9ab6ab457ac0e64d66c56cc6e3a28..bc95446387956e47aa17004341d943262307fd1d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
@@ -137,7 +137,7 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
   }
 
   test("test inferring decimals") {
-    val result = sqlContext.read
+    val result = spark.read
       .format("csv")
       .option("comment", "~")
       .option("header", "true")
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
index 847ea6bd523d0e8ae9e2776456b24506da1cf088..e19345529e93b27e91d76fc58eaea627d0faba5f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
@@ -445,17 +445,17 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha
         (1 to 10).map(i => ParquetData(i, i.toString)), path)
 
       // when the input is the base path containing partitioning directories
-      val baseDf = sqlContext.read.parquet(base.getCanonicalPath)
+      val baseDf = spark.read.parquet(base.getCanonicalPath)
       assert(baseDf.schema.map(_.name) === Seq("intField", "stringField", "pi", "ps"))
 
       // when the input is a path to the leaf directory containing a parquet file
-      val partDf = sqlContext.read.parquet(path.getCanonicalPath)
+      val partDf = spark.read.parquet(path.getCanonicalPath)
       assert(partDf.schema.map(_.name) === Seq("intField", "stringField"))
 
       path.listFiles().foreach { f =>
         if (f.getName.toLowerCase().endsWith(".parquet")) {
           // when the input is a path to a parquet file
-          val df = sqlContext.read.parquet(f.getCanonicalPath)
+          val df = spark.read.parquet(f.getCanonicalPath)
           assert(df.schema.map(_.name) === Seq("intField", "stringField"))
         }
       }
@@ -464,7 +464,7 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha
         if (f.getName.toLowerCase().endsWith(".parquet")) {
           // when the input is a path to a parquet file but `basePath` is overridden to
           // the base path containing partitioning directories
-          val df = sqlContext
+          val df = spark
             .read.option("basePath", base.getCanonicalPath)
             .parquet(f.getCanonicalPath)
           assert(df.schema.map(_.name) === Seq("intField", "stringField", "pi", "ps"))
@@ -780,7 +780,7 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha
         .parquet(dir.getCanonicalPath)
 
       def check(path: String, basePath: String, expectedDf: DataFrame): Unit = {
-        val testDf = sqlContext.read
+        val testDf = spark.read
           .option("basePath", basePath)
           .parquet(path)
         checkAnswer(testDf, expectedDf)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTest.scala
index 1753b84ba6af6ca401b7eb131d53318939b7649f..1953d6fa5a7af1774b0edce4f98c8cbd90b3aedc 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTest.scala
@@ -90,7 +90,7 @@ private[sql] trait ParquetTest extends SQLTestUtils {
       (data: Seq[T], tableName: String, testVectorized: Boolean = true)
       (f: => Unit): Unit = {
     withParquetDataFrame(data, testVectorized) { df =>
-      spark.sqlContext.registerDataFrameAsTable(df, tableName)
+      df.createOrReplaceTempView(tableName)
       withTempTable(tableName)(f)
     }
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/BroadcastJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/BroadcastJoinSuite.scala
index be4bf5b447565353e5f77db5104148caf4d1fabc..db32b6b6befb720e58b515376497c0055b5e7fe4 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/BroadcastJoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/BroadcastJoinSuite.scala
@@ -40,7 +40,7 @@ class BroadcastJoinSuite extends QueryTest with SQLTestUtils {
   protected var spark: SparkSession = null
 
   /**
-   * Create a new [[SQLContext]] running in local-cluster mode with unsafe and codegen enabled.
+   * Create a new [[SparkSession]] running in local-cluster mode with unsafe and codegen enabled.
    */
   override def beforeAll(): Unit = {
     super.beforeAll()
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/ExistenceJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/ExistenceJoinSuite.scala
index 8093054b6dd1957cf2038f1ffa03e372ac2424ca..38377164c10e65c7936105caf1a3d8c2afa307ec 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/ExistenceJoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/ExistenceJoinSuite.scala
@@ -30,7 +30,7 @@ import org.apache.spark.sql.types.{BooleanType, DoubleType, IntegerType, StructT
 
 class ExistenceJoinSuite extends SparkPlanTest with SharedSQLContext {
 
-  private lazy val left = sqlContext.createDataFrame(
+  private lazy val left = spark.createDataFrame(
     sparkContext.parallelize(Seq(
       Row(1, 2.0),
       Row(1, 2.0),
@@ -42,7 +42,7 @@ class ExistenceJoinSuite extends SparkPlanTest with SharedSQLContext {
       Row(6, null)
     )), new StructType().add("a", IntegerType).add("b", DoubleType))
 
-  private lazy val right = sqlContext.createDataFrame(
+  private lazy val right = spark.createDataFrame(
     sparkContext.parallelize(Seq(
       Row(2, 3.0),
       Row(2, 3.0),
@@ -53,7 +53,7 @@ class ExistenceJoinSuite extends SparkPlanTest with SharedSQLContext {
       Row(6, null)
     )), new StructType().add("c", IntegerType).add("d", DoubleType))
 
-  private lazy val rightUniqueKey = sqlContext.createDataFrame(
+  private lazy val rightUniqueKey = spark.createDataFrame(
     sparkContext.parallelize(Seq(
       Row(2, 3.0),
       Row(3, 2.0),
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala
index f8227e3bd6ee8b1ccea0daece0788a84efaed66b..ad5365a35ea5c32e75ac9b02b5c78bce5b47ac82 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala
@@ -35,7 +35,7 @@ class SQLConfSuite extends QueryTest with SharedSQLContext {
     // Set a conf first.
     spark.conf.set(testKey, testVal)
     // Clear the conf.
-    spark.sqlContext.conf.clear()
+    spark.sessionState.conf.clear()
     // After clear, only overrideConfs used by unit test should be in the SQLConf.
     assert(spark.conf.getAll === TestSQLContext.overrideConfs)
 
@@ -50,11 +50,11 @@ class SQLConfSuite extends QueryTest with SharedSQLContext {
     assert(spark.conf.get(testKey, testVal + "_") === testVal)
     assert(spark.conf.getAll.contains(testKey))
 
-    spark.sqlContext.conf.clear()
+    spark.sessionState.conf.clear()
   }
 
   test("parse SQL set commands") {
-    spark.sqlContext.conf.clear()
+    spark.sessionState.conf.clear()
     sql(s"set $testKey=$testVal")
     assert(spark.conf.get(testKey, testVal + "_") === testVal)
     assert(spark.conf.get(testKey, testVal + "_") === testVal)
@@ -72,7 +72,7 @@ class SQLConfSuite extends QueryTest with SharedSQLContext {
     sql(s"set $key=")
     assert(spark.conf.get(key, "0") === "")
 
-    spark.sqlContext.conf.clear()
+    spark.sessionState.conf.clear()
   }
 
   test("set command for display") {
@@ -97,7 +97,7 @@ class SQLConfSuite extends QueryTest with SharedSQLContext {
   }
 
   test("deprecated property") {
-    spark.sqlContext.conf.clear()
+    spark.sessionState.conf.clear()
     val original = spark.conf.get(SQLConf.SHUFFLE_PARTITIONS)
     try {
       sql(s"set ${SQLConf.Deprecated.MAPRED_REDUCE_TASKS}=10")
@@ -108,7 +108,7 @@ class SQLConfSuite extends QueryTest with SharedSQLContext {
   }
 
   test("reset - public conf") {
-    spark.sqlContext.conf.clear()
+    spark.sessionState.conf.clear()
     val original = spark.conf.get(SQLConf.GROUP_BY_ORDINAL)
     try {
       assert(spark.conf.get(SQLConf.GROUP_BY_ORDINAL) === true)
@@ -124,7 +124,7 @@ class SQLConfSuite extends QueryTest with SharedSQLContext {
   }
 
   test("reset - internal conf") {
-    spark.sqlContext.conf.clear()
+    spark.sessionState.conf.clear()
     val original = spark.conf.get(SQLConf.NATIVE_VIEW)
     try {
       assert(spark.conf.get(SQLConf.NATIVE_VIEW) === true)
@@ -140,7 +140,7 @@ class SQLConfSuite extends QueryTest with SharedSQLContext {
   }
 
   test("reset - user-defined conf") {
-    spark.sqlContext.conf.clear()
+    spark.sessionState.conf.clear()
     val userDefinedConf = "x.y.z.reset"
     try {
       assert(spark.conf.getOption(userDefinedConf).isEmpty)
@@ -155,7 +155,7 @@ class SQLConfSuite extends QueryTest with SharedSQLContext {
   }
 
   test("invalid conf value") {
-    spark.sqlContext.conf.clear()
+    spark.sessionState.conf.clear()
     val e = intercept[IllegalArgumentException] {
       sql(s"set ${SQLConf.CASE_SENSITIVE.key}=10")
     }
@@ -163,7 +163,7 @@ class SQLConfSuite extends QueryTest with SharedSQLContext {
   }
 
   test("Test SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE's method") {
-    spark.sqlContext.conf.clear()
+    spark.sessionState.conf.clear()
 
     spark.conf.set(SQLConf.SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE.key, "100")
     assert(spark.conf.get(SQLConf.SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE) === 100)
@@ -191,7 +191,7 @@ class SQLConfSuite extends QueryTest with SharedSQLContext {
       spark.conf.set(SQLConf.SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE.key, "-90000000000g")
     }
 
-    spark.sqlContext.conf.clear()
+    spark.sessionState.conf.clear()
   }
 
   test("SparkSession can access configs set in SparkConf") {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala
index 03c18ad009d4935b994fc6d05891a3d6aed8cea1..cbddb0643b26d9ea8b72c1fc12f667861ee12099 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala
@@ -27,19 +27,19 @@ import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.util.Utils
 
 class CreateTableAsSelectSuite extends DataSourceTest with SharedSQLContext with BeforeAndAfter {
-  protected override lazy val sql = caseInsensitiveContext.sql _
+  protected override lazy val sql = spark.sql _
   private var path: File = null
 
   override def beforeAll(): Unit = {
     super.beforeAll()
     path = Utils.createTempDir()
     val rdd = sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i, "b":"str${i}"}"""))
-    caseInsensitiveContext.read.json(rdd).createOrReplaceTempView("jt")
+    spark.read.json(rdd).createOrReplaceTempView("jt")
   }
 
   override def afterAll(): Unit = {
     try {
-      caseInsensitiveContext.dropTempTable("jt")
+      spark.catalog.dropTempView("jt")
     } finally {
       super.afterAll()
     }
@@ -64,7 +64,7 @@ class CreateTableAsSelectSuite extends DataSourceTest with SharedSQLContext with
       sql("SELECT a, b FROM jsonTable"),
       sql("SELECT a, b FROM jt").collect())
 
-    caseInsensitiveContext.dropTempTable("jsonTable")
+    spark.catalog.dropTempView("jsonTable")
   }
 
   test("CREATE TEMPORARY TABLE AS SELECT based on the file without write permission") {
@@ -132,7 +132,7 @@ class CreateTableAsSelectSuite extends DataSourceTest with SharedSQLContext with
       sql("SELECT * FROM jsonTable"),
       sql("SELECT a * 4 FROM jt").collect())
 
-    caseInsensitiveContext.dropTempTable("jsonTable")
+    spark.catalog.dropTempView("jsonTable")
     // Explicitly delete the data.
     if (path.exists()) Utils.deleteRecursively(path)
 
@@ -150,7 +150,7 @@ class CreateTableAsSelectSuite extends DataSourceTest with SharedSQLContext with
       sql("SELECT * FROM jsonTable"),
       sql("SELECT b FROM jt").collect())
 
-    caseInsensitiveContext.dropTempTable("jsonTable")
+    spark.catalog.dropTempView("jsonTable")
   }
 
   test("CREATE TEMPORARY TABLE AS SELECT with IF NOT EXISTS is not allowed") {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/DDLSourceLoadSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/DDLSourceLoadSuite.scala
index 853707c036c9a4b2e7d47a05f21f644b04c7244c..f07c33042a72e54b570af8d443209b4c8d848963 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/DDLSourceLoadSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/DDLSourceLoadSuite.scala
@@ -27,23 +27,23 @@ class DDLSourceLoadSuite extends DataSourceTest with SharedSQLContext {
 
   test("data sources with the same name") {
     intercept[RuntimeException] {
-      caseInsensitiveContext.read.format("Fluet da Bomb").load()
+      spark.read.format("Fluet da Bomb").load()
     }
   }
 
   test("load data source from format alias") {
-    caseInsensitiveContext.read.format("gathering quorum").load().schema ==
+    spark.read.format("gathering quorum").load().schema ==
       StructType(Seq(StructField("stringType", StringType, nullable = false)))
   }
 
   test("specify full classname with duplicate formats") {
-    caseInsensitiveContext.read.format("org.apache.spark.sql.sources.FakeSourceOne")
+    spark.read.format("org.apache.spark.sql.sources.FakeSourceOne")
       .load().schema == StructType(Seq(StructField("stringType", StringType, nullable = false)))
   }
 
-  test("should fail to load ORC without HiveContext") {
+  test("should fail to load ORC without Hive Support") {
     intercept[ClassNotFoundException] {
-      caseInsensitiveContext.read.format("orc").load()
+      spark.read.format("orc").load()
     }
   }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/DDLTestSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/DDLTestSuite.scala
index a34f70ed65b5c0aa01e28158e4ff4ef11c60233e..5a7a9073fb3a25dab53e4b55feff29bc4d11173f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/DDLTestSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/DDLTestSuite.scala
@@ -78,7 +78,7 @@ case class SimpleDDLScan(
 }
 
 class DDLTestSuite extends DataSourceTest with SharedSQLContext {
-  protected override lazy val sql = caseInsensitiveContext.sql _
+  protected override lazy val sql = spark.sql _
 
   override def beforeAll(): Unit = {
     super.beforeAll()
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/DataSourceTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/DataSourceTest.scala
index 754aa32a300b9abe431a95d0515fb5d51094d5e1..206d03ea98e6908a369eaab2fb0268d8e22b50cc 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/DataSourceTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/DataSourceTest.scala
@@ -22,16 +22,9 @@ import org.apache.spark.sql.internal.SQLConf
 
 private[sql] abstract class DataSourceTest extends QueryTest {
 
-  // We want to test some edge cases.
-  protected lazy val caseInsensitiveContext: SQLContext = {
-    val ctx = new SQLContext(spark.sparkContext)
-    ctx.setConf(SQLConf.CASE_SENSITIVE, false)
-    ctx
-  }
-
   protected def sqlTest(sqlString: String, expectedAnswer: Seq[Row]) {
     test(sqlString) {
-      checkAnswer(caseInsensitiveContext.sql(sqlString), expectedAnswer)
+      checkAnswer(spark.sql(sqlString), expectedAnswer)
     }
   }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala
index f969660ddd322ae28ce4c250768c9ab5e288f0b9..45e737f5ed04785d667030695208f4843e8ca8c2 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala
@@ -133,7 +133,7 @@ object ColumnsRequired {
 }
 
 class FilteredScanSuite extends DataSourceTest with SharedSQLContext with PredicateHelper {
-  protected override lazy val sql = caseInsensitiveContext.sql _
+  protected override lazy val sql = spark.sql _
 
   override def beforeAll(): Unit = {
     super.beforeAll()
@@ -310,7 +310,7 @@ class FilteredScanSuite extends DataSourceTest with SharedSQLContext with Predic
 
     test(s"PushDown Returns $expectedCount: $sqlString") {
       // These tests check a particular plan, disable whole stage codegen.
-      caseInsensitiveContext.conf.setConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED, false)
+      spark.conf.set(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key, false)
       try {
         val queryExecution = sql(sqlString).queryExecution
         val rawPlan = queryExecution.executedPlan.collect {
@@ -322,7 +322,7 @@ class FilteredScanSuite extends DataSourceTest with SharedSQLContext with Predic
         val rawCount = rawPlan.execute().count()
         assert(ColumnsRequired.set === requiredColumnNames)
 
-        val table = caseInsensitiveContext.table("oneToTenFiltered")
+        val table = spark.table("oneToTenFiltered")
         val relation = table.queryExecution.logical.collectFirst {
           case LogicalRelation(r, _, _) => r
         }.get
@@ -337,7 +337,7 @@ class FilteredScanSuite extends DataSourceTest with SharedSQLContext with Predic
               queryExecution)
         }
       } finally {
-        caseInsensitiveContext.conf.setConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED,
+        spark.conf.set(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key,
           SQLConf.WHOLESTAGE_CODEGEN_ENABLED.defaultValue.get)
       }
     }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
index 854fec5b22f771ad2d199e2ee4c7b789c875defe..4780eb473d79bd61205b6d1132d389f554887a41 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
@@ -24,14 +24,14 @@ import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.util.Utils
 
 class InsertSuite extends DataSourceTest with SharedSQLContext {
-  protected override lazy val sql = caseInsensitiveContext.sql _
+  protected override lazy val sql = spark.sql _
   private var path: File = null
 
   override def beforeAll(): Unit = {
     super.beforeAll()
     path = Utils.createTempDir()
     val rdd = sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i, "b":"str$i"}"""))
-    caseInsensitiveContext.read.json(rdd).createOrReplaceTempView("jt")
+    spark.read.json(rdd).createOrReplaceTempView("jt")
     sql(
       s"""
         |CREATE TEMPORARY TABLE jsonTable (a int, b string)
@@ -44,8 +44,8 @@ class InsertSuite extends DataSourceTest with SharedSQLContext {
 
   override def afterAll(): Unit = {
     try {
-      caseInsensitiveContext.dropTempTable("jsonTable")
-      caseInsensitiveContext.dropTempTable("jt")
+      spark.catalog.dropTempView("jsonTable")
+      spark.catalog.dropTempView("jt")
       Utils.deleteRecursively(path)
     } finally {
       super.afterAll()
@@ -111,7 +111,7 @@ class InsertSuite extends DataSourceTest with SharedSQLContext {
 
     // Writing the table to less part files.
     val rdd1 = sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i, "b":"str$i"}"""), 5)
-    caseInsensitiveContext.read.json(rdd1).createOrReplaceTempView("jt1")
+    spark.read.json(rdd1).createOrReplaceTempView("jt1")
     sql(
       s"""
          |INSERT OVERWRITE TABLE jsonTable SELECT a, b FROM jt1
@@ -123,7 +123,7 @@ class InsertSuite extends DataSourceTest with SharedSQLContext {
 
     // Writing the table to more part files.
     val rdd2 = sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i, "b":"str$i"}"""), 10)
-    caseInsensitiveContext.read.json(rdd2).createOrReplaceTempView("jt2")
+    spark.read.json(rdd2).createOrReplaceTempView("jt2")
     sql(
       s"""
          |INSERT OVERWRITE TABLE jsonTable SELECT a, b FROM jt2
@@ -142,8 +142,8 @@ class InsertSuite extends DataSourceTest with SharedSQLContext {
       (1 to 10).map(i => Row(i * 10, s"str$i"))
     )
 
-    caseInsensitiveContext.dropTempTable("jt1")
-    caseInsensitiveContext.dropTempTable("jt2")
+    spark.catalog.dropTempView("jt1")
+    spark.catalog.dropTempView("jt2")
   }
 
   test("INSERT INTO JSONRelation for now") {
@@ -185,7 +185,7 @@ class InsertSuite extends DataSourceTest with SharedSQLContext {
          |INSERT OVERWRITE TABLE jsonTable SELECT a, b FROM jt
       """.stripMargin)
     // Cached Query Execution
-    caseInsensitiveContext.cacheTable("jsonTable")
+    spark.catalog.cacheTable("jsonTable")
     assertCached(sql("SELECT * FROM jsonTable"))
     checkAnswer(
       sql("SELECT * FROM jsonTable"),
@@ -226,7 +226,7 @@ class InsertSuite extends DataSourceTest with SharedSQLContext {
 //      sql("SELECT a * 2, b FROM jt").collect())
 //
 //    // Verify uncaching
-//    caseInsensitiveContext.uncacheTable("jsonTable")
+//    spark.catalog.uncacheTable("jsonTable")
 //    assertCached(sql("SELECT * FROM jsonTable"), 0)
   }
 
@@ -257,6 +257,6 @@ class InsertSuite extends DataSourceTest with SharedSQLContext {
       "It is not allowed to insert into a table that is not an InsertableRelation."
     )
 
-    caseInsensitiveContext.dropTempTable("oneToTen")
+    spark.catalog.dropTempView("oneToTen")
   }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/PrunedScanSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/PrunedScanSuite.scala
index 9cdf7dea7663e2403a3663fc43dfa13a23664e14..207f89d3eaea0a90a8e9d776b7c30fd5a127da13 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/PrunedScanSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/PrunedScanSuite.scala
@@ -56,7 +56,7 @@ case class SimplePrunedScan(from: Int, to: Int)(@transient val sparkSession: Spa
 }
 
 class PrunedScanSuite extends DataSourceTest with SharedSQLContext {
-  protected override lazy val sql = caseInsensitiveContext.sql _
+  protected override lazy val sql = spark.sql _
 
   override def beforeAll(): Unit = {
     super.beforeAll()
@@ -122,7 +122,7 @@ class PrunedScanSuite extends DataSourceTest with SharedSQLContext {
     test(s"Columns output ${expectedColumns.mkString(",")}: $sqlString") {
 
       // These tests check a particular plan, disable whole stage codegen.
-      caseInsensitiveContext.conf.setConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED, false)
+      spark.conf.set(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key, false)
       try {
         val queryExecution = sql(sqlString).queryExecution
         val rawPlan = queryExecution.executedPlan.collect {
@@ -145,7 +145,7 @@ class PrunedScanSuite extends DataSourceTest with SharedSQLContext {
           fail(s"Wrong output row. Got $rawOutput\n$queryExecution")
         }
       } finally {
-        caseInsensitiveContext.conf.setConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED,
+        spark.conf.set(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key,
           SQLConf.WHOLESTAGE_CODEGEN_ENABLED.defaultValue.get)
       }
     }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/SaveLoadSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/SaveLoadSuite.scala
index 7738e4107df94e5a21aa25711fed610fbe75a799..b1756c27fae0ac3cbfbca90501b26aa276aa38ac 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/SaveLoadSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/SaveLoadSuite.scala
@@ -28,26 +28,26 @@ import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils
 
 class SaveLoadSuite extends DataSourceTest with SharedSQLContext with BeforeAndAfter {
-  protected override lazy val sql = caseInsensitiveContext.sql _
+  protected override lazy val sql = spark.sql _
   private var originalDefaultSource: String = null
   private var path: File = null
   private var df: DataFrame = null
 
   override def beforeAll(): Unit = {
     super.beforeAll()
-    originalDefaultSource = caseInsensitiveContext.conf.defaultDataSourceName
+    originalDefaultSource = spark.sessionState.conf.defaultDataSourceName
 
     path = Utils.createTempDir()
     path.delete()
 
     val rdd = sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i, "b":"str${i}"}"""))
-    df = caseInsensitiveContext.read.json(rdd)
+    df = spark.read.json(rdd)
     df.createOrReplaceTempView("jsonTable")
   }
 
   override def afterAll(): Unit = {
     try {
-      caseInsensitiveContext.conf.setConf(SQLConf.DEFAULT_DATA_SOURCE_NAME, originalDefaultSource)
+      spark.conf.set(SQLConf.DEFAULT_DATA_SOURCE_NAME.key, originalDefaultSource)
     } finally {
       super.afterAll()
     }
@@ -58,45 +58,42 @@ class SaveLoadSuite extends DataSourceTest with SharedSQLContext with BeforeAndA
   }
 
   def checkLoad(expectedDF: DataFrame = df, tbl: String = "jsonTable"): Unit = {
-    caseInsensitiveContext.conf.setConf(
-      SQLConf.DEFAULT_DATA_SOURCE_NAME, "org.apache.spark.sql.json")
-    checkAnswer(caseInsensitiveContext.read.load(path.toString), expectedDF.collect())
+    spark.conf.set(SQLConf.DEFAULT_DATA_SOURCE_NAME.key, "org.apache.spark.sql.json")
+    checkAnswer(spark.read.load(path.toString), expectedDF.collect())
 
     // Test if we can pick up the data source name passed in load.
-    caseInsensitiveContext.conf.setConf(SQLConf.DEFAULT_DATA_SOURCE_NAME, "not a source name")
-    checkAnswer(caseInsensitiveContext.read.format("json").load(path.toString),
+    spark.conf.set(SQLConf.DEFAULT_DATA_SOURCE_NAME.key, "not a source name")
+    checkAnswer(spark.read.format("json").load(path.toString),
       expectedDF.collect())
-    checkAnswer(caseInsensitiveContext.read.format("json").load(path.toString),
+    checkAnswer(spark.read.format("json").load(path.toString),
       expectedDF.collect())
     val schema = StructType(StructField("b", StringType, true) :: Nil)
     checkAnswer(
-      caseInsensitiveContext.read.format("json").schema(schema).load(path.toString),
+      spark.read.format("json").schema(schema).load(path.toString),
       sql(s"SELECT b FROM $tbl").collect())
   }
 
   test("save with path and load") {
-    caseInsensitiveContext.conf.setConf(
-      SQLConf.DEFAULT_DATA_SOURCE_NAME, "org.apache.spark.sql.json")
+    spark.conf.set(SQLConf.DEFAULT_DATA_SOURCE_NAME.key, "org.apache.spark.sql.json")
     df.write.save(path.toString)
     checkLoad()
   }
 
   test("save with string mode and path, and load") {
-    caseInsensitiveContext.conf.setConf(
-      SQLConf.DEFAULT_DATA_SOURCE_NAME, "org.apache.spark.sql.json")
+    spark.conf.set(SQLConf.DEFAULT_DATA_SOURCE_NAME.key, "org.apache.spark.sql.json")
     path.createNewFile()
     df.write.mode("overwrite").save(path.toString)
     checkLoad()
   }
 
   test("save with path and datasource, and load") {
-    caseInsensitiveContext.conf.setConf(SQLConf.DEFAULT_DATA_SOURCE_NAME, "not a source name")
+    spark.conf.set(SQLConf.DEFAULT_DATA_SOURCE_NAME.key, "not a source name")
     df.write.json(path.toString)
     checkLoad()
   }
 
   test("save with data source and options, and load") {
-    caseInsensitiveContext.conf.setConf(SQLConf.DEFAULT_DATA_SOURCE_NAME, "not a source name")
+    spark.conf.set(SQLConf.DEFAULT_DATA_SOURCE_NAME.key, "not a source name")
     df.write.mode(SaveMode.ErrorIfExists).json(path.toString)
     checkLoad()
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/TableScanSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/TableScanSuite.scala
index cddf4a1884fa8063935e4e35146c28f00bd5ab58..93116d84ced7127260ffae06bda91a9edd365c05 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/TableScanSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/TableScanSuite.scala
@@ -106,7 +106,7 @@ case class AllDataTypesScan(
 }
 
 class TableScanSuite extends DataSourceTest with SharedSQLContext {
-  protected override lazy val sql = caseInsensitiveContext.sql _
+  protected override lazy val sql = spark.sql _
 
   private lazy val tableWithSchemaExpected = (1 to 10).map { i =>
     Row(
@@ -241,7 +241,7 @@ class TableScanSuite extends DataSourceTest with SharedSQLContext {
       Nil
     )
 
-    assert(expectedSchema == caseInsensitiveContext.table("tableWithSchema").schema)
+    assert(expectedSchema == spark.table("tableWithSchema").schema)
 
     checkAnswer(
       sql(
@@ -297,7 +297,7 @@ class TableScanSuite extends DataSourceTest with SharedSQLContext {
 
   test("Caching")  {
     // Cached Query Execution
-    caseInsensitiveContext.cacheTable("oneToTen")
+    spark.catalog.cacheTable("oneToTen")
     assertCached(sql("SELECT * FROM oneToTen"))
     checkAnswer(
       sql("SELECT * FROM oneToTen"),
@@ -325,7 +325,7 @@ class TableScanSuite extends DataSourceTest with SharedSQLContext {
       (2 to 10).map(i => Row(i, i - 1)).toSeq)
 
     // Verify uncaching
-    caseInsensitiveContext.uncacheTable("oneToTen")
+    spark.catalog.uncacheTable("oneToTen")
     assertCached(sql("SELECT * FROM oneToTen"), 0)
   }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
index f3262f772b1603be6895572c7fdb99816e34b78b..3d8dcaf5a53220ca111ec8eb9ef718d6effac9a3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
@@ -173,7 +173,7 @@ class FileStreamSinkSuite extends StreamTest with SharedSQLContext {
         query.processAllAvailable()
       }
 
-      val outputDf = sqlContext.read.parquet(outputDir)
+      val outputDf = spark.read.parquet(outputDir)
       val expectedSchema = new StructType()
         .add(StructField("value", IntegerType))
         .add(StructField("id", IntegerType))
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
index 4ca561960322024bc0dbed02a60b9786867c3c1b..52ba90f02c945d7f70151be050587302bbe2c6b4 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
@@ -97,18 +97,18 @@ class CachedTableSuite extends QueryTest with TestHiveSingleton {
 
   test("correct error on uncache of non-cached table") {
     intercept[IllegalArgumentException] {
-      hiveContext.uncacheTable("src")
+      spark.catalog.uncacheTable("src")
     }
   }
 
   test("'CACHE TABLE' and 'UNCACHE TABLE' HiveQL statement") {
     sql("CACHE TABLE src")
     assertCached(table("src"))
-    assert(hiveContext.isCached("src"), "Table 'src' should be cached")
+    assert(spark.catalog.isCached("src"), "Table 'src' should be cached")
 
     sql("UNCACHE TABLE src")
     assertCached(table("src"), 0)
-    assert(!hiveContext.isCached("src"), "Table 'src' should not be cached")
+    assert(!spark.catalog.isCached("src"), "Table 'src' should not be cached")
   }
 
   test("CACHE TABLE tableName AS SELECT * FROM anotherTable") {
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/ErrorPositionSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/ErrorPositionSuite.scala
index dc8f374eb178f36e6b5ab277cc4b3e83b5206866..aa1973de7f6783940063ebeea079c029d402fd6c 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/ErrorPositionSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/ErrorPositionSuite.scala
@@ -26,11 +26,11 @@ import org.apache.spark.sql.catalyst.util.quietly
 import org.apache.spark.sql.hive.test.TestHiveSingleton
 
 class ErrorPositionSuite extends QueryTest with TestHiveSingleton with BeforeAndAfterEach {
-  import hiveContext.implicits._
+  import spark.implicits._
 
   override protected def beforeEach(): Unit = {
     super.beforeEach()
-    if (spark.sqlContext.tableNames().contains("src")) {
+    if (spark.catalog.listTables().collect().map(_.name).contains("src")) {
       spark.catalog.dropTempView("src")
     }
     Seq((1, "")).toDF("key", "value").createOrReplaceTempView("src")
@@ -130,12 +130,12 @@ class ErrorPositionSuite extends QueryTest with TestHiveSingleton with BeforeAnd
    * @param token a unique token in the string that should be indicated by the exception
    */
   def positionTest(name: String, query: String, token: String): Unit = {
-    def ast = hiveContext.sessionState.sqlParser.parsePlan(query)
+    def ast = spark.sessionState.sqlParser.parsePlan(query)
     def parseTree = Try(quietly(ast.treeString)).getOrElse("<failed to parse>")
 
     test(name) {
       val error = intercept[AnalysisException] {
-        quietly(hiveContext.sql(query))
+        quietly(spark.sql(query))
       }
 
       assert(!error.getMessage.contains("Seq("))
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDataFrameAnalyticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDataFrameAnalyticsSuite.scala
index cc41c04c71e16235401c182b2ddcda19fa8c9c74..6477974fe713abe5feb723207bbe48ffba50a0e2 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDataFrameAnalyticsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDataFrameAnalyticsSuite.scala
@@ -27,20 +27,20 @@ import org.apache.spark.sql.hive.test.TestHiveSingleton
 // `hive` package is optional in compiling, however, `SQLContext.sql` doesn't
 // support the `cube` or `rollup` yet.
 class HiveDataFrameAnalyticsSuite extends QueryTest with TestHiveSingleton with BeforeAndAfterAll {
-  import hiveContext.implicits._
-  import hiveContext.sql
+  import spark.implicits._
+  import spark.sql
 
   private var testData: DataFrame = _
 
   override def beforeAll() {
     super.beforeAll()
     testData = Seq((1, 2), (2, 2), (3, 4)).toDF("a", "b")
-    hiveContext.registerDataFrameAsTable(testData, "mytable")
+    testData.createOrReplaceTempView("mytable")
   }
 
   override def afterAll(): Unit = {
     try {
-      hiveContext.dropTempTable("mytable")
+      spark.catalog.dropTempView("mytable")
     } finally {
       super.afterAll()
     }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDataFrameJoinSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDataFrameJoinSuite.scala
index 63cf5030ab8b60ea83f44a98941782a7179e9bba..cdc259d75b139f5848204cc0e5069572c383929d 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDataFrameJoinSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDataFrameJoinSuite.scala
@@ -21,7 +21,7 @@ import org.apache.spark.sql.{QueryTest, Row}
 import org.apache.spark.sql.hive.test.TestHiveSingleton
 
 class HiveDataFrameJoinSuite extends QueryTest with TestHiveSingleton {
-  import hiveContext.implicits._
+  import spark.implicits._
 
   // We should move this into SQL package if we make case sensitivity configurable in SQL.
   test("join - self join auto resolve ambiguity with case insensitivity") {
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDataFrameSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDataFrameSuite.scala
index 7fdc5d71937ff8abca966ed20c06e2947380cc44..1b31caa76d3a53e5c6ee6da20182d0527ea1992b 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDataFrameSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDataFrameSuite.scala
@@ -23,10 +23,10 @@ import org.apache.spark.sql.QueryTest
 class HiveDataFrameSuite extends QueryTest with TestHiveSingleton {
   test("table name with schema") {
     // regression test for SPARK-11778
-    hiveContext.sql("create schema usrdb")
-    hiveContext.sql("create table usrdb.test(c int)")
-    hiveContext.read.table("usrdb.test")
-    hiveContext.sql("drop table usrdb.test")
-    hiveContext.sql("drop schema usrdb")
+    spark.sql("create schema usrdb")
+    spark.sql("create table usrdb.test(c int)")
+    spark.read.table("usrdb.test")
+    spark.sql("drop table usrdb.test")
+    spark.sql("drop schema usrdb")
   }
 }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala
index b043d291aa65f7d224822cf5c39f93b62faddfb6..b420781e51bd365b201e8fa310f90c754aa616ae 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala
@@ -29,7 +29,7 @@ import org.apache.spark.sql.test.{ExamplePointUDT, SQLTestUtils}
 import org.apache.spark.sql.types.{DecimalType, StringType, StructType}
 
 class HiveMetastoreCatalogSuite extends TestHiveSingleton {
-  import hiveContext.implicits._
+  import spark.implicits._
 
   test("struct field should accept underscore in sub-column name") {
     val hiveTypeStr = "struct<a: int, b_1: string, c: string>"
@@ -45,7 +45,7 @@ class HiveMetastoreCatalogSuite extends TestHiveSingleton {
   }
 
   test("duplicated metastore relations") {
-    val df = hiveContext.sql("SELECT * FROM src")
+    val df = spark.sql("SELECT * FROM src")
     logInfo(df.queryExecution.toString)
     df.as('a).join(df.as('b), $"a.key" === $"b.key")
   }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveParquetSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveParquetSuite.scala
index e2304b5397ca1cee8fe36561d81e6e50611ebac6..33252ad07add9226fb944a57753cfc94ff000d41 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveParquetSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveParquetSuite.scala
@@ -51,7 +51,7 @@ class HiveParquetSuite extends QueryTest with ParquetTest with TestHiveSingleton
   test("Converting Hive to Parquet Table via saveAsParquetFile") {
     withTempPath { dir =>
       sql("SELECT * FROM src").write.parquet(dir.getCanonicalPath)
-      hiveContext.read.parquet(dir.getCanonicalPath).createOrReplaceTempView("p")
+      spark.read.parquet(dir.getCanonicalPath).createOrReplaceTempView("p")
       withTempTable("p") {
         checkAnswer(
           sql("SELECT * FROM src ORDER BY key"),
@@ -65,7 +65,7 @@ class HiveParquetSuite extends QueryTest with ParquetTest with TestHiveSingleton
     withParquetTable((1 to 10).map(i => (i, s"val_$i")), "t", false) {
       withTempPath { file =>
         sql("SELECT * FROM t LIMIT 1").write.parquet(file.getCanonicalPath)
-        hiveContext.read.parquet(file.getCanonicalPath).createOrReplaceTempView("p")
+        spark.read.parquet(file.getCanonicalPath).createOrReplaceTempView("p")
         withTempTable("p") {
           // let's do three overwrites for good measure
           sql("INSERT OVERWRITE TABLE p SELECT * FROM t")
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala
index 4a55bcc3b19d0d136b65fa455efdbf85c7ef2116..fae59001b98e13f87d1e8a85c3193de64d7bd97e 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala
@@ -36,9 +36,9 @@ case class ThreeCloumntable(key: Int, value: String, key1: String)
 
 class InsertIntoHiveTableSuite extends QueryTest with TestHiveSingleton with BeforeAndAfter
     with SQLTestUtils {
-  import hiveContext.implicits._
+  import spark.implicits._
 
-  override lazy val testData = hiveContext.sparkContext.parallelize(
+  override lazy val testData = spark.sparkContext.parallelize(
     (1 to 100).map(i => TestData(i, i.toString))).toDF()
 
   before {
@@ -95,9 +95,9 @@ class InsertIntoHiveTableSuite extends QueryTest with TestHiveSingleton with Bef
 
   test("SPARK-4052: scala.collection.Map as value type of MapType") {
     val schema = StructType(StructField("m", MapType(StringType, StringType), true) :: Nil)
-    val rowRDD = hiveContext.sparkContext.parallelize(
+    val rowRDD = spark.sparkContext.parallelize(
       (1 to 100).map(i => Row(scala.collection.mutable.HashMap(s"key$i" -> s"value$i"))))
-    val df = hiveContext.createDataFrame(rowRDD, schema)
+    val df = spark.createDataFrame(rowRDD, schema)
     df.createOrReplaceTempView("tableWithMapValue")
     sql("CREATE TABLE hiveTableWithMapValue(m MAP <STRING, STRING>)")
     sql("INSERT OVERWRITE TABLE hiveTableWithMapValue SELECT m FROM tableWithMapValue")
@@ -169,8 +169,8 @@ class InsertIntoHiveTableSuite extends QueryTest with TestHiveSingleton with Bef
   test("Insert ArrayType.containsNull == false") {
     val schema = StructType(Seq(
       StructField("a", ArrayType(StringType, containsNull = false))))
-    val rowRDD = hiveContext.sparkContext.parallelize((1 to 100).map(i => Row(Seq(s"value$i"))))
-    val df = hiveContext.createDataFrame(rowRDD, schema)
+    val rowRDD = spark.sparkContext.parallelize((1 to 100).map(i => Row(Seq(s"value$i"))))
+    val df = spark.createDataFrame(rowRDD, schema)
     df.createOrReplaceTempView("tableWithArrayValue")
     sql("CREATE TABLE hiveTableWithArrayValue(a Array <STRING>)")
     sql("INSERT OVERWRITE TABLE hiveTableWithArrayValue SELECT a FROM tableWithArrayValue")
@@ -185,9 +185,9 @@ class InsertIntoHiveTableSuite extends QueryTest with TestHiveSingleton with Bef
   test("Insert MapType.valueContainsNull == false") {
     val schema = StructType(Seq(
       StructField("m", MapType(StringType, StringType, valueContainsNull = false))))
-    val rowRDD = hiveContext.sparkContext.parallelize(
+    val rowRDD = spark.sparkContext.parallelize(
       (1 to 100).map(i => Row(Map(s"key$i" -> s"value$i"))))
-    val df = hiveContext.createDataFrame(rowRDD, schema)
+    val df = spark.createDataFrame(rowRDD, schema)
     df.createOrReplaceTempView("tableWithMapValue")
     sql("CREATE TABLE hiveTableWithMapValue(m Map <STRING, STRING>)")
     sql("INSERT OVERWRITE TABLE hiveTableWithMapValue SELECT m FROM tableWithMapValue")
@@ -202,9 +202,9 @@ class InsertIntoHiveTableSuite extends QueryTest with TestHiveSingleton with Bef
   test("Insert StructType.fields.exists(_.nullable == false)") {
     val schema = StructType(Seq(
       StructField("s", StructType(Seq(StructField("f", StringType, nullable = false))))))
-    val rowRDD = hiveContext.sparkContext.parallelize(
+    val rowRDD = spark.sparkContext.parallelize(
       (1 to 100).map(i => Row(Row(s"value$i"))))
-    val df = hiveContext.createDataFrame(rowRDD, schema)
+    val df = spark.createDataFrame(rowRDD, schema)
     df.createOrReplaceTempView("tableWithStructValue")
     sql("CREATE TABLE hiveTableWithStructValue(s Struct <f: STRING>)")
     sql("INSERT OVERWRITE TABLE hiveTableWithStructValue SELECT s FROM tableWithStructValue")
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index 153b0c3c72a785f7d04c7c2c7edda70231140682..1e6de463b3eba14064c6564e38711eb3ab3bfa92 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -40,7 +40,7 @@ import org.apache.spark.util.Utils
  */
 class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
   import hiveContext._
-  import hiveContext.implicits._
+  import spark.implicits._
 
   var jsonFilePath: String = _
 
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MultiDatabaseSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MultiDatabaseSuite.scala
index 5b706b0432418e006b10c160289a6eb8a80eaff6..83f1b192f7c94bdab11be25c75e316700680e4f7 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MultiDatabaseSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MultiDatabaseSuite.scala
@@ -25,22 +25,29 @@ class MultiDatabaseSuite extends QueryTest with SQLTestUtils with TestHiveSingle
   private lazy val df = spark.range(10).coalesce(1).toDF()
 
   private def checkTablePath(dbName: String, tableName: String): Unit = {
-    val metastoreTable = hiveContext.sharedState.externalCatalog.getTable(dbName, tableName)
+    val metastoreTable = spark.sharedState.externalCatalog.getTable(dbName, tableName)
     val expectedPath =
-      hiveContext.sharedState.externalCatalog.getDatabase(dbName).locationUri + "/" + tableName
+      spark.sharedState.externalCatalog.getDatabase(dbName).locationUri + "/" + tableName
 
     assert(metastoreTable.storage.serdeProperties("path") === expectedPath)
   }
 
+  private def getTableNames(dbName: Option[String] = None): Array[String] = {
+    dbName match {
+      case Some(db) => spark.catalog.listTables(db).collect().map(_.name)
+      case None => spark.catalog.listTables().collect().map(_.name)
+    }
+  }
+
   test(s"saveAsTable() to non-default database - with USE - Overwrite") {
     withTempDatabase { db =>
       activateDatabase(db) {
         df.write.mode(SaveMode.Overwrite).saveAsTable("t")
-        assert(spark.sqlContext.tableNames().contains("t"))
+        assert(getTableNames().contains("t"))
         checkAnswer(spark.table("t"), df)
       }
 
-      assert(spark.sqlContext.tableNames(db).contains("t"))
+      assert(getTableNames(Option(db)).contains("t"))
       checkAnswer(spark.table(s"$db.t"), df)
 
       checkTablePath(db, "t")
@@ -50,7 +57,7 @@ class MultiDatabaseSuite extends QueryTest with SQLTestUtils with TestHiveSingle
   test(s"saveAsTable() to non-default database - without USE - Overwrite") {
     withTempDatabase { db =>
       df.write.mode(SaveMode.Overwrite).saveAsTable(s"$db.t")
-      assert(spark.sqlContext.tableNames(db).contains("t"))
+      assert(getTableNames(Option(db)).contains("t"))
       checkAnswer(spark.table(s"$db.t"), df)
 
       checkTablePath(db, "t")
@@ -65,7 +72,7 @@ class MultiDatabaseSuite extends QueryTest with SQLTestUtils with TestHiveSingle
           df.write.format("parquet").mode(SaveMode.Overwrite).save(path)
 
           spark.catalog.createExternalTable("t", path, "parquet")
-          assert(spark.sqlContext.tableNames(db).contains("t"))
+          assert(getTableNames(Option(db)).contains("t"))
           checkAnswer(spark.table("t"), df)
 
           sql(
@@ -76,7 +83,7 @@ class MultiDatabaseSuite extends QueryTest with SQLTestUtils with TestHiveSingle
               |  path '$path'
               |)
             """.stripMargin)
-          assert(spark.sqlContext.tableNames(db).contains("t1"))
+          assert(getTableNames(Option(db)).contains("t1"))
           checkAnswer(spark.table("t1"), df)
         }
       }
@@ -90,7 +97,7 @@ class MultiDatabaseSuite extends QueryTest with SQLTestUtils with TestHiveSingle
         df.write.format("parquet").mode(SaveMode.Overwrite).save(path)
         spark.catalog.createExternalTable(s"$db.t", path, "parquet")
 
-        assert(spark.sqlContext.tableNames(db).contains("t"))
+        assert(getTableNames(Option(db)).contains("t"))
         checkAnswer(spark.table(s"$db.t"), df)
 
         sql(
@@ -101,7 +108,7 @@ class MultiDatabaseSuite extends QueryTest with SQLTestUtils with TestHiveSingle
               |  path '$path'
               |)
             """.stripMargin)
-        assert(spark.sqlContext.tableNames(db).contains("t1"))
+        assert(getTableNames(Option(db)).contains("t1"))
         checkAnswer(spark.table(s"$db.t1"), df)
       }
     }
@@ -112,11 +119,11 @@ class MultiDatabaseSuite extends QueryTest with SQLTestUtils with TestHiveSingle
       activateDatabase(db) {
         df.write.mode(SaveMode.Overwrite).saveAsTable("t")
         df.write.mode(SaveMode.Append).saveAsTable("t")
-        assert(spark.sqlContext.tableNames().contains("t"))
+        assert(getTableNames().contains("t"))
         checkAnswer(spark.table("t"), df.union(df))
       }
 
-      assert(spark.sqlContext.tableNames(db).contains("t"))
+      assert(getTableNames(Option(db)).contains("t"))
       checkAnswer(spark.table(s"$db.t"), df.union(df))
 
       checkTablePath(db, "t")
@@ -127,7 +134,7 @@ class MultiDatabaseSuite extends QueryTest with SQLTestUtils with TestHiveSingle
     withTempDatabase { db =>
       df.write.mode(SaveMode.Overwrite).saveAsTable(s"$db.t")
       df.write.mode(SaveMode.Append).saveAsTable(s"$db.t")
-      assert(spark.sqlContext.tableNames(db).contains("t"))
+      assert(getTableNames(Option(db)).contains("t"))
       checkAnswer(spark.table(s"$db.t"), df.union(df))
 
       checkTablePath(db, "t")
@@ -138,7 +145,7 @@ class MultiDatabaseSuite extends QueryTest with SQLTestUtils with TestHiveSingle
     withTempDatabase { db =>
       activateDatabase(db) {
         df.write.mode(SaveMode.Overwrite).saveAsTable("t")
-        assert(spark.sqlContext.tableNames().contains("t"))
+        assert(getTableNames().contains("t"))
 
         df.write.insertInto(s"$db.t")
         checkAnswer(spark.table(s"$db.t"), df.union(df))
@@ -150,10 +157,10 @@ class MultiDatabaseSuite extends QueryTest with SQLTestUtils with TestHiveSingle
     withTempDatabase { db =>
       activateDatabase(db) {
         df.write.mode(SaveMode.Overwrite).saveAsTable("t")
-        assert(spark.sqlContext.tableNames().contains("t"))
+        assert(getTableNames().contains("t"))
       }
 
-      assert(spark.sqlContext.tableNames(db).contains("t"))
+      assert(getTableNames(Option(db)).contains("t"))
 
       df.write.insertInto(s"$db.t")
       checkAnswer(spark.table(s"$db.t"), df.union(df))
@@ -175,21 +182,21 @@ class MultiDatabaseSuite extends QueryTest with SQLTestUtils with TestHiveSingle
     withTempDatabase { db =>
       activateDatabase(db) {
         sql(s"CREATE TABLE t (key INT)")
-        assert(spark.sqlContext.tableNames().contains("t"))
-        assert(!spark.sqlContext.tableNames("default").contains("t"))
+        assert(getTableNames().contains("t"))
+        assert(!getTableNames(Option("default")).contains("t"))
       }
 
-      assert(!spark.sqlContext.tableNames().contains("t"))
-      assert(spark.sqlContext.tableNames(db).contains("t"))
+      assert(!getTableNames().contains("t"))
+      assert(getTableNames(Option(db)).contains("t"))
 
       activateDatabase(db) {
         sql(s"DROP TABLE t")
-        assert(!spark.sqlContext.tableNames().contains("t"))
-        assert(!spark.sqlContext.tableNames("default").contains("t"))
+        assert(!getTableNames().contains("t"))
+        assert(!getTableNames(Option("default")).contains("t"))
       }
 
-      assert(!spark.sqlContext.tableNames().contains("t"))
-      assert(!spark.sqlContext.tableNames(db).contains("t"))
+      assert(!getTableNames().contains("t"))
+      assert(!getTableNames(Option(db)).contains("t"))
     }
   }
 
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/QueryPartitionSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/QueryPartitionSuite.scala
index 266fdd6c1f19ff633b4c723168e2fff13267eb8c..f7650e001a8b953251a15b18bd5fd00e60a9b4bb 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/QueryPartitionSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/QueryPartitionSuite.scala
@@ -26,7 +26,7 @@ import org.apache.spark.sql.test.SQLTestUtils
 import org.apache.spark.util.Utils
 
 class QueryPartitionSuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
-  import hiveContext.implicits._
+  import spark.implicits._
 
   test("SPARK-5068: query data when path doesn't exist") {
     withSQLConf((SQLConf.HIVE_VERIFY_PARTITION_PATH.key, "true")) {
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
index 73b1a7850d6fb2ee821675b0adc263956017f268..666a8da0dab854f5e6cf7fe98b779d334b91cd3c 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
@@ -30,11 +30,10 @@ import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SQLTestUtils
 
 class StatisticsSuite extends QueryTest with TestHiveSingleton with SQLTestUtils {
-  import hiveContext.sql
 
   test("parse analyze commands") {
     def assertAnalyzeCommand(analyzeCommand: String, c: Class[_]) {
-      val parsed = hiveContext.sessionState.sqlParser.parsePlan(analyzeCommand)
+      val parsed = spark.sessionState.sqlParser.parsePlan(analyzeCommand)
       val operators = parsed.collect {
         case a: AnalyzeTableCommand => a
         case o => o
@@ -72,7 +71,7 @@ class StatisticsSuite extends QueryTest with TestHiveSingleton with SQLTestUtils
   }
 
   test("MetastoreRelations fallback to HDFS for size estimation") {
-    val enableFallBackToHdfsForStats = hiveContext.conf.fallBackToHdfsForStatsEnabled
+    val enableFallBackToHdfsForStats = spark.sessionState.conf.fallBackToHdfsForStatsEnabled
     try {
       withTempDir { tempDir =>
 
@@ -98,9 +97,9 @@ class StatisticsSuite extends QueryTest with TestHiveSingleton with SQLTestUtils
             LOCATION '$tempDir'
           """)
 
-        hiveContext.setConf(SQLConf.ENABLE_FALL_BACK_TO_HDFS_FOR_STATS, true)
+        spark.conf.set(SQLConf.ENABLE_FALL_BACK_TO_HDFS_FOR_STATS.key, true)
 
-        val relation = hiveContext.sessionState.catalog.lookupRelation(TableIdentifier("csv_table"))
+        val relation = spark.sessionState.catalog.lookupRelation(TableIdentifier("csv_table"))
           .asInstanceOf[MetastoreRelation]
 
         val properties = relation.hiveQlTable.getParameters
@@ -111,15 +110,14 @@ class StatisticsSuite extends QueryTest with TestHiveSingleton with SQLTestUtils
         assert(sizeInBytes === BigInt(file1.length() + file2.length()))
       }
     } finally {
-      hiveContext.setConf(SQLConf.ENABLE_FALL_BACK_TO_HDFS_FOR_STATS, enableFallBackToHdfsForStats)
+      spark.conf.set(SQLConf.ENABLE_FALL_BACK_TO_HDFS_FOR_STATS.key, enableFallBackToHdfsForStats)
       sql("DROP TABLE csv_table ")
     }
   }
 
   ignore("analyze MetastoreRelations") {
     def queryTotalSize(tableName: String): BigInt =
-      hiveContext.sessionState.catalog.lookupRelation(
-        TableIdentifier(tableName)).statistics.sizeInBytes
+      spark.sessionState.catalog.lookupRelation(TableIdentifier(tableName)).statistics.sizeInBytes
 
     // Non-partitioned table
     sql("CREATE TABLE analyzeTable (key STRING, value STRING)").collect()
@@ -153,7 +151,7 @@ class StatisticsSuite extends QueryTest with TestHiveSingleton with SQLTestUtils
         |SELECT * FROM src
       """.stripMargin).collect()
 
-    assert(queryTotalSize("analyzeTable_part") === hiveContext.conf.defaultSizeInBytes)
+    assert(queryTotalSize("analyzeTable_part") === spark.sessionState.conf.defaultSizeInBytes)
 
     sql("ANALYZE TABLE analyzeTable_part COMPUTE STATISTICS noscan")
 
@@ -165,9 +163,9 @@ class StatisticsSuite extends QueryTest with TestHiveSingleton with SQLTestUtils
     // Try to analyze a temp table
     sql("""SELECT * FROM src""").createOrReplaceTempView("tempTable")
     intercept[UnsupportedOperationException] {
-      hiveContext.sql("ANALYZE TABLE tempTable COMPUTE STATISTICS")
+      sql("ANALYZE TABLE tempTable COMPUTE STATISTICS")
     }
-    hiveContext.sessionState.catalog.dropTable(
+    spark.sessionState.catalog.dropTable(
       TableIdentifier("tempTable"), ignoreIfNotExists = true)
   }
 
@@ -196,8 +194,8 @@ class StatisticsSuite extends QueryTest with TestHiveSingleton with SQLTestUtils
       val sizes = df.queryExecution.analyzed.collect {
         case r if ct.runtimeClass.isAssignableFrom(r.getClass) => r.statistics.sizeInBytes
       }
-      assert(sizes.size === 2 && sizes(0) <= hiveContext.conf.autoBroadcastJoinThreshold
-        && sizes(1) <= hiveContext.conf.autoBroadcastJoinThreshold,
+      assert(sizes.size === 2 && sizes(0) <= spark.sessionState.conf.autoBroadcastJoinThreshold
+        && sizes(1) <= spark.sessionState.conf.autoBroadcastJoinThreshold,
         s"query should contain two relations, each of which has size smaller than autoConvertSize")
 
       // Using `sparkPlan` because for relevant patterns in HashJoin to be
@@ -208,8 +206,8 @@ class StatisticsSuite extends QueryTest with TestHiveSingleton with SQLTestUtils
 
       checkAnswer(df, expectedAnswer) // check correctness of output
 
-      hiveContext.conf.settings.synchronized {
-        val tmp = hiveContext.conf.autoBroadcastJoinThreshold
+      spark.sessionState.conf.settings.synchronized {
+        val tmp = spark.sessionState.conf.autoBroadcastJoinThreshold
 
         sql(s"""SET ${SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key}=-1""")
         df = sql(query)
@@ -252,8 +250,8 @@ class StatisticsSuite extends QueryTest with TestHiveSingleton with SQLTestUtils
         .isAssignableFrom(r.getClass) =>
         r.statistics.sizeInBytes
     }
-    assert(sizes.size === 2 && sizes(1) <= hiveContext.conf.autoBroadcastJoinThreshold
-      && sizes(0) <= hiveContext.conf.autoBroadcastJoinThreshold,
+    assert(sizes.size === 2 && sizes(1) <= spark.sessionState.conf.autoBroadcastJoinThreshold
+      && sizes(0) <= spark.sessionState.conf.autoBroadcastJoinThreshold,
       s"query should contain two relations, each of which has size smaller than autoConvertSize")
 
     // Using `sparkPlan` because for relevant patterns in HashJoin to be
@@ -266,8 +264,8 @@ class StatisticsSuite extends QueryTest with TestHiveSingleton with SQLTestUtils
 
     checkAnswer(df, answer) // check correctness of output
 
-    hiveContext.conf.settings.synchronized {
-      val tmp = hiveContext.conf.autoBroadcastJoinThreshold
+    spark.sessionState.conf.settings.synchronized {
+      val tmp = spark.sessionState.conf.autoBroadcastJoinThreshold
 
       sql(s"SET ${SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key}=-1")
       df = sql(leftSemiJoinQuery)
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/UDFSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/UDFSuite.scala
index d121bcbe15b352b635aeb65cdfca6524c4d389d1..88cc42efd0fe310f6493290b8903a4e4e29b7d59 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/UDFSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/UDFSuite.scala
@@ -36,7 +36,7 @@ class UDFSuite
   with TestHiveSingleton
   with BeforeAndAfterEach {
 
-  import hiveContext.implicits._
+  import spark.implicits._
 
   private[this] val functionName = "myUPper"
   private[this] val functionNameUpper = "MYUPPER"
@@ -64,12 +64,12 @@ class UDFSuite
   }
 
   test("UDF case insensitive") {
-    hiveContext.udf.register("random0", () => { Math.random() })
-    hiveContext.udf.register("RANDOM1", () => { Math.random() })
-    hiveContext.udf.register("strlenScala", (_: String).length + (_: Int))
-    assert(hiveContext.sql("SELECT RANDOM0() FROM src LIMIT 1").head().getDouble(0) >= 0.0)
-    assert(hiveContext.sql("SELECT RANDOm1() FROM src LIMIT 1").head().getDouble(0) >= 0.0)
-    assert(hiveContext.sql("SELECT strlenscala('test', 1) FROM src LIMIT 1").head().getInt(0) === 5)
+    spark.udf.register("random0", () => { Math.random() })
+    spark.udf.register("RANDOM1", () => { Math.random() })
+    spark.udf.register("strlenScala", (_: String).length + (_: Int))
+    assert(sql("SELECT RANDOM0() FROM src LIMIT 1").head().getDouble(0) >= 0.0)
+    assert(sql("SELECT RANDOm1() FROM src LIMIT 1").head().getDouble(0) >= 0.0)
+    assert(sql("SELECT strlenscala('test', 1) FROM src LIMIT 1").head().getInt(0) === 5)
   }
 
   test("temporary function: create and drop") {
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index 80e6f4ec702201480a562a214e4ce1eb62694622..a98d46988073624e51a6a8ee8ed84a642b9917ba 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -31,7 +31,7 @@ import org.apache.spark.sql.test.SQLTestUtils
 
 class HiveDDLSuite
   extends QueryTest with SQLTestUtils with TestHiveSingleton with BeforeAndAfterEach {
-  import hiveContext.implicits._
+  import spark.implicits._
 
   override def afterEach(): Unit = {
     try {
@@ -52,7 +52,7 @@ class HiveDDLSuite
         new Path(new Path(dbPath.get), tableIdentifier.table).toString
       }
     val filesystemPath = new Path(expectedTablePath)
-    val fs = filesystemPath.getFileSystem(hiveContext.sessionState.newHadoopConf())
+    val fs = filesystemPath.getFileSystem(spark.sessionState.newHadoopConf())
     fs.exists(filesystemPath)
   }
 
@@ -86,8 +86,7 @@ class HiveDDLSuite
           """.stripMargin)
 
         val hiveTable =
-          hiveContext.sessionState.catalog
-            .getTableMetadata(TableIdentifier(tabName, Some("default")))
+          spark.sessionState.catalog.getTableMetadata(TableIdentifier(tabName, Some("default")))
         assert(hiveTable.tableType == CatalogTableType.EXTERNAL)
 
         assert(tmpDir.listFiles.nonEmpty)
@@ -113,8 +112,7 @@ class HiveDDLSuite
         }
 
         val hiveTable =
-          hiveContext.sessionState.catalog
-            .getTableMetadata(TableIdentifier(tabName, Some("default")))
+          spark.sessionState.catalog.getTableMetadata(TableIdentifier(tabName, Some("default")))
         // This data source table is external table
         assert(hiveTable.tableType == CatalogTableType.EXTERNAL)
 
@@ -127,7 +125,7 @@ class HiveDDLSuite
   }
 
   test("create table and view with comment") {
-    val catalog = hiveContext.sessionState.catalog
+    val catalog = spark.sessionState.catalog
     val tabName = "tab1"
     withTable(tabName) {
       sql(s"CREATE TABLE $tabName(c1 int) COMMENT 'BLABLA'")
@@ -143,7 +141,7 @@ class HiveDDLSuite
   }
 
   test("add/drop partitions - external table") {
-    val catalog = hiveContext.sessionState.catalog
+    val catalog = spark.sessionState.catalog
     withTempDir { tmpDir =>
       val basePath = tmpDir.getCanonicalPath
       val partitionPath_1stCol_part1 = new File(basePath + "/ds=2008-04-08")
@@ -242,7 +240,7 @@ class HiveDDLSuite
       val oldViewName = "view1"
       val newViewName = "view2"
       withView(oldViewName, newViewName) {
-        val catalog = hiveContext.sessionState.catalog
+        val catalog = spark.sessionState.catalog
         sql(s"CREATE VIEW $oldViewName AS SELECT * FROM $tabName")
 
         assert(catalog.tableExists(TableIdentifier(oldViewName)))
@@ -260,7 +258,7 @@ class HiveDDLSuite
       spark.range(10).write.saveAsTable(tabName)
       val viewName = "view1"
       withView(viewName) {
-        val catalog = hiveContext.sessionState.catalog
+        val catalog = spark.sessionState.catalog
         sql(s"CREATE VIEW $viewName AS SELECT * FROM $tabName")
 
         assert(catalog.getTableMetadata(TableIdentifier(viewName))
@@ -299,7 +297,7 @@ class HiveDDLSuite
       val oldViewName = "view1"
       val newViewName = "view2"
       withView(oldViewName, newViewName) {
-        val catalog = hiveContext.sessionState.catalog
+        val catalog = spark.sessionState.catalog
         sql(s"CREATE VIEW $oldViewName AS SELECT * FROM $tabName")
 
         assert(catalog.tableExists(TableIdentifier(tabName)))
@@ -391,7 +389,7 @@ class HiveDDLSuite
     val catalog = spark.sessionState.catalog
     val dbName = "db1"
     val tabName = "tab1"
-    val fs = new Path(tmpDir.toString).getFileSystem(hiveContext.sessionState.newHadoopConf())
+    val fs = new Path(tmpDir.toString).getFileSystem(spark.sessionState.newHadoopConf())
     withTable(tabName) {
       if (dirExists) {
         assert(tmpDir.listFiles.isEmpty)
@@ -441,7 +439,7 @@ class HiveDDLSuite
       val path = tmpDir.toString
       withSQLConf(SQLConf.WAREHOUSE_PATH.key -> path) {
         val dbName = "db1"
-        val fs = new Path(path).getFileSystem(hiveContext.sessionState.newHadoopConf())
+        val fs = new Path(path).getFileSystem(spark.sessionState.newHadoopConf())
         val dbPath = new Path(path)
         // the database directory does not exist
         assert(!fs.exists(dbPath))
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveExplainSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveExplainSuite.scala
index 131b06aec8c9c9e0128e602a93eb4375cc0a23dd..0d08f7edc8ea86a061a83162245f7cb66ec08c29 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveExplainSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveExplainSuite.scala
@@ -80,7 +80,7 @@ class HiveExplainSuite extends QueryTest with SQLTestUtils with TestHiveSingleto
   test("SPARK-6212: The EXPLAIN output of CTAS only shows the analyzed plan") {
     withTempTable("jt") {
       val rdd = sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i, "b":"str$i"}"""))
-      hiveContext.read.json(rdd).createOrReplaceTempView("jt")
+      spark.read.json(rdd).createOrReplaceTempView("jt")
       val outputs = sql(
         s"""
            |EXPLAIN EXTENDED
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveOperatorQueryableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveOperatorQueryableSuite.scala
index 4d2f190b8e6f127ea1636496193c97caa2ce12c7..0e89e990e564ef2e00a7a6e0533b6ce9448aefc4 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveOperatorQueryableSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveOperatorQueryableSuite.scala
@@ -24,7 +24,7 @@ import org.apache.spark.sql.hive.test.TestHiveSingleton
  * A set of tests that validates commands can also be queried by like a table
  */
 class HiveOperatorQueryableSuite extends QueryTest with TestHiveSingleton {
-  import hiveContext._
+  import spark._
 
   test("SPARK-5324 query result of describe command") {
     hiveContext.loadTestTable("src")
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HivePlanTest.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HivePlanTest.scala
index 78c0d1f97e7dab0994a957789ca84fac8519472c..89e6edb6b15776f1c6655a8bef5f06ddff847cdd 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HivePlanTest.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HivePlanTest.scala
@@ -24,8 +24,8 @@ import org.apache.spark.sql.expressions.Window
 import org.apache.spark.sql.hive.test.TestHiveSingleton
 
 class HivePlanTest extends QueryTest with TestHiveSingleton {
-  import hiveContext.sql
-  import hiveContext.implicits._
+  import spark.sql
+  import spark.implicits._
 
   test("udf constant folding") {
     Seq.empty[Tuple1[Int]].toDF("a").createOrReplaceTempView("t")
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala
index 23b7f6c75b48694a099b3c5db91b7d97582e7b13..ffeed63695dfb29b65ec978d62598c850034e952 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala
@@ -47,8 +47,8 @@ case class ListStringCaseClass(l: Seq[String])
  */
 class HiveUDFSuite extends QueryTest with TestHiveSingleton with SQLTestUtils {
 
-  import hiveContext.udf
-  import hiveContext.implicits._
+  import spark.udf
+  import spark.implicits._
 
   test("spark sql udf test that returns a struct") {
     udf.register("getStruct", (_: Int) => Fields(1, 2, 3, 4, 5))
@@ -151,7 +151,7 @@ class HiveUDFSuite extends QueryTest with TestHiveSingleton with SQLTestUtils {
    }
 
   test("UDFIntegerToString") {
-    val testData = hiveContext.sparkContext.parallelize(
+    val testData = spark.sparkContext.parallelize(
       IntegerCaseClass(1) :: IntegerCaseClass(2) :: Nil).toDF()
     testData.createOrReplaceTempView("integerTable")
 
@@ -166,7 +166,7 @@ class HiveUDFSuite extends QueryTest with TestHiveSingleton with SQLTestUtils {
   }
 
   test("UDFToListString") {
-    val testData = hiveContext.sparkContext.parallelize(StringCaseClass("") :: Nil).toDF()
+    val testData = spark.sparkContext.parallelize(StringCaseClass("") :: Nil).toDF()
     testData.createOrReplaceTempView("inputTable")
 
     sql(s"CREATE TEMPORARY FUNCTION testUDFToListString AS '${classOf[UDFToListString].getName}'")
@@ -181,7 +181,7 @@ class HiveUDFSuite extends QueryTest with TestHiveSingleton with SQLTestUtils {
   }
 
   test("UDFToListInt") {
-    val testData = hiveContext.sparkContext.parallelize(StringCaseClass("") :: Nil).toDF()
+    val testData = spark.sparkContext.parallelize(StringCaseClass("") :: Nil).toDF()
     testData.createOrReplaceTempView("inputTable")
 
     sql(s"CREATE TEMPORARY FUNCTION testUDFToListInt AS '${classOf[UDFToListInt].getName}'")
@@ -196,7 +196,7 @@ class HiveUDFSuite extends QueryTest with TestHiveSingleton with SQLTestUtils {
   }
 
   test("UDFToStringIntMap") {
-    val testData = hiveContext.sparkContext.parallelize(StringCaseClass("") :: Nil).toDF()
+    val testData = spark.sparkContext.parallelize(StringCaseClass("") :: Nil).toDF()
     testData.createOrReplaceTempView("inputTable")
 
     sql(s"CREATE TEMPORARY FUNCTION testUDFToStringIntMap " +
@@ -212,7 +212,7 @@ class HiveUDFSuite extends QueryTest with TestHiveSingleton with SQLTestUtils {
   }
 
   test("UDFToIntIntMap") {
-    val testData = hiveContext.sparkContext.parallelize(StringCaseClass("") :: Nil).toDF()
+    val testData = spark.sparkContext.parallelize(StringCaseClass("") :: Nil).toDF()
     testData.createOrReplaceTempView("inputTable")
 
     sql(s"CREATE TEMPORARY FUNCTION testUDFToIntIntMap " +
@@ -228,7 +228,7 @@ class HiveUDFSuite extends QueryTest with TestHiveSingleton with SQLTestUtils {
   }
 
   test("UDFListListInt") {
-    val testData = hiveContext.sparkContext.parallelize(
+    val testData = spark.sparkContext.parallelize(
       ListListIntCaseClass(Nil) ::
       ListListIntCaseClass(Seq((1, 2, 3))) ::
       ListListIntCaseClass(Seq((4, 5, 6), (7, 8, 9))) :: Nil).toDF()
@@ -244,7 +244,7 @@ class HiveUDFSuite extends QueryTest with TestHiveSingleton with SQLTestUtils {
   }
 
   test("UDFListString") {
-    val testData = hiveContext.sparkContext.parallelize(
+    val testData = spark.sparkContext.parallelize(
       ListStringCaseClass(Seq("a", "b", "c")) ::
       ListStringCaseClass(Seq("d", "e")) :: Nil).toDF()
     testData.createOrReplaceTempView("listStringTable")
@@ -259,7 +259,7 @@ class HiveUDFSuite extends QueryTest with TestHiveSingleton with SQLTestUtils {
   }
 
   test("UDFStringString") {
-    val testData = hiveContext.sparkContext.parallelize(
+    val testData = spark.sparkContext.parallelize(
       StringCaseClass("world") :: StringCaseClass("goodbye") :: Nil).toDF()
     testData.createOrReplaceTempView("stringTable")
 
@@ -278,7 +278,7 @@ class HiveUDFSuite extends QueryTest with TestHiveSingleton with SQLTestUtils {
   }
 
   test("UDFTwoListList") {
-    val testData = hiveContext.sparkContext.parallelize(
+    val testData = spark.sparkContext.parallelize(
       ListListIntCaseClass(Nil) ::
       ListListIntCaseClass(Seq((1, 2, 3))) ::
       ListListIntCaseClass(Seq((4, 5, 6), (7, 8, 9))) ::
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index 8a31a49d97f085e7f4e45aa819dd90128ef462ab..4b51f021bfa0cadebb5a343479c26dfbc5e5ae85 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -58,7 +58,7 @@ case class Order(
  */
 class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
   import hiveContext._
-  import hiveContext.implicits._
+  import spark.implicits._
 
   test("UDTF") {
     withUserDefinedFunction("udtf_count2" -> true) {
@@ -690,7 +690,7 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
     }
   }
 
-  test("SPARK-4699 HiveContext should be case insensitive by default") {
+  test("SPARK-4699 SparkSession with Hive Support should be case insensitive by default") {
     checkAnswer(
       sql("SELECT KEY FROM Src ORDER BY value"),
       sql("SELECT key FROM src ORDER BY value").collect().toSeq)
@@ -707,7 +707,7 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
 
     val rowRdd = sparkContext.parallelize(row :: Nil)
 
-    hiveContext.createDataFrame(rowRdd, schema).createOrReplaceTempView("testTable")
+    spark.createDataFrame(rowRdd, schema).createOrReplaceTempView("testTable")
 
     sql(
       """CREATE TABLE nullValuesInInnerComplexTypes
@@ -1417,7 +1417,7 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
            """.stripMargin)
 
         checkAnswer(
-          spark.sqlContext.tables().select('isTemporary).filter('tableName === "t2"),
+          spark.sql("SHOW TABLES").select('isTemporary).filter('tableName === "t2"),
           Row(true)
         )
 
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLViewSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLViewSuite.scala
index 42dbe188fb193c19d5c5a8501d57f196f796c5b2..72db3618e08709f0d3d4278dfbbe6ede8f098636 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLViewSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLViewSuite.scala
@@ -26,7 +26,7 @@ import org.apache.spark.sql.test.SQLTestUtils
  * A suite for testing view related functionality.
  */
 class SQLViewSuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
-  import hiveContext.implicits._
+  import spark.implicits._
 
   override def beforeAll(): Unit = {
     // Create a simple table with two columns: id and id1
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLWindowFunctionSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLWindowFunctionSuite.scala
index 47ceefb88ebcd357339a7d83e5f845e294e82715..77e97dff8c221cc1fe07f3f7286df7fd11bb303d 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLWindowFunctionSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLWindowFunctionSuite.scala
@@ -29,7 +29,7 @@ case class WindowData(month: Int, area: String, product: Int)
  * Test suite for SQL window functions.
  */
 class SQLWindowFunctionSuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
-  import hiveContext.implicits._
+  import spark.implicits._
 
   test("window function: udaf with aggregate expression") {
     val data = Seq(
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ScriptTransformationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ScriptTransformationSuite.scala
index 19e8025d6b7c940f6a830aba62112c1d5f9d536e..6f8062240732ac75d130249a85594091cd9663c1 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ScriptTransformationSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ScriptTransformationSuite.scala
@@ -29,7 +29,7 @@ import org.apache.spark.sql.hive.test.TestHiveSingleton
 import org.apache.spark.sql.types.StringType
 
 class ScriptTransformationSuite extends SparkPlanTest with TestHiveSingleton {
-  import hiveContext.implicits._
+  import spark.implicits._
 
   private val noSerdeIOSchema = HiveScriptIOSchema(
     inputRowFormat = Seq.empty,
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcHadoopFsRelationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcHadoopFsRelationSuite.scala
index 5dfa58f673bf2a328d7ba85131cdfdd2855548b2..463c368fc42b183c245f5964a9c1f41a626152b7 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcHadoopFsRelationSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcHadoopFsRelationSuite.scala
@@ -59,7 +59,7 @@ class OrcHadoopFsRelationSuite extends HadoopFsRelationTest {
         StructType(dataSchema.fields :+ StructField("p1", IntegerType, nullable = true))
 
       checkQueries(
-        hiveContext.read.options(Map(
+        spark.read.options(Map(
           "path" -> file.getCanonicalPath,
           "dataSchema" -> dataSchemaWithPartition.json)).format(dataSourceName).load())
     }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcPartitionDiscoverySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcPartitionDiscoverySuite.scala
index fed0d11e9d216ffb953b7042cdfdcfb021e3c924..d1ce3f1e2f0584696cda5ea9f5c64c94b1fdd53e 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcPartitionDiscoverySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcPartitionDiscoverySuite.scala
@@ -37,8 +37,8 @@ case class OrcParDataWithKey(intField: Int, pi: Int, stringField: String, ps: St
 
 // TODO This test suite duplicates ParquetPartitionDiscoverySuite a lot
 class OrcPartitionDiscoverySuite extends QueryTest with TestHiveSingleton with BeforeAndAfterAll {
-  import hiveContext._
-  import hiveContext.implicits._
+  import spark._
+  import spark.implicits._
 
   val defaultPartitionName = ConfVars.DEFAULTPARTITIONNAME.defaultStrVal
 
@@ -59,7 +59,7 @@ class OrcPartitionDiscoverySuite extends QueryTest with TestHiveSingleton with B
   }
 
   protected def withTempTable(tableName: String)(f: => Unit): Unit = {
-    try f finally hiveContext.dropTempTable(tableName)
+    try f finally spark.catalog.dropTempView(tableName)
   }
 
   protected def makePartitionDir(
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala
index 4cac334859ba13f633d856f29b88fa4559b1b627..871b9e02eb3825314ecadc56db3816f677ce1aa9 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala
@@ -30,7 +30,7 @@ import org.apache.spark.util.Utils
 case class OrcData(intField: Int, stringField: String)
 
 abstract class OrcSuite extends QueryTest with TestHiveSingleton with BeforeAndAfterAll {
-  import hiveContext._
+  import spark._
 
   var orcTableDir: File = null
   var orcTableAsDir: File = null
@@ -152,7 +152,7 @@ class OrcSourceSuite extends OrcSuite {
   override def beforeAll(): Unit = {
     super.beforeAll()
 
-    hiveContext.sql(
+    spark.sql(
       s"""CREATE TEMPORARY TABLE normal_orc_source
          |USING org.apache.spark.sql.hive.orc
          |OPTIONS (
@@ -160,7 +160,7 @@ class OrcSourceSuite extends OrcSuite {
          |)
        """.stripMargin)
 
-    hiveContext.sql(
+    spark.sql(
       s"""CREATE TEMPORARY TABLE normal_orc_as_source
          |USING org.apache.spark.sql.hive.orc
          |OPTIONS (
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcTest.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcTest.scala
index bb351e20c5e99d4d991e4f458549213f76538854..2a647115b7e015e921aec4f16096e8e10b84824e 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcTest.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcTest.scala
@@ -61,7 +61,7 @@ private[sql] trait OrcTest extends SQLTestUtils with TestHiveSingleton {
       (data: Seq[T], tableName: String)
       (f: => Unit): Unit = {
     withOrcDataFrame(data) { df =>
-      spark.sqlContext.registerDataFrameAsTable(df, tableName)
+      df.createOrReplaceTempView(tableName)
       withTempTable(tableName)(f)
     }
   }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
index 3e5140fe578ded1ec976933ef749d9aa71c6a269..06b74da196854f38206e928093657374daa7318b 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
@@ -57,7 +57,7 @@ case class ParquetDataWithKeyAndComplexTypes(
  */
 class ParquetMetastoreSuite extends ParquetPartitioningTest {
   import hiveContext._
-  import hiveContext.implicits._
+  import spark.implicits._
 
   override def beforeAll(): Unit = {
     super.beforeAll()
@@ -571,7 +571,7 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
  */
 class ParquetSourceSuite extends ParquetPartitioningTest {
   import testImplicits._
-  import hiveContext._
+  import spark._
 
   override def beforeAll(): Unit = {
     super.beforeAll()
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
index d271e55467c6fb3bd189fb8831c5ce933b84b314..f9891ac5717e0a566c047d8fc5c34335687cda09 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
@@ -51,7 +51,7 @@ class BucketedReadSuite extends QueryTest with SQLTestUtils with TestHiveSinglet
         .saveAsTable("bucketed_table")
 
       for (i <- 0 until 5) {
-        val table = hiveContext.table("bucketed_table").filter($"i" === i)
+        val table = spark.table("bucketed_table").filter($"i" === i)
         val query = table.queryExecution
         val output = query.analyzed.output
         val rdd = query.toRdd
@@ -80,7 +80,7 @@ class BucketedReadSuite extends QueryTest with SQLTestUtils with TestHiveSinglet
       originalDataFrame: DataFrame): Unit = {
     // This test verifies parts of the plan. Disable whole stage codegen.
     withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "false") {
-      val bucketedDataFrame = hiveContext.table("bucketed_table").select("i", "j", "k")
+      val bucketedDataFrame = spark.table("bucketed_table").select("i", "j", "k")
       val BucketSpec(numBuckets, bucketColumnNames, _) = bucketSpec
       // Limit: bucket pruning only works when the bucket column has one and only one column
       assert(bucketColumnNames.length == 1)
@@ -252,8 +252,8 @@ class BucketedReadSuite extends QueryTest with SQLTestUtils with TestHiveSinglet
 
       withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "0",
         SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "false") {
-        val t1 = hiveContext.table("bucketed_table1")
-        val t2 = hiveContext.table("bucketed_table2")
+        val t1 = spark.table("bucketed_table1")
+        val t2 = spark.table("bucketed_table2")
         val joined = t1.join(t2, joinCondition(t1, t2, joinColumns))
 
         // First check the result is corrected.
@@ -321,7 +321,7 @@ class BucketedReadSuite extends QueryTest with SQLTestUtils with TestHiveSinglet
   test("avoid shuffle when grouping keys are equal to bucket keys") {
     withTable("bucketed_table") {
       df1.write.format("parquet").bucketBy(8, "i", "j").saveAsTable("bucketed_table")
-      val tbl = hiveContext.table("bucketed_table")
+      val tbl = spark.table("bucketed_table")
       val agged = tbl.groupBy("i", "j").agg(max("k"))
 
       checkAnswer(
@@ -335,7 +335,7 @@ class BucketedReadSuite extends QueryTest with SQLTestUtils with TestHiveSinglet
   test("avoid shuffle when grouping keys are a super-set of bucket keys") {
     withTable("bucketed_table") {
       df1.write.format("parquet").bucketBy(8, "i").saveAsTable("bucketed_table")
-      val tbl = hiveContext.table("bucketed_table")
+      val tbl = spark.table("bucketed_table")
       val agged = tbl.groupBy("i", "j").agg(max("k"))
 
       checkAnswer(
@@ -349,11 +349,12 @@ class BucketedReadSuite extends QueryTest with SQLTestUtils with TestHiveSinglet
   test("error if there exists any malformed bucket files") {
     withTable("bucketed_table") {
       df1.write.format("parquet").bucketBy(8, "i").saveAsTable("bucketed_table")
-      val tableDir = new File(hiveContext.sparkSession.warehousePath, "bucketed_table")
+      val tableDir = new File(hiveContext
+        .sparkSession.warehousePath, "bucketed_table")
       Utils.deleteRecursively(tableDir)
       df1.write.parquet(tableDir.getAbsolutePath)
 
-      val agged = hiveContext.table("bucketed_table").groupBy("i").count()
+      val agged = spark.table("bucketed_table").groupBy("i").count()
       val error = intercept[RuntimeException] {
         agged.count()
       }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala
index 8bf6f224a4b98c1227755c71c15a825905e3ca3a..ff44c6f29497d2eb60b64319a25208e828c6b9ee 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala
@@ -69,7 +69,7 @@ class BucketedWriteSuite extends QueryTest with SQLTestUtils with TestHiveSingle
   private val df = (0 until 50).map(i => (i % 5, i % 13, i.toString)).toDF("i", "j", "k")
 
   def tableDir: File = {
-    val identifier = hiveContext.sessionState.sqlParser.parseTableIdentifier("bucketed_table")
+    val identifier = spark.sessionState.sqlParser.parseTableIdentifier("bucketed_table")
     new File(URI.create(hiveContext.sessionState.catalog.hiveDefaultTableFilePath(identifier)))
   }
 
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/JsonHadoopFsRelationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/JsonHadoopFsRelationSuite.scala
index ef37787137d07208dc4b9457413c8e8d12315bbc..d79edee5b1a4c166da1197821b8fa567b16494ae 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/JsonHadoopFsRelationSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/JsonHadoopFsRelationSuite.scala
@@ -53,7 +53,7 @@ class JsonHadoopFsRelationSuite extends HadoopFsRelationTest {
         StructType(dataSchema.fields :+ StructField("p1", IntegerType, nullable = true))
 
       checkQueries(
-        hiveContext.read.format(dataSourceName)
+        spark.read.format(dataSourceName)
           .option("dataSchema", dataSchemaWithPartition.json)
           .load(file.getCanonicalPath))
     }
@@ -71,14 +71,14 @@ class JsonHadoopFsRelationSuite extends HadoopFsRelationTest {
       val data =
         Row(Seq(1L, 2L, 3L), Map("m1" -> Row(4L))) ::
           Row(Seq(5L, 6L, 7L), Map("m2" -> Row(10L))) :: Nil
-      val df = hiveContext.createDataFrame(sparkContext.parallelize(data), schema)
+      val df = spark.createDataFrame(sparkContext.parallelize(data), schema)
 
       // Write the data out.
       df.write.format(dataSourceName).save(file.getCanonicalPath)
 
       // Read it back and check the result.
       checkAnswer(
-        hiveContext.read.format(dataSourceName).schema(schema).load(file.getCanonicalPath),
+        spark.read.format(dataSourceName).schema(schema).load(file.getCanonicalPath),
         df
       )
     }
@@ -96,14 +96,14 @@ class JsonHadoopFsRelationSuite extends HadoopFsRelationTest {
         Row(new BigDecimal("10.02")) ::
           Row(new BigDecimal("20000.99")) ::
           Row(new BigDecimal("10000")) :: Nil
-      val df = hiveContext.createDataFrame(sparkContext.parallelize(data), schema)
+      val df = spark.createDataFrame(sparkContext.parallelize(data), schema)
 
       // Write the data out.
       df.write.format(dataSourceName).save(file.getCanonicalPath)
 
       // Read it back and check the result.
       checkAnswer(
-        hiveContext.read.format(dataSourceName).schema(schema).load(file.getCanonicalPath),
+        spark.read.format(dataSourceName).schema(schema).load(file.getCanonicalPath),
         df
       )
     }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala
index 4b4852c1d793a7213362b3fb52e4132369557334..f9a1d16d9094d081e974473d6cb917d9c4da5e0a 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala
@@ -58,7 +58,7 @@ class ParquetHadoopFsRelationSuite extends HadoopFsRelationTest {
         StructType(dataSchema.fields :+ StructField("p1", IntegerType, nullable = true))
 
       checkQueries(
-        hiveContext.read.format(dataSourceName)
+        spark.read.format(dataSourceName)
           .option("dataSchema", dataSchemaWithPartition.json)
           .load(file.getCanonicalPath))
     }
@@ -76,7 +76,7 @@ class ParquetHadoopFsRelationSuite extends HadoopFsRelationTest {
         .format("parquet")
         .save(s"${dir.getCanonicalPath}/_temporary")
 
-      checkAnswer(hiveContext.read.format("parquet").load(dir.getCanonicalPath), df.collect())
+      checkAnswer(spark.read.format("parquet").load(dir.getCanonicalPath), df.collect())
     }
   }
 
@@ -104,7 +104,7 @@ class ParquetHadoopFsRelationSuite extends HadoopFsRelationTest {
 
       // This shouldn't throw anything.
       df.write.format("parquet").mode(SaveMode.Overwrite).save(path)
-      checkAnswer(hiveContext.read.format("parquet").load(path), df)
+      checkAnswer(spark.read.format("parquet").load(path), df)
     }
   }
 
@@ -114,7 +114,7 @@ class ParquetHadoopFsRelationSuite extends HadoopFsRelationTest {
         // Parquet doesn't allow field names with spaces.  Here we are intentionally making an
         // exception thrown from the `ParquetRelation2.prepareForWriteJob()` method to trigger
         // the bug.  Please refer to spark-8079 for more details.
-        hiveContext.range(1, 10)
+        spark.range(1, 10)
           .withColumnRenamed("id", "a b")
           .write
           .format("parquet")
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextHadoopFsRelationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextHadoopFsRelationSuite.scala
index fa64c7dcfab6ec85d4ac0e48172282abf99e941f..a47a2246ddc3c098eb0d2415022d18ba594895cf 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextHadoopFsRelationSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextHadoopFsRelationSuite.scala
@@ -60,7 +60,7 @@ class SimpleTextHadoopFsRelationSuite extends HadoopFsRelationTest with Predicat
         StructType(dataSchema.fields :+ StructField("p1", IntegerType, nullable = true))
 
       checkQueries(
-        hiveContext.read.format(dataSourceName)
+        spark.read.format(dataSourceName)
           .option("dataSchema", dataSchemaWithPartition.json)
           .load(file.getCanonicalPath))
     }