diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameNaFunctions.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameNaFunctions.scala
index f7be5f6b370ab29bdf6923e656a10b77ae7a8f35..33588ef72ffbe3aa5c7731b61baca90c74cecce4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameNaFunctions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameNaFunctions.scala
@@ -155,7 +155,7 @@ final class DataFrameNaFunctions private[sql](df: DataFrame) {
    * @since 1.3.1
    */
   def fill(value: Double, cols: Seq[String]): DataFrame = {
-    val columnEquals = df.sqlContext.analyzer.resolver
+    val columnEquals = df.sqlContext.sessionState.analyzer.resolver
     val projections = df.schema.fields.map { f =>
       // Only fill if the column is part of the cols list.
       if (f.dataType.isInstanceOf[NumericType] && cols.exists(col => columnEquals(f.name, col))) {
@@ -182,7 +182,7 @@ final class DataFrameNaFunctions private[sql](df: DataFrame) {
    * @since 1.3.1
    */
   def fill(value: String, cols: Seq[String]): DataFrame = {
-    val columnEquals = df.sqlContext.analyzer.resolver
+    val columnEquals = df.sqlContext.sessionState.analyzer.resolver
     val projections = df.schema.fields.map { f =>
       // Only fill if the column is part of the cols list.
       if (f.dataType.isInstanceOf[StringType] && cols.exists(col => columnEquals(f.name, col))) {
@@ -353,7 +353,7 @@ final class DataFrameNaFunctions private[sql](df: DataFrame) {
       case _: String => StringType
     }
 
-    val columnEquals = df.sqlContext.analyzer.resolver
+    val columnEquals = df.sqlContext.sessionState.analyzer.resolver
     val projections = df.schema.fields.map { f =>
       val shouldReplace = cols.exists(colName => columnEquals(colName, f.name))
       if (f.dataType.isInstanceOf[NumericType] && targetColumnType == DoubleType && shouldReplace) {
@@ -382,7 +382,7 @@ final class DataFrameNaFunctions private[sql](df: DataFrame) {
       }
     }
 
-    val columnEquals = df.sqlContext.analyzer.resolver
+    val columnEquals = df.sqlContext.sessionState.analyzer.resolver
     val projections = df.schema.fields.map { f =>
       values.find { case (k, _) => columnEquals(k, f.name) }.map { case (_, v) =>
         v match {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index 76b8d71ac9359a2f8ce5fc97a987c5137bd8cf25..57c978bec8485acd366405005da0d7e32b627fe6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -394,7 +394,7 @@ class DataFrameReader private[sql](sqlContext: SQLContext) extends Logging {
    */
   def table(tableName: String): DataFrame = {
     Dataset.newDataFrame(sqlContext,
-      sqlContext.catalog.lookupRelation(
+      sqlContext.sessionState.catalog.lookupRelation(
         sqlContext.sessionState.sqlParser.parseTableIdentifier(tableName)))
   }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index de87f4d7c24ef310e41d1e417e702944fb51924a..9951f0fabff15b68c4bbbf838fb966fdd0407f1e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -323,7 +323,7 @@ final class DataFrameWriter private[sql](df: DataFrame) {
    */
   private def normalize(columnName: String, columnType: String): String = {
     val validColumnNames = df.logicalPlan.output.map(_.name)
-    validColumnNames.find(df.sqlContext.analyzer.resolver(_, columnName))
+    validColumnNames.find(df.sqlContext.sessionState.analyzer.resolver(_, columnName))
       .getOrElse(throw new AnalysisException(s"$columnType column $columnName not found in " +
         s"existing columns (${validColumnNames.mkString(", ")})"))
   }
@@ -358,7 +358,7 @@ final class DataFrameWriter private[sql](df: DataFrame) {
   }
 
   private def saveAsTable(tableIdent: TableIdentifier): Unit = {
-    val tableExists = df.sqlContext.catalog.tableExists(tableIdent)
+    val tableExists = df.sqlContext.sessionState.catalog.tableExists(tableIdent)
 
     (tableExists, mode) match {
       case (true, SaveMode.Ignore) =>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index ef239a1e2f3248d45c8d4424d64cc12e554bde85..f7ef0de21c6c5fffb6bf1267392d4d4557fa21db 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -166,15 +166,16 @@ class Dataset[T] private[sql](
   private implicit def classTag = unresolvedTEncoder.clsTag
 
   protected[sql] def resolve(colName: String): NamedExpression = {
-    queryExecution.analyzed.resolveQuoted(colName, sqlContext.analyzer.resolver).getOrElse {
-      throw new AnalysisException(
-        s"""Cannot resolve column name "$colName" among (${schema.fieldNames.mkString(", ")})""")
-    }
+    queryExecution.analyzed.resolveQuoted(colName, sqlContext.sessionState.analyzer.resolver)
+      .getOrElse {
+        throw new AnalysisException(
+          s"""Cannot resolve column name "$colName" among (${schema.fieldNames.mkString(", ")})""")
+      }
   }
 
   protected[sql] def numericColumns: Seq[Expression] = {
     schema.fields.filter(_.dataType.isInstanceOf[NumericType]).map { n =>
-      queryExecution.analyzed.resolveQuoted(n.name, sqlContext.analyzer.resolver).get
+      queryExecution.analyzed.resolveQuoted(n.name, sqlContext.sessionState.analyzer.resolver).get
     }
   }
 
@@ -1400,7 +1401,7 @@ class Dataset[T] private[sql](
    * @since 1.3.0
    */
   def withColumn(colName: String, col: Column): DataFrame = {
-    val resolver = sqlContext.analyzer.resolver
+    val resolver = sqlContext.sessionState.analyzer.resolver
     val output = queryExecution.analyzed.output
     val shouldReplace = output.exists(f => resolver(f.name, colName))
     if (shouldReplace) {
@@ -1421,7 +1422,7 @@ class Dataset[T] private[sql](
    * Returns a new [[DataFrame]] by adding a column with metadata.
    */
   private[spark] def withColumn(colName: String, col: Column, metadata: Metadata): DataFrame = {
-    val resolver = sqlContext.analyzer.resolver
+    val resolver = sqlContext.sessionState.analyzer.resolver
     val output = queryExecution.analyzed.output
     val shouldReplace = output.exists(f => resolver(f.name, colName))
     if (shouldReplace) {
@@ -1445,7 +1446,7 @@ class Dataset[T] private[sql](
    * @since 1.3.0
    */
   def withColumnRenamed(existingName: String, newName: String): DataFrame = {
-    val resolver = sqlContext.analyzer.resolver
+    val resolver = sqlContext.sessionState.analyzer.resolver
     val output = queryExecution.analyzed.output
     val shouldRename = output.exists(f => resolver(f.name, existingName))
     if (shouldRename) {
@@ -1480,7 +1481,7 @@ class Dataset[T] private[sql](
    */
   @scala.annotation.varargs
   def drop(colNames: String*): DataFrame = {
-    val resolver = sqlContext.analyzer.resolver
+    val resolver = sqlContext.sessionState.analyzer.resolver
     val remainingCols =
       schema.filter(f => colNames.forall(n => !resolver(f.name, n))).map(f => Column(f.name))
     if (remainingCols.size == this.schema.size) {
@@ -1501,7 +1502,8 @@ class Dataset[T] private[sql](
   def drop(col: Column): DataFrame = {
     val expression = col match {
       case Column(u: UnresolvedAttribute) =>
-        queryExecution.analyzed.resolveQuoted(u.name, sqlContext.analyzer.resolver).getOrElse(u)
+        queryExecution.analyzed.resolveQuoted(
+          u.name, sqlContext.sessionState.analyzer.resolver).getOrElse(u)
       case Column(expr: Expression) => expr
     }
     val attrs = this.logicalPlan.output
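
Note (illustrative only, not part of the patch): every column-resolution path in Dataset now goes through the session-scoped analyzer. A minimal sketch of the pattern, assuming a DataFrame `df` and a hypothetical column name "value":

    // Hedged sketch: the resolver is a (String, String) => Boolean comparison whose
    // case sensitivity follows the session's SQLConf.
    val resolver = df.sqlContext.sessionState.analyzer.resolver
    val hasColumn = df.schema.fieldNames.exists(name => resolver(name, "value"))
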
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index 0f5d1c8cab5193ef168d8f5e24505902e2c24a7f..177d78c4c08f9f26a67e43f04b8bbfdff24dec93 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -120,8 +120,6 @@ class SQLContext private[sql](
   @transient
   protected[sql] lazy val sessionState: SessionState = new SessionState(self)
   protected[sql] def conf: SQLConf = sessionState.conf
-  protected[sql] def catalog: Catalog = sessionState.catalog
-  protected[sql] def analyzer: Analyzer = sessionState.analyzer
 
   /**
    * An interface to register custom [[org.apache.spark.sql.util.QueryExecutionListener]]s
@@ -699,7 +697,8 @@ class SQLContext private[sql](
    * only during the lifetime of this instance of SQLContext.
    */
   private[sql] def registerDataFrameAsTable(df: DataFrame, tableName: String): Unit = {
-    catalog.registerTable(sessionState.sqlParser.parseTableIdentifier(tableName), df.logicalPlan)
+    sessionState.catalog.registerTable(
+      sessionState.sqlParser.parseTableIdentifier(tableName), df.logicalPlan)
   }
 
   /**
@@ -712,7 +711,7 @@ class SQLContext private[sql](
    */
   def dropTempTable(tableName: String): Unit = {
     cacheManager.tryUncacheQuery(table(tableName))
-    catalog.unregisterTable(TableIdentifier(tableName))
+    sessionState.catalog.unregisterTable(TableIdentifier(tableName))
   }
 
   /**
@@ -797,7 +796,7 @@ class SQLContext private[sql](
   }
 
   private def table(tableIdent: TableIdentifier): DataFrame = {
-    Dataset.newDataFrame(this, catalog.lookupRelation(tableIdent))
+    Dataset.newDataFrame(this, sessionState.catalog.lookupRelation(tableIdent))
   }
 
   /**
@@ -839,7 +838,7 @@ class SQLContext private[sql](
    * @since 1.3.0
    */
   def tableNames(): Array[String] = {
-    catalog.getTables(None).map {
+    sessionState.catalog.getTables(None).map {
       case (tableName, _) => tableName
     }.toArray
   }
@@ -851,7 +850,7 @@ class SQLContext private[sql](
    * @since 1.3.0
    */
   def tableNames(databaseName: String): Array[String] = {
-    catalog.getTables(Some(databaseName)).map {
+    sessionState.catalog.getTables(Some(databaseName)).map {
       case (tableName, _) => tableName
     }.toArray
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
index 5b4254f741ab1a4569715d3ece634e42efe276c7..912b84abc187c46095ec8741906e36d674d81753 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
@@ -31,14 +31,14 @@ import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, ReturnAnswer}
  */
 class QueryExecution(val sqlContext: SQLContext, val logical: LogicalPlan) {
 
-  def assertAnalyzed(): Unit = try sqlContext.analyzer.checkAnalysis(analyzed) catch {
+  def assertAnalyzed(): Unit = try sqlContext.sessionState.analyzer.checkAnalysis(analyzed) catch {
     case e: AnalysisException =>
       val ae = new AnalysisException(e.message, e.line, e.startPosition, Some(analyzed))
       ae.setStackTrace(e.getStackTrace)
       throw ae
   }
 
-  lazy val analyzed: LogicalPlan = sqlContext.analyzer.execute(logical)
+  lazy val analyzed: LogicalPlan = sqlContext.sessionState.analyzer.execute(logical)
 
   lazy val withCachedData: LogicalPlan = {
     assertAnalyzed()
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
index e711797c1b51ae7e001ff6d7bf7ae67c56a93151..44b07e4613263ad8870ced61edd8904f26762ac4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
@@ -330,7 +330,7 @@ case class ShowTablesCommand(databaseName: Option[String]) extends RunnableComma
   override def run(sqlContext: SQLContext): Seq[Row] = {
     // Since we need to return a Seq of rows, we will call getTables directly
     // instead of calling tables in sqlContext.
-    val rows = sqlContext.catalog.getTables(databaseName).map {
+    val rows = sqlContext.sessionState.catalog.getTables(databaseName).map {
       case (tableName, isTemporary) => Row(tableName, isTemporary)
     }
 
@@ -417,7 +417,7 @@ case class DescribeFunction(
 case class SetDatabaseCommand(databaseName: String) extends RunnableCommand {
 
   override def run(sqlContext: SQLContext): Seq[Row] = {
-    sqlContext.catalog.setCurrentDatabase(databaseName)
+    sqlContext.sessionState.catalog.setCurrentDatabase(databaseName)
     Seq.empty[Row]
   }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
index ef95d5d28961fad5510b08506056bde1839b296b..84e98c0f9e0f5dace7c3bcd3c1624c3673b224f9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
@@ -67,7 +67,8 @@ private[sql] object FileSourceStrategy extends Strategy with Logging {
       val filterSet = ExpressionSet(filters)
 
       val partitionColumns =
-        AttributeSet(l.resolve(files.partitionSchema, files.sqlContext.analyzer.resolver))
+        AttributeSet(
+          l.resolve(files.partitionSchema, files.sqlContext.sessionState.analyzer.resolver))
       val partitionKeyFilters =
         ExpressionSet(filters.filter(_.references.subsetOf(partitionColumns)))
       logInfo(s"Pruning directories with: ${partitionKeyFilters.mkString(",")}")
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
index 04e51735c46f794a5791582bec5572f822b5f170..7ca0e8859a03eb234ddbfb48fac6abd03e0a25fb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
@@ -99,7 +99,7 @@ case class CreateTempTableUsing(
       userSpecifiedSchema = userSpecifiedSchema,
       className = provider,
       options = options)
-    sqlContext.catalog.registerTable(
+    sqlContext.sessionState.catalog.registerTable(
       tableIdent,
       Dataset.newDataFrame(sqlContext, LogicalRelation(dataSource.resolveRelation())).logicalPlan)
 
@@ -124,7 +124,7 @@ case class CreateTempTableUsingAsSelect(
       bucketSpec = None,
       options = options)
     val result = dataSource.write(mode, df)
-    sqlContext.catalog.registerTable(
+    sqlContext.sessionState.catalog.registerTable(
       tableIdent,
       Dataset.newDataFrame(sqlContext, LogicalRelation(result)).logicalPlan)
 
@@ -137,11 +137,11 @@ case class RefreshTable(tableIdent: TableIdentifier)
 
   override def run(sqlContext: SQLContext): Seq[Row] = {
     // Refresh the given table's metadata first.
-    sqlContext.catalog.refreshTable(tableIdent)
+    sqlContext.sessionState.catalog.refreshTable(tableIdent)
 
     // If this table is cached as a InMemoryColumnarRelation, drop the original
     // cached version and make the new version cached lazily.
-    val logicalPlan = sqlContext.catalog.lookupRelation(tableIdent)
+    val logicalPlan = sqlContext.sessionState.catalog.lookupRelation(tableIdent)
     // Use lookupCachedData directly since RefreshTable also takes databaseName.
     val isCached = sqlContext.cacheManager.lookupCachedData(logicalPlan).nonEmpty
     if (isCached) {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/ListTablesSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/ListTablesSuite.scala
index 3d7c576965fc0771f15e9d1e928489eb03ffc3a1..2820e4fa23e13efd746cd0d2d84f5d41da022f59 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/ListTablesSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/ListTablesSuite.scala
@@ -33,7 +33,7 @@ class ListTablesSuite extends QueryTest with BeforeAndAfter with SharedSQLContex
   }
 
   after {
-    sqlContext.catalog.unregisterTable(TableIdentifier("ListTablesSuiteTable"))
+    sqlContext.sessionState.catalog.unregisterTable(TableIdentifier("ListTablesSuiteTable"))
   }
 
   test("get all tables") {
@@ -45,7 +45,7 @@ class ListTablesSuite extends QueryTest with BeforeAndAfter with SharedSQLContex
       sql("SHOW tables").filter("tableName = 'ListTablesSuiteTable'"),
       Row("ListTablesSuiteTable", true))
 
-    sqlContext.catalog.unregisterTable(TableIdentifier("ListTablesSuiteTable"))
+    sqlContext.sessionState.catalog.unregisterTable(TableIdentifier("ListTablesSuiteTable"))
     assert(sqlContext.tables().filter("tableName = 'ListTablesSuiteTable'").count() === 0)
   }
 
@@ -58,7 +58,7 @@ class ListTablesSuite extends QueryTest with BeforeAndAfter with SharedSQLContex
       sql("show TABLES in DB").filter("tableName = 'ListTablesSuiteTable'"),
       Row("ListTablesSuiteTable", true))
 
-    sqlContext.catalog.unregisterTable(TableIdentifier("ListTablesSuiteTable"))
+    sqlContext.sessionState.catalog.unregisterTable(TableIdentifier("ListTablesSuiteTable"))
     assert(sqlContext.tables().filter("tableName = 'ListTablesSuiteTable'").count() === 0)
   }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
index acfc1a518a0a56df2ed753a1df91b22e77d7f1cf..fb99b0c7e2acd5306e69cd1ccce4c501a3f64e39 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
@@ -51,7 +51,7 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext
       sql("INSERT INTO TABLE t SELECT * FROM tmp")
       checkAnswer(sqlContext.table("t"), (data ++ data).map(Row.fromTuple))
     }
-    sqlContext.catalog.unregisterTable(TableIdentifier("tmp"))
+    sqlContext.sessionState.catalog.unregisterTable(TableIdentifier("tmp"))
   }
 
   test("overwriting") {
@@ -61,7 +61,7 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext
       sql("INSERT OVERWRITE TABLE t SELECT * FROM tmp")
       checkAnswer(sqlContext.table("t"), data.map(Row.fromTuple))
     }
-    sqlContext.catalog.unregisterTable(TableIdentifier("tmp"))
+    sqlContext.sessionState.catalog.unregisterTable(TableIdentifier("tmp"))
   }
 
   test("self-join") {
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
index a78b7b0cc4961a2cc2d72fae5da19f8000e1d636..05fc569588658c733fef285540967ac419ac3455 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -113,8 +113,6 @@ class HiveContext private[hive](
   @transient
   protected[sql] override lazy val sessionState = new HiveSessionState(self)
 
-  protected[sql] override def catalog = sessionState.catalog
-
   // The Hive UDF current_database() is foldable, will be evaluated by optimizer,
   // but the optimizer can't access the SessionState of metadataHive.
   sessionState.functionRegistry.registerFunction(
@@ -349,12 +347,12 @@ class HiveContext private[hive](
    */
   def refreshTable(tableName: String): Unit = {
     val tableIdent = sessionState.sqlParser.parseTableIdentifier(tableName)
-    catalog.refreshTable(tableIdent)
+    sessionState.catalog.refreshTable(tableIdent)
   }
 
   protected[hive] def invalidateTable(tableName: String): Unit = {
     val tableIdent = sessionState.sqlParser.parseTableIdentifier(tableName)
-    catalog.invalidateTable(tableIdent)
+    sessionState.catalog.invalidateTable(tableIdent)
   }
 
   /**
@@ -368,7 +366,7 @@ class HiveContext private[hive](
    */
   def analyze(tableName: String) {
     val tableIdent = sessionState.sqlParser.parseTableIdentifier(tableName)
-    val relation = EliminateSubqueryAliases(catalog.lookupRelation(tableIdent))
+    val relation = EliminateSubqueryAliases(sessionState.catalog.lookupRelation(tableIdent))
 
     relation match {
       case relation: MetastoreRelation =>
@@ -429,7 +427,7 @@ class HiveContext private[hive](
         // recorded in the Hive metastore.
         // This logic is based on org.apache.hadoop.hive.ql.exec.StatsTask.aggregateStats().
         if (newTotalSize > 0 && newTotalSize != oldTotalSize) {
-          catalog.client.alterTable(
+          sessionState.catalog.client.alterTable(
             relation.table.copy(
               properties = relation.table.properties +
                 (StatsSetupConst.TOTAL_SIZE -> newTotalSize.toString)))
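
Likewise on the Hive side (illustration only, not part of the change): the HiveSessionState owns the Hive-aware catalog, so refresh and lookup calls route through it:

    // Hedged sketch assuming an existing HiveContext and a hypothetical table name.
    val tableIdent = hiveContext.sessionState.sqlParser.parseTableIdentifier("some_table")
    hiveContext.sessionState.catalog.refreshTable(tableIdent)
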
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala
index 44f579fbb78d127635667e6fe0f82566581e71fc..91425d143554a0ac0412fc3906d2285805c78b86 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala
@@ -69,17 +69,17 @@ case class CreateTableAsSelect(
         withFormat
       }
 
-      hiveContext.catalog.client.createTable(withSchema, ignoreIfExists = false)
+      hiveContext.sessionState.catalog.client.createTable(withSchema, ignoreIfExists = false)
 
       // Get the Metastore Relation
-      hiveContext.catalog.lookupRelation(tableIdentifier, None) match {
+      hiveContext.sessionState.catalog.lookupRelation(tableIdentifier, None) match {
         case r: MetastoreRelation => r
       }
     }
     // TODO ideally, we should get the output data ready first and then
     // add the relation into catalog, just in case of failure occurs while data
     // processing.
-    if (hiveContext.catalog.tableExists(tableIdentifier)) {
+    if (hiveContext.sessionState.catalog.tableExists(tableIdentifier)) {
       if (allowExisting) {
         // table already exists, will do nothing, to keep consistent with Hive
       } else {
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateViewAsSelect.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateViewAsSelect.scala
index 83d057f7e4d88a8317f0371c6483ada21113ea64..6c2b88eb8c6d947129a251d21f00e2f32b8d75d1 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateViewAsSelect.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateViewAsSelect.scala
@@ -49,14 +49,14 @@ private[hive] case class CreateViewAsSelect(
   override def run(sqlContext: SQLContext): Seq[Row] = {
     val hiveContext = sqlContext.asInstanceOf[HiveContext]
 
-    hiveContext.catalog.tableExists(tableIdentifier) match {
+    hiveContext.sessionState.catalog.tableExists(tableIdentifier) match {
       case true if allowExisting =>
         // Handles `CREATE VIEW IF NOT EXISTS v0 AS SELECT ...`. Does nothing when the target view
         // already exists.
 
       case true if orReplace =>
         // Handles `CREATE OR REPLACE VIEW v0 AS SELECT ...`
-        hiveContext.catalog.client.alertView(prepareTable(sqlContext))
+        hiveContext.sessionState.catalog.client.alertView(prepareTable(sqlContext))
 
       case true =>
         // Handles `CREATE VIEW v0 AS SELECT ...`. Throws exception when the target view already
@@ -66,7 +66,7 @@ private[hive] case class CreateViewAsSelect(
           "CREATE OR REPLACE VIEW AS")
 
       case false =>
-        hiveContext.catalog.client.createView(prepareTable(sqlContext))
+        hiveContext.sessionState.catalog.client.createView(prepareTable(sqlContext))
     }
 
     Seq.empty[Row]
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
index b3d38dfdb49eaa0b1216ed00b244266763657d41..4ffd868242b86359258620cf8efcb814cbf36759 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
@@ -45,7 +45,7 @@ case class InsertIntoHiveTable(
 
   @transient val sc: HiveContext = sqlContext.asInstanceOf[HiveContext]
   @transient private lazy val hiveContext = new Context(sc.hiveconf)
-  @transient private lazy val catalog = sc.catalog
+  @transient private lazy val catalog = sc.sessionState.catalog
 
   def output: Seq[Attribute] = Seq.empty
 
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
index c4723fcb82341dad3172c45659ba26d6b1f11301..ff6657362013d0a8e6f3d0cc0ef5f6a9c0463c5c 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
@@ -71,7 +71,7 @@ case class DropTable(
     }
     hiveContext.invalidateTable(tableName)
     hiveContext.runSqlHive(s"DROP TABLE $ifExistsClause$tableName")
-    hiveContext.catalog.unregisterTable(TableIdentifier(tableName))
+    hiveContext.sessionState.catalog.unregisterTable(TableIdentifier(tableName))
     Seq.empty[Row]
   }
 }
@@ -130,7 +130,7 @@ case class CreateMetastoreDataSource(
     val tableName = tableIdent.unquotedString
     val hiveContext = sqlContext.asInstanceOf[HiveContext]
 
-    if (hiveContext.catalog.tableExists(tableIdent)) {
+    if (hiveContext.sessionState.catalog.tableExists(tableIdent)) {
       if (allowExisting) {
         return Seq.empty[Row]
       } else {
@@ -142,7 +142,7 @@ case class CreateMetastoreDataSource(
     val optionsWithPath =
       if (!options.contains("path") && managedIfNoPath) {
         isExternal = false
-        options + ("path" -> hiveContext.catalog.hiveDefaultTableFilePath(tableIdent))
+        options + ("path" -> hiveContext.sessionState.catalog.hiveDefaultTableFilePath(tableIdent))
       } else {
         options
       }
@@ -155,7 +155,7 @@ case class CreateMetastoreDataSource(
       bucketSpec = None,
       options = optionsWithPath).resolveRelation()
 
-    hiveContext.catalog.createDataSourceTable(
+    hiveContext.sessionState.catalog.createDataSourceTable(
       tableIdent,
       userSpecifiedSchema,
       Array.empty[String],
@@ -200,13 +200,13 @@ case class CreateMetastoreDataSourceAsSelect(
     val optionsWithPath =
       if (!options.contains("path")) {
         isExternal = false
-        options + ("path" -> hiveContext.catalog.hiveDefaultTableFilePath(tableIdent))
+        options + ("path" -> hiveContext.sessionState.catalog.hiveDefaultTableFilePath(tableIdent))
       } else {
         options
       }
 
     var existingSchema = None: Option[StructType]
-    if (sqlContext.catalog.tableExists(tableIdent)) {
+    if (sqlContext.sessionState.catalog.tableExists(tableIdent)) {
       // Check if we need to throw an exception or just return.
       mode match {
         case SaveMode.ErrorIfExists =>
@@ -230,7 +230,8 @@ case class CreateMetastoreDataSourceAsSelect(
           // TODO: Check that options from the resolved relation match the relation that we are
           // inserting into (i.e. using the same compression).
 
-          EliminateSubqueryAliases(sqlContext.catalog.lookupRelation(tableIdent)) match {
+          EliminateSubqueryAliases(
+            sqlContext.sessionState.catalog.lookupRelation(tableIdent)) match {
             case l @ LogicalRelation(_: InsertableRelation | _: HadoopFsRelation, _, _) =>
               existingSchema = Some(l.schema)
             case o =>
@@ -267,7 +268,7 @@ case class CreateMetastoreDataSourceAsSelect(
       // We will use the schema of resolved.relation as the schema of the table (instead of
       // the schema of df). It is important since the nullability may be changed by the relation
       // provider (for example, see org.apache.spark.sql.parquet.DefaultSource).
-      hiveContext.catalog.createDataSourceTable(
+      hiveContext.sessionState.catalog.createDataSourceTable(
         tableIdent,
         Some(result.schema),
         partitionColumns,
@@ -278,7 +279,7 @@ case class CreateMetastoreDataSourceAsSelect(
     }
 
     // Refresh the cache of the table in the catalog.
-    hiveContext.catalog.refreshTable(tableIdent)
+    hiveContext.sessionState.catalog.refreshTable(tableIdent)
     Seq.empty[Row]
   }
 }
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
index 5887f69e13836961e94b564bd9d90b8a97d533a6..19c05f9cb0d9c7bd17f7bdcd658e4f9855a77ae8 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
@@ -205,7 +205,7 @@ class TestHiveContext(sc: SparkContext) extends HiveContext(sc) {
       logDebug(s"Query references test tables: ${referencedTestTables.mkString(", ")}")
       referencedTestTables.foreach(loadTestTable)
       // Proceed with analysis.
-      analyzer.execute(logical)
+      sessionState.analyzer.execute(logical)
     }
   }
 
@@ -427,9 +427,9 @@ class TestHiveContext(sc: SparkContext) extends HiveContext(sc) {
 
       cacheManager.clearCache()
       loadedTables.clear()
-      catalog.cachedDataSourceTables.invalidateAll()
-      catalog.client.reset()
-      catalog.unregisterAllTables()
+      sessionState.catalog.cachedDataSourceTables.invalidateAll()
+      sessionState.catalog.client.reset()
+      sessionState.catalog.unregisterAllTables()
 
       FunctionRegistry.getFunctionNames.asScala.filterNot(originalUDFs.contains(_)).
         foreach { udfName => FunctionRegistry.unregisterTemporaryUDF(udfName) }
diff --git a/sql/hive/src/test/java/org/apache/spark/sql/hive/JavaMetastoreDataSourcesSuite.java b/sql/hive/src/test/java/org/apache/spark/sql/hive/JavaMetastoreDataSourcesSuite.java
index e9356541c22dfae8ba4375067c02f67af3ee6e0c..bd14a243eaeb44982f62169760fa491e41d2e119 100644
--- a/sql/hive/src/test/java/org/apache/spark/sql/hive/JavaMetastoreDataSourcesSuite.java
+++ b/sql/hive/src/test/java/org/apache/spark/sql/hive/JavaMetastoreDataSourcesSuite.java
@@ -70,7 +70,7 @@ public class JavaMetastoreDataSourcesSuite {
     if (path.exists()) {
       path.delete();
     }
-    hiveManagedPath = new Path(sqlContext.catalog().hiveDefaultTableFilePath(
+    hiveManagedPath = new Path(sqlContext.sessionState().catalog().hiveDefaultTableFilePath(
       new TableIdentifier("javaSavedTable")));
     fs = hiveManagedPath.getFileSystem(sc.hadoopConfiguration());
     if (fs.exists(hiveManagedPath)){
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala
index 90d65d9e9b8c082ab0cba9fd4b02325d97cd12c0..ce7b08ab72f7903d67389d9231ce7ca90cbeafff 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala
@@ -83,7 +83,7 @@ class DataSourceWithHiveMetastoreCatalogSuite
             .saveAsTable("t")
         }
 
-        val hiveTable = catalog.client.getTable("default", "t")
+        val hiveTable = sessionState.catalog.client.getTable("default", "t")
         assert(hiveTable.storage.inputFormat === Some(inputFormat))
         assert(hiveTable.storage.outputFormat === Some(outputFormat))
         assert(hiveTable.storage.serde === Some(serde))
@@ -114,7 +114,7 @@ class DataSourceWithHiveMetastoreCatalogSuite
               .saveAsTable("t")
           }
 
-          val hiveTable = catalog.client.getTable("default", "t")
+          val hiveTable = sessionState.catalog.client.getTable("default", "t")
           assert(hiveTable.storage.inputFormat === Some(inputFormat))
           assert(hiveTable.storage.outputFormat === Some(outputFormat))
           assert(hiveTable.storage.serde === Some(serde))
@@ -144,7 +144,7 @@ class DataSourceWithHiveMetastoreCatalogSuite
                |AS SELECT 1 AS d1, "val_1" AS d2
              """.stripMargin)
 
-          val hiveTable = catalog.client.getTable("default", "t")
+          val hiveTable = sessionState.catalog.client.getTable("default", "t")
           assert(hiveTable.storage.inputFormat === Some(inputFormat))
           assert(hiveTable.storage.outputFormat === Some(outputFormat))
           assert(hiveTable.storage.serde === Some(serde))
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/ListTablesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/ListTablesSuite.scala
index a94f7053c39ffe63fb548278471a9dc5c924c4d4..0a31ac64a20f559f9a86e74367066cc431e3944c 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/ListTablesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/ListTablesSuite.scala
@@ -32,14 +32,14 @@ class ListTablesSuite extends QueryTest with TestHiveSingleton with BeforeAndAft
 
   override def beforeAll(): Unit = {
     // The catalog in HiveContext is a case insensitive one.
-    catalog.registerTable(TableIdentifier("ListTablesSuiteTable"), df.logicalPlan)
+    sessionState.catalog.registerTable(TableIdentifier("ListTablesSuiteTable"), df.logicalPlan)
     sql("CREATE TABLE HiveListTablesSuiteTable (key int, value string)")
     sql("CREATE DATABASE IF NOT EXISTS ListTablesSuiteDB")
     sql("CREATE TABLE ListTablesSuiteDB.HiveInDBListTablesSuiteTable (key int, value string)")
   }
 
   override def afterAll(): Unit = {
-    catalog.unregisterTable(TableIdentifier("ListTablesSuiteTable"))
+    sessionState.catalog.unregisterTable(TableIdentifier("ListTablesSuiteTable"))
     sql("DROP TABLE IF EXISTS HiveListTablesSuiteTable")
     sql("DROP TABLE IF EXISTS ListTablesSuiteDB.HiveInDBListTablesSuiteTable")
     sql("DROP DATABASE IF EXISTS ListTablesSuiteDB")
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index aaebad79f6b66006fba00abe3f266efee5764029..d7974f1ee3ec03e64d348c757a1eb35528b932bf 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -369,7 +369,8 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
            |)
          """.stripMargin)
 
-      val expectedPath = catalog.hiveDefaultTableFilePath(TableIdentifier("ctasJsonTable"))
+      val expectedPath =
+        sessionState.catalog.hiveDefaultTableFilePath(TableIdentifier("ctasJsonTable"))
       val filesystemPath = new Path(expectedPath)
       val fs = filesystemPath.getFileSystem(sparkContext.hadoopConfiguration)
       if (fs.exists(filesystemPath)) fs.delete(filesystemPath, true)
@@ -460,7 +461,8 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
           // Drop table will also delete the data.
           sql("DROP TABLE savedJsonTable")
           intercept[AnalysisException] {
-            read.json(catalog.hiveDefaultTableFilePath(TableIdentifier("savedJsonTable")))
+            read.json(
+              sessionState.catalog.hiveDefaultTableFilePath(TableIdentifier("savedJsonTable")))
           }
         }
 
@@ -695,7 +697,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
         val schema = StructType((1 to 5000).map(i => StructField(s"c_$i", StringType, true)))
 
         // Manually create a metastore data source table.
-        catalog.createDataSourceTable(
+        sessionState.catalog.createDataSourceTable(
           tableIdent = TableIdentifier("wide_schema"),
           userSpecifiedSchema = Some(schema),
           partitionColumns = Array.empty[String],
@@ -727,14 +729,14 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
           outputFormat = None,
           serde = None,
           serdeProperties = Map(
-            "path" -> catalog.hiveDefaultTableFilePath(TableIdentifier(tableName)))
+            "path" -> sessionState.catalog.hiveDefaultTableFilePath(TableIdentifier(tableName)))
         ),
         properties = Map(
           "spark.sql.sources.provider" -> "json",
           "spark.sql.sources.schema" -> schema.json,
           "EXTERNAL" -> "FALSE"))
 
-      catalog.client.createTable(hiveTable, ignoreIfExists = false)
+      sessionState.catalog.client.createTable(hiveTable, ignoreIfExists = false)
 
       invalidateTable(tableName)
       val actualSchema = table(tableName).schema
@@ -749,7 +751,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
     withTable(tableName) {
       df.write.format("parquet").partitionBy("d", "b").saveAsTable(tableName)
       invalidateTable(tableName)
-      val metastoreTable = catalog.client.getTable("default", tableName)
+      val metastoreTable = sessionState.catalog.client.getTable("default", tableName)
       val expectedPartitionColumns = StructType(df.schema("d") :: df.schema("b") :: Nil)
 
       val numPartCols = metastoreTable.properties("spark.sql.sources.schema.numPartCols").toInt
@@ -784,7 +786,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
         .sortBy("c")
         .saveAsTable(tableName)
       invalidateTable(tableName)
-      val metastoreTable = catalog.client.getTable("default", tableName)
+      val metastoreTable = sessionState.catalog.client.getTable("default", tableName)
       val expectedBucketByColumns = StructType(df.schema("d") :: df.schema("b") :: Nil)
       val expectedSortByColumns = StructType(df.schema("c") :: Nil)
 
@@ -901,7 +903,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
   test("skip hive metadata on table creation") {
     val schema = StructType((1 to 5).map(i => StructField(s"c_$i", StringType)))
 
-    catalog.createDataSourceTable(
+    sessionState.catalog.createDataSourceTable(
       tableIdent = TableIdentifier("not_skip_hive_metadata"),
       userSpecifiedSchema = Some(schema),
       partitionColumns = Array.empty[String],
@@ -912,10 +914,10 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
 
     // As a proxy for verifying that the table was stored in Hive compatible format, we verify that
     // each column of the table is of native type StringType.
-    assert(catalog.client.getTable("default", "not_skip_hive_metadata").schema
+    assert(sessionState.catalog.client.getTable("default", "not_skip_hive_metadata").schema
       .forall(column => HiveMetastoreTypes.toDataType(column.dataType) == StringType))
 
-    catalog.createDataSourceTable(
+    sessionState.catalog.createDataSourceTable(
       tableIdent = TableIdentifier("skip_hive_metadata"),
       userSpecifiedSchema = Some(schema),
       partitionColumns = Array.empty[String],
@@ -926,7 +928,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
 
     // As a proxy for verifying that the table was stored in SparkSQL format, we verify that
     // the table has a column type as array of StringType.
-    assert(catalog.client.getTable("default", "skip_hive_metadata").schema
+    assert(sessionState.catalog.client.getTable("default", "skip_hive_metadata").schema
       .forall(column => HiveMetastoreTypes.toDataType(column.dataType) == ArrayType(StringType)))
   }
 }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MultiDatabaseSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MultiDatabaseSuite.scala
index 488f298981c866800b48794fad6433e1093e384b..e2effef0b9bdf94dba4752297fba512869e4b9fc 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MultiDatabaseSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MultiDatabaseSuite.scala
@@ -25,8 +25,9 @@ class MultiDatabaseSuite extends QueryTest with SQLTestUtils with TestHiveSingle
   private lazy val df = sqlContext.range(10).coalesce(1)
 
   private def checkTablePath(dbName: String, tableName: String): Unit = {
-    val metastoreTable = hiveContext.catalog.client.getTable(dbName, tableName)
-    val expectedPath = hiveContext.catalog.client.getDatabase(dbName).locationUri + "/" + tableName
+    val metastoreTable = hiveContext.sessionState.catalog.client.getTable(dbName, tableName)
+    val expectedPath =
+      hiveContext.sessionState.catalog.client.getDatabase(dbName).locationUri + "/" + tableName
 
     assert(metastoreTable.storage.serdeProperties("path") === expectedPath)
   }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
index 3952e716d38a654055369e9bf2290bfe34b254a7..1d8c293d43c05f97b423fed9590e460a855aa5b3 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
@@ -73,7 +73,8 @@ class StatisticsSuite extends QueryTest with TestHiveSingleton {
 
   test("analyze MetastoreRelations") {
     def queryTotalSize(tableName: String): BigInt =
-      hiveContext.catalog.lookupRelation(TableIdentifier(tableName)).statistics.sizeInBytes
+      hiveContext.sessionState.catalog.lookupRelation(
+        TableIdentifier(tableName)).statistics.sizeInBytes
 
     // Non-partitioned table
     sql("CREATE TABLE analyzeTable (key STRING, value STRING)").collect()
@@ -120,7 +121,7 @@ class StatisticsSuite extends QueryTest with TestHiveSingleton {
     intercept[UnsupportedOperationException] {
       hiveContext.analyze("tempTable")
     }
-    hiveContext.catalog.unregisterTable(TableIdentifier("tempTable"))
+    hiveContext.sessionState.catalog.unregisterTable(TableIdentifier("tempTable"))
   }
 
   test("estimates the size of a test MetastoreRelation") {
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index b42f00e90f31d1c481b2a5c45b05c7028a483c69..21dfb82876fc2cf6751b2c1343b1e179787fee47 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -292,7 +292,8 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
 
   test("CTAS without serde") {
     def checkRelation(tableName: String, isDataSourceParquet: Boolean): Unit = {
-      val relation = EliminateSubqueryAliases(catalog.lookupRelation(TableIdentifier(tableName)))
+      val relation = EliminateSubqueryAliases(
+        sessionState.catalog.lookupRelation(TableIdentifier(tableName)))
       relation match {
         case LogicalRelation(r: HadoopFsRelation, _, _) =>
           if (!isDataSourceParquet) {
@@ -720,7 +721,7 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
     (1 to 100).par.map { i =>
       val tableName = s"SPARK_6618_table_$i"
       sql(s"CREATE TABLE $tableName (col1 string)")
-      catalog.lookupRelation(TableIdentifier(tableName))
+      sessionState.catalog.lookupRelation(TableIdentifier(tableName))
       table(tableName)
       tables()
       sql(s"DROP TABLE $tableName")
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
index 8cfb32f00a884268282641894c0c289039d76d51..57c4ad4248b7214da3784142befed558dae9a54d 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
@@ -222,7 +222,7 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest {
       sql("INSERT INTO TABLE t SELECT * FROM tmp")
       checkAnswer(table("t"), (data ++ data).map(Row.fromTuple))
     }
-    catalog.unregisterTable(TableIdentifier("tmp"))
+    sessionState.catalog.unregisterTable(TableIdentifier("tmp"))
   }
 
   test("overwriting") {
@@ -232,7 +232,7 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest {
       sql("INSERT OVERWRITE TABLE t SELECT * FROM tmp")
       checkAnswer(table("t"), data.map(Row.fromTuple))
     }
-    catalog.unregisterTable(TableIdentifier("tmp"))
+    sessionState.catalog.unregisterTable(TableIdentifier("tmp"))
   }
 
   test("self-join") {
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
index 8fdbbd94c807e863c51063bba364d3e3ce28bb6f..bb53179c3cce32ea34c8fc551226174193903eb6 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
@@ -425,10 +425,10 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
   }
 
   test("Caching converted data source Parquet Relations") {
-    val _catalog = catalog
+    val _catalog = sessionState.catalog
     def checkCached(tableIdentifier: _catalog.QualifiedTableName): Unit = {
       // Converted test_parquet should be cached.
-      catalog.cachedDataSourceTables.getIfPresent(tableIdentifier) match {
+      sessionState.catalog.cachedDataSourceTables.getIfPresent(tableIdentifier) match {
         case null => fail("Converted test_parquet should be cached in the cache.")
         case logical @ LogicalRelation(parquetRelation: HadoopFsRelation, _, _) => // OK
         case other =>
@@ -456,14 +456,14 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
     var tableIdentifier = _catalog.QualifiedTableName("default", "test_insert_parquet")
 
     // First, make sure the converted test_parquet is not cached.
-    assert(catalog.cachedDataSourceTables.getIfPresent(tableIdentifier) === null)
+    assert(sessionState.catalog.cachedDataSourceTables.getIfPresent(tableIdentifier) === null)
     // Table lookup will make the table cached.
     table("test_insert_parquet")
     checkCached(tableIdentifier)
     // For insert into non-partitioned table, we will do the conversion,
     // so the converted test_insert_parquet should be cached.
     invalidateTable("test_insert_parquet")
-    assert(catalog.cachedDataSourceTables.getIfPresent(tableIdentifier) === null)
+    assert(sessionState.catalog.cachedDataSourceTables.getIfPresent(tableIdentifier) === null)
     sql(
       """
         |INSERT INTO TABLE test_insert_parquet
@@ -476,7 +476,7 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
       sql("select a, b from jt").collect())
     // Invalidate the cache.
     invalidateTable("test_insert_parquet")
-    assert(catalog.cachedDataSourceTables.getIfPresent(tableIdentifier) === null)
+    assert(sessionState.catalog.cachedDataSourceTables.getIfPresent(tableIdentifier) === null)
 
     // Create a partitioned table.
     sql(
@@ -494,7 +494,7 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
       """.stripMargin)
 
     tableIdentifier = _catalog.QualifiedTableName("default", "test_parquet_partitioned_cache_test")
-    assert(catalog.cachedDataSourceTables.getIfPresent(tableIdentifier) === null)
+    assert(sessionState.catalog.cachedDataSourceTables.getIfPresent(tableIdentifier) === null)
     sql(
       """
         |INSERT INTO TABLE test_parquet_partitioned_cache_test
@@ -503,14 +503,14 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
       """.stripMargin)
     // Right now, insert into a partitioned Parquet is not supported in data source Parquet.
     // So, we expect it is not cached.
-    assert(catalog.cachedDataSourceTables.getIfPresent(tableIdentifier) === null)
+    assert(sessionState.catalog.cachedDataSourceTables.getIfPresent(tableIdentifier) === null)
     sql(
       """
         |INSERT INTO TABLE test_parquet_partitioned_cache_test
         |PARTITION (`date`='2015-04-02')
         |select a, b from jt
       """.stripMargin)
-    assert(catalog.cachedDataSourceTables.getIfPresent(tableIdentifier) === null)
+    assert(sessionState.catalog.cachedDataSourceTables.getIfPresent(tableIdentifier) === null)
 
     // Make sure we can cache the partitioned table.
     table("test_parquet_partitioned_cache_test")
@@ -526,7 +526,7 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
         """.stripMargin).collect())
 
     invalidateTable("test_parquet_partitioned_cache_test")
-    assert(catalog.cachedDataSourceTables.getIfPresent(tableIdentifier) === null)
+    assert(sessionState.catalog.cachedDataSourceTables.getIfPresent(tableIdentifier) === null)
 
     dropTables("test_insert_parquet", "test_parquet_partitioned_cache_test")
   }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala
index 33c1bb059e2fedc0b982e4271a14ff2c78f10b1e..a3e7737a7c05944903fe330a38915a1a31532a30 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala
@@ -70,7 +70,7 @@ class BucketedWriteSuite extends QueryTest with SQLTestUtils with TestHiveSingle
 
   def tableDir: File = {
     val identifier = hiveContext.sessionState.sqlParser.parseTableIdentifier("bucketed_table")
-    new File(URI.create(hiveContext.catalog.hiveDefaultTableFilePath(identifier)))
+    new File(URI.create(hiveContext.sessionState.catalog.hiveDefaultTableFilePath(identifier)))
   }
 
   /**