diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 3aacce7d7f383cc0ac5ff0a712a0f25d8f33aec9..2e85e3676747a8b779d8897535e41d7fe74f2dd5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -402,7 +402,7 @@ final class DataFrameWriter private[sql](df: DataFrame) {
       Project(inputDataCols ++ inputPartCols, df.logicalPlan)
     }.getOrElse(df.logicalPlan)
 
-    df.sparkSession.executePlan(
+    df.sparkSession.sessionState.executePlan(
       InsertIntoTable(
         UnresolvedRelation(tableIdent),
         partitions.getOrElse(Map.empty[String, Option[String]]),
@@ -524,7 +524,7 @@ final class DataFrameWriter private[sql](df: DataFrame) {
             mode,
             extraOptions.toMap,
             df.logicalPlan)
-        df.sparkSession.executePlan(cmd).toRdd
+        df.sparkSession.sessionState.executePlan(cmd).toRdd
     }
   }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index e5140fcf1337ec78a9103e895b2c2d650f7be920..961ae32b0b8816d98193e2827e01400c65e74db9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -58,7 +58,7 @@ private[sql] object Dataset {
   }
 
   def ofRows(sparkSession: SparkSession, logicalPlan: LogicalPlan): DataFrame = {
-    val qe = sparkSession.executePlan(logicalPlan)
+    val qe = sparkSession.sessionState.executePlan(logicalPlan)
     qe.assertAnalyzed()
     new Dataset[Row](sparkSession, logicalPlan, RowEncoder(qe.analyzed.schema))
   }
@@ -165,14 +165,14 @@ class Dataset[T] private[sql](
   // you wrap it with `withNewExecutionId` if this actions doesn't call other action.
 
   def this(sparkSession: SparkSession, logicalPlan: LogicalPlan, encoder: Encoder[T]) = {
-    this(sparkSession, sparkSession.executePlan(logicalPlan), encoder)
+    this(sparkSession, sparkSession.sessionState.executePlan(logicalPlan), encoder)
   }
 
   def this(sqlContext: SQLContext, logicalPlan: LogicalPlan, encoder: Encoder[T]) = {
     this(sqlContext.sparkSession, logicalPlan, encoder)
   }
 
-  @transient protected[sql] val logicalPlan: LogicalPlan = {
+  @transient private[sql] val logicalPlan: LogicalPlan = {
     def hasSideEffects(plan: LogicalPlan): Boolean = plan match {
       case _: Command |
            _: InsertIntoTable |
@@ -215,7 +215,7 @@ class Dataset[T] private[sql](
   // sqlContext must be val because a stable identifier is expected when you import implicits
   @transient lazy val sqlContext: SQLContext = sparkSession.sqlContext
 
-  protected[sql] def resolve(colName: String): NamedExpression = {
+  private[sql] def resolve(colName: String): NamedExpression = {
     queryExecution.analyzed.resolveQuoted(colName, sparkSession.sessionState.analyzer.resolver)
       .getOrElse {
         throw new AnalysisException(
@@ -223,7 +223,7 @@ class Dataset[T] private[sql](
       }
   }
 
-  protected[sql] def numericColumns: Seq[Expression] = {
+  private[sql] def numericColumns: Seq[Expression] = {
     schema.fields.filter(_.dataType.isInstanceOf[NumericType]).map { n =>
       queryExecution.analyzed.resolveQuoted(n.name, sparkSession.sessionState.analyzer.resolver).get
     }
@@ -417,7 +417,7 @@ class Dataset[T] private[sql](
    */
   def explain(extended: Boolean): Unit = {
     val explain = ExplainCommand(queryExecution.logical, extended = extended)
-    sparkSession.executePlan(explain).executedPlan.executeCollect().foreach {
+    sparkSession.sessionState.executePlan(explain).executedPlan.executeCollect().foreach {
       // scalastyle:off println
       r => println(r.getString(0))
       // scalastyle:on println
@@ -641,7 +641,7 @@ class Dataset[T] private[sql](
   def join(right: Dataset[_], usingColumns: Seq[String], joinType: String): DataFrame = {
     // Analyze the self join. The assumption is that the analyzer will disambiguate left vs right
     // by creating a new instance for one of the branch.
-    val joined = sparkSession.executePlan(
+    val joined = sparkSession.sessionState.executePlan(
       Join(logicalPlan, right.logicalPlan, joinType = JoinType(joinType), None))
       .analyzed.asInstanceOf[Join]
 
@@ -757,7 +757,7 @@ class Dataset[T] private[sql](
     val left = this.logicalPlan
     val right = other.logicalPlan
 
-    val joined = sparkSession.executePlan(Join(left, right, joinType =
+    val joined = sparkSession.sessionState.executePlan(Join(left, right, joinType =
       JoinType(joinType), Some(condition.expr)))
     val leftOutput = joined.analyzed.output.take(left.output.length)
     val rightOutput = joined.analyzed.output.takeRight(right.output.length)
@@ -1263,7 +1263,7 @@ class Dataset[T] private[sql](
   def groupByKey[K: Encoder](func: T => K): KeyValueGroupedDataset[K, T] = {
     val inputPlan = logicalPlan
     val withGroupingKey = AppendColumns(func, inputPlan)
-    val executed = sparkSession.executePlan(withGroupingKey)
+    val executed = sparkSession.sessionState.executePlan(withGroupingKey)
 
     new KeyValueGroupedDataset(
       encoderFor[K],
@@ -2238,7 +2238,7 @@ class Dataset[T] private[sql](
    * @since 1.6.0
    */
   def persist(): this.type = {
-    sparkSession.cacheManager.cacheQuery(this)
+    sparkSession.sharedState.cacheManager.cacheQuery(this)
     this
   }
 
@@ -2260,7 +2260,7 @@ class Dataset[T] private[sql](
    * @since 1.6.0
    */
   def persist(newLevel: StorageLevel): this.type = {
-    sparkSession.cacheManager.cacheQuery(this, None, newLevel)
+    sparkSession.sharedState.cacheManager.cacheQuery(this, None, newLevel)
     this
   }
 
@@ -2273,7 +2273,7 @@ class Dataset[T] private[sql](
    * @since 1.6.0
    */
   def unpersist(blocking: Boolean): this.type = {
-    sparkSession.cacheManager.tryUncacheQuery(this, blocking)
+    sparkSession.sharedState.cacheManager.tryUncacheQuery(this, blocking)
     this
   }
 
@@ -2294,7 +2294,7 @@ class Dataset[T] private[sql](
   lazy val rdd: RDD[T] = {
     val objectType = unresolvedTEncoder.deserializer.dataType
     val deserialized = CatalystSerde.deserialize[T](logicalPlan)
-    sparkSession.executePlan(deserialized).toRdd.mapPartitions { rows =>
+    sparkSession.sessionState.executePlan(deserialized).toRdd.mapPartitions { rows =>
       rows.map(_.get(0, objectType).asInstanceOf[T])
     }
   }
@@ -2417,19 +2417,19 @@ class Dataset[T] private[sql](
   /**
    * Converts a JavaRDD to a PythonRDD.
    */
-  protected[sql] def javaToPython: JavaRDD[Array[Byte]] = {
+  private[sql] def javaToPython: JavaRDD[Array[Byte]] = {
     val structType = schema  // capture it for closure
     val rdd = queryExecution.toRdd.map(EvaluatePython.toJava(_, structType))
     EvaluatePython.javaToPython(rdd)
   }
 
-  protected[sql] def collectToPython(): Int = {
+  private[sql] def collectToPython(): Int = {
     withNewExecutionId {
       PythonRDD.collectAndServe(javaToPython.rdd)
     }
   }
 
-  protected[sql] def toPythonIterator(): Int = {
+  private[sql] def toPythonIterator(): Int = {
     withNewExecutionId {
       PythonRDD.toLocalIteratorAndServe(javaToPython.rdd)
     }
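
(Aside: a minimal sketch, assuming code compiled into the org.apache.spark.sql package so the private[sql] members are visible, of how the cache bookkeeping is reached after this change; the object name is made up and the calls mirror the test updates further down.)

package org.apache.spark.sql

// Illustrative only: persist() registers the Dataset with the shared, cross-session
// CacheManager, so lookups also go through sharedState now.
object CacheLookupSketch {
  def demo(spark: SparkSession, ds: Dataset[_]): Unit = {
    ds.persist()
    assert(spark.sharedState.cacheManager.lookupCachedData(ds).nonEmpty)
    ds.unpersist(blocking = true)
    assert(spark.sharedState.cacheManager.lookupCachedData(ds).isEmpty)
  }
}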
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index 7013e316ead8380e79b6c30e0a9230c063f0d2ed..b17fb8a839487a81cadcd083a99a10667b903b5d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -83,12 +83,9 @@ class SQLContext private[sql](
 
   // TODO: move this logic into SparkSession
 
-  protected[sql] def sessionState: SessionState = sparkSession.sessionState
-  protected[sql] def sharedState: SharedState = sparkSession.sharedState
-  protected[sql] def conf: SQLConf = sessionState.conf
-  protected[sql] def runtimeConf: RuntimeConfig = sparkSession.conf
-  protected[sql] def cacheManager: CacheManager = sparkSession.cacheManager
-  protected[sql] def externalCatalog: ExternalCatalog = sparkSession.externalCatalog
+  private[sql] def sessionState: SessionState = sparkSession.sessionState
+  private[sql] def sharedState: SharedState = sparkSession.sharedState
+  private[sql] def conf: SQLConf = sessionState.conf
 
   def sparkContext: SparkContext = sparkSession.sparkContext
 
@@ -167,14 +164,6 @@ class SQLContext private[sql](
     sparkSession.conf.getAll
   }
 
-  protected[sql] def parseSql(sql: String): LogicalPlan = sparkSession.parseSql(sql)
-
-  protected[sql] def executeSql(sql: String): QueryExecution = sparkSession.executeSql(sql)
-
-  protected[sql] def executePlan(plan: LogicalPlan): QueryExecution = {
-    sparkSession.executePlan(plan)
-  }
-
   /**
    * :: Experimental ::
    * A collection of methods that are considered experimental, but can be used to hook into
@@ -240,15 +229,6 @@ class SQLContext private[sql](
     sparkSession.catalog.isCached(tableName)
   }
 
-  /**
-   * Returns true if the [[Dataset]] is currently cached in-memory.
-   * @group cachemgmt
-   * @since 1.3.0
-   */
-  private[sql] def isCached(qName: Dataset[_]): Boolean = {
-    sparkSession.cacheManager.lookupCachedData(qName).nonEmpty
-  }
-
   /**
    * Caches the specified table in-memory.
    * @group cachemgmt
@@ -718,26 +698,9 @@ class SQLContext private[sql](
    * have the same format as the one generated by `toString` in scala.
    * It is only used by PySpark.
    */
-  protected[sql] def parseDataType(dataTypeString: String): DataType = {
-    sparkSession.parseDataType(dataTypeString)
-  }
-
-  /**
-   * Apply a schema defined by the schemaString to an RDD. It is only used by PySpark.
-   */
-  protected[sql] def applySchemaToPythonRDD(
-      rdd: RDD[Array[Any]],
-      schemaString: String): DataFrame = {
-    sparkSession.applySchemaToPythonRDD(rdd, schemaString)
-  }
-
-  /**
-   * Apply a schema defined by the schema to an RDD. It is only used by PySpark.
-   */
-  protected[sql] def applySchemaToPythonRDD(
-      rdd: RDD[Array[Any]],
-      schema: StructType): DataFrame = {
-    sparkSession.applySchemaToPythonRDD(rdd, schema)
+  // TODO: Remove this function (would require updating PySpark).
+  private[sql] def parseDataType(dataTypeString: String): DataType = {
+    DataType.fromJson(dataTypeString)
   }
 }
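
(Aside: DataType.fromJson, which the trimmed-down parseDataType now calls directly, is public API; a spark-shell style sketch of the JSON form it expects — the schema below is made up.)

import org.apache.spark.sql.types.{DataType, StructType}

// fromJson round-trips the JSON emitted by DataType.json / StructType.json,
// which is what PySpark sends over for this call.
val json =
  """{"type":"struct","fields":[
    |  {"name":"id","type":"long","nullable":false,"metadata":{}},
    |  {"name":"name","type":"string","nullable":true,"metadata":{}}
    |]}""".stripMargin
val schema = DataType.fromJson(json).asInstanceOf[StructType]
assert(schema.fieldNames.sameElements(Array("id", "name")))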
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
index 86c97b92dfc3dd79c1b1fc7c7303602501c18186..a36368afe22a7e2a2a806f0aff7983df666cbb0c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
@@ -34,10 +34,9 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}
 import org.apache.spark.sql.catalog.Catalog
 import org.apache.spark.sql.catalyst._
-import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.encoders._
 import org.apache.spark.sql.catalyst.expressions.AttributeReference
-import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan, Range}
+import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, Range}
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.sql.execution.ui.SQLListener
@@ -51,7 +50,14 @@ import org.apache.spark.util.Utils
 /**
  * The entry point to programming Spark with the Dataset and DataFrame API.
  *
- * To create a SparkSession, use the following builder pattern:
+ * In environments where a SparkSession has already been created up front (e.g. REPL, notebooks),
+ * use the builder to get the existing session:
+ *
+ * {{{
+ *   SparkSession.builder().getOrCreate()
+ * }}}
+ *
+ * The builder can also be used to create a new session:
  *
  * {{{
  *   SparkSession.builder()
@@ -81,7 +87,7 @@ class SparkSession private(
    * and a catalog that interacts with external systems.
    */
   @transient
-  protected[sql] lazy val sharedState: SharedState = {
+  private[sql] lazy val sharedState: SharedState = {
     existingSharedState.getOrElse(
       SparkSession.reflect[SharedState, SparkContext](
         SparkSession.sharedStateClassName(sparkContext.conf),
@@ -93,7 +99,7 @@ class SparkSession private(
    * functions, and everything else that accepts a [[org.apache.spark.sql.internal.SQLConf]].
    */
   @transient
-  protected[sql] lazy val sessionState: SessionState = {
+  private[sql] lazy val sessionState: SessionState = {
     SparkSession.reflect[SessionState, SparkSession](
       SparkSession.sessionStateClassName(sparkContext.conf),
       self)
@@ -105,10 +111,6 @@ class SparkSession private(
   @transient
   private[sql] val sqlContext: SQLContext = new SQLContext(this)
 
-  protected[sql] def cacheManager: CacheManager = sharedState.cacheManager
-  protected[sql] def listener: SQLListener = sharedState.listener
-  protected[sql] def externalCatalog: ExternalCatalog = sharedState.externalCatalog
-
   /**
    * Runtime configuration interface for Spark.
    *
@@ -178,12 +180,14 @@ class SparkSession private(
   def udf: UDFRegistration = sessionState.udf
 
   /**
+   * :: Experimental ::
    * Returns a [[ContinuousQueryManager]] that allows managing all the
    * [[org.apache.spark.sql.ContinuousQuery ContinuousQueries]] active on `this`.
    *
    * @group basic
    * @since 2.0.0
    */
+  @Experimental
   def streams: ContinuousQueryManager = sessionState.continuousQueryManager
 
   /**
@@ -208,13 +212,11 @@ class SparkSession private(
    * --------------------------------- */
 
   /**
-   * :: Experimental ::
    * Returns a [[DataFrame]] with no rows or columns.
    *
    * @group dataframes
    * @since 2.0.0
    */
-  @Experimental
   @transient
   lazy val emptyDataFrame: DataFrame = {
     createDataFrame(sparkContext.emptyRDD[Row], StructType(Nil))
@@ -449,7 +451,7 @@ class SparkSession private(
    * Creates a [[DataFrame]] from an RDD[Row].
    * User can specify whether the input rows should be converted to Catalyst rows.
    */
-  protected[sql] def internalCreateDataFrame(
+  private[sql] def internalCreateDataFrame(
       catalystRows: RDD[InternalRow],
       schema: StructType): DataFrame = {
     // TODO: use MutableProjection when rowRDD is another DataFrame and the applied
@@ -462,7 +464,7 @@ class SparkSession private(
    * Creates a [[DataFrame]] from an RDD[Row].
    * User can specify whether the input rows should be converted to Catalyst rows.
    */
-  protected[sql] def createDataFrame(
+  private[sql] def createDataFrame(
       rowRDD: RDD[Row],
       schema: StructType,
       needsConversion: Boolean) = {
@@ -502,7 +504,7 @@ class SparkSession private(
     table(sessionState.sqlParser.parseTableIdentifier(tableName))
   }
 
-  protected[sql] def table(tableIdent: TableIdentifier): DataFrame = {
+  private[sql] def table(tableIdent: TableIdentifier): DataFrame = {
     Dataset.ofRows(self, sessionState.catalog.lookupRelation(tableIdent))
   }
 
@@ -510,7 +512,7 @@ class SparkSession private(
    * Creates a temporary view with a DataFrame. The lifetime of this temporary view is tied to
    * this [[SparkSession]].
    */
-  protected[sql] def createTempView(
+  private[sql] def createTempView(
       viewName: String, df: DataFrame, replaceIfExists: Boolean) = {
     sessionState.catalog.createTempView(
       sessionState.sqlParser.parseTableIdentifier(viewName).table,
@@ -529,11 +531,10 @@ class SparkSession private(
    * @since 2.0.0
    */
   def sql(sqlText: String): DataFrame = {
-    Dataset.ofRows(self, parseSql(sqlText))
+    Dataset.ofRows(self, sessionState.sqlParser.parsePlan(sqlText))
   }
 
   /**
-   * :: Experimental ::
    * Returns a [[DataFrameReader]] that can be used to read data and streams in as a [[DataFrame]].
    * {{{
    *   sparkSession.read.parquet("/path/to/file.parquet")
@@ -543,7 +544,6 @@ class SparkSession private(
    * @group genericdata
    * @since 2.0.0
    */
-  @Experimental
   def read: DataFrameReader = new DataFrameReader(self)
 
 
@@ -577,18 +577,6 @@ class SparkSession private(
     sparkContext.stop()
   }
 
-  protected[sql] def parseSql(sql: String): LogicalPlan = {
-    sessionState.sqlParser.parsePlan(sql)
-  }
-
-  protected[sql] def executeSql(sql: String): QueryExecution = {
-    executePlan(parseSql(sql))
-  }
-
-  protected[sql] def executePlan(plan: LogicalPlan): QueryExecution = {
-    sessionState.executePlan(plan)
-  }
-
   /**
    * Parses the data type in our internal string representation. The data type string should
    * have the same format as the one generated by `toString` in scala.
@@ -601,17 +589,17 @@ class SparkSession private(
   /**
    * Apply a schema defined by the schemaString to an RDD. It is only used by PySpark.
    */
-  protected[sql] def applySchemaToPythonRDD(
+  private[sql] def applySchemaToPythonRDD(
       rdd: RDD[Array[Any]],
       schemaString: String): DataFrame = {
-    val schema = parseDataType(schemaString).asInstanceOf[StructType]
+    val schema = DataType.fromJson(schemaString).asInstanceOf[StructType]
     applySchemaToPythonRDD(rdd, schema)
   }
 
   /**
    * Apply a schema defined by the schema to an RDD. It is only used by PySpark.
    */
-  protected[sql] def applySchemaToPythonRDD(
+  private[sql] def applySchemaToPythonRDD(
       rdd: RDD[Array[Any]],
       schema: StructType): DataFrame = {
     val rowRdd = rdd.map(r => python.EvaluatePython.fromJava(r, schema).asInstanceOf[InternalRow])
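
(Aside: a short end-to-end sketch of the two builder uses the updated class doc describes; the master, app name, and config key are placeholders.)

import org.apache.spark.sql.SparkSession

// In a REPL or notebook where a session already exists, this simply returns it.
val existing = SparkSession.builder().getOrCreate()

// Otherwise the same builder creates a new one.
val created = SparkSession.builder()
  .master("local[*]")
  .appName("builder-example")
  .config("spark.some.config.option", "some-value")
  .getOrCreate()

created.range(5).show()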
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
index f601138a9d72b60f3a369f9b96bc29a4402befdf..c8bdb0d22c9f8e644fe2fdd8074fcb396c350fd0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
@@ -95,7 +95,7 @@ private[sql] class CacheManager extends Logging {
             sparkSession.sessionState.conf.useCompression,
             sparkSession.sessionState.conf.columnBatchSize,
             storageLevel,
-            sparkSession.executePlan(planToCache).executedPlan,
+            sparkSession.sessionState.executePlan(planToCache).executedPlan,
             tableName))
     }
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
index 34187b9a1ae7f731ed0adacc0b5a995347c19f30..330459c11ea9823f61b4b46a6af55707566f4fa6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
@@ -67,7 +67,7 @@ class QueryExecution(val sparkSession: SparkSession, val logical: LogicalPlan) {
   lazy val withCachedData: LogicalPlan = {
     assertAnalyzed()
     assertSupported()
-    sparkSession.cacheManager.useCachedData(analyzed)
+    sparkSession.sharedState.cacheManager.useCachedData(analyzed)
   }
 
   lazy val optimizedPlan: LogicalPlan = sparkSession.sessionState.optimizer.execute(withCachedData)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
index d5aaccc4bdd90a28da8a8b974aaaf24535f0b7f5..642a95a99262d6f227dcd9e5d2ef35ab9422b93d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
@@ -98,7 +98,7 @@ case class ExplainCommand(
 
   // Run through the optimizer to generate the physical plan.
   override def run(sparkSession: SparkSession): Seq[Row] = try {
-    val queryExecution = sparkSession.executePlan(logicalPlan)
+    val queryExecution = sparkSession.sessionState.executePlan(logicalPlan)
     val outputString =
       if (codegen) {
         codegenString(queryExecution.executedPlan)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index ffea6285528a346c28f3d68a75ea66f20e19bce8..7ce7bb903de46ca3f859365876d6117805178e35 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -200,7 +200,8 @@ case class DropTableCommand(
         case _ =>
       })
       try {
-        sparkSession.cacheManager.tryUncacheQuery(sparkSession.table(tableName.quotedString))
+        sparkSession.sharedState.cacheManager.tryUncacheQuery(
+          sparkSession.table(tableName.quotedString))
       } catch {
         case NonFatal(e) => log.warn(e.toString, e)
       }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
index 075849afde08c1c0d6300bf9ea44a61d764cc6fb..84990119c9c0fc8a988fe844586886b65e4ca199 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
@@ -79,7 +79,7 @@ case class CreateViewCommand(
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
     // If the plan cannot be analyzed, throw an exception and don't proceed.
-    val qe = sparkSession.executePlan(child)
+    val qe = sparkSession.sessionState.executePlan(child)
     qe.assertAnalyzed()
     val analyzedPlan = qe.analyzed
 
@@ -132,7 +132,7 @@ case class CreateViewCommand(
         val projectList = analyzedPlan.output.zip(tableDesc.schema).map {
           case (attr, col) => Alias(attr, col.name)()
         }
-        sparkSession.executePlan(Project(projectList, analyzedPlan)).analyzed
+        sparkSession.sessionState.executePlan(Project(projectList, analyzedPlan)).analyzed
       }
     }
 
@@ -153,7 +153,7 @@ case class CreateViewCommand(
             val projectList = analyzedPlan.output.zip(tableDesc.schema).map {
               case (attr, col) => Alias(attr, col.name)()
             }
-            sparkSession.executePlan(Project(projectList, analyzedPlan)).analyzed
+            sparkSession.sessionState.executePlan(Project(projectList, analyzedPlan)).analyzed
           }
         new SQLBuilder(logicalPlan).toSQL
       } else {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index dfe06478fccb2fd89127f436aab4d31ba324f57d..b3beb6c85f8ed4db3f5047424b9f3c6ca9f47d01 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -476,7 +476,7 @@ case class DataSource(
             options,
             data.logicalPlan,
             mode)
-        sparkSession.executePlan(plan).toRdd
+        sparkSession.sessionState.executePlan(plan).toRdd
 
       case _ =>
         sys.error(s"${providingClass.getCanonicalName} does not allow create table as select.")
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoDataSourceCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoDataSourceCommand.scala
index c3e07f7d00557afe26a9ae070f4feae4f7b3fc03..25b901f2db8d0ce05dbb020cd57370da00a0002e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoDataSourceCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoDataSourceCommand.scala
@@ -40,7 +40,7 @@ private[sql] case class InsertIntoDataSourceCommand(
     relation.insert(df, overwrite)
 
     // Invalidate the cache.
-    sparkSession.cacheManager.invalidateCache(logicalRelation)
+    sparkSession.sharedState.cacheManager.invalidateCache(logicalRelation)
 
     Seq.empty[Row]
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
index 1371abe189f84bfd7169aad7a5fd4f9732548edb..f3f36efda599b7c4a3b14947f74c3e510f34b61a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
@@ -230,7 +230,7 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
         bucketSpec = None,
         allowExisting = false,
         managedIfNoPath = false)
-    sparkSession.executePlan(cmd).toRdd
+    sparkSession.sessionState.executePlan(cmd).toRdd
     sparkSession.table(tableIdent)
   }
 
@@ -278,7 +278,7 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
         bucketSpec = None,
         allowExisting = false,
         managedIfNoPath = false)
-    sparkSession.executePlan(cmd).toRdd
+    sparkSession.sessionState.executePlan(cmd).toRdd
     sparkSession.table(tableIdent)
   }
 
@@ -291,7 +291,7 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
    * @since 2.0.0
    */
   override def dropTempView(viewName: String): Unit = {
-    sparkSession.cacheManager.tryUncacheQuery(sparkSession.table(viewName))
+    sparkSession.sharedState.cacheManager.tryUncacheQuery(sparkSession.table(viewName))
     sessionCatalog.dropTable(TableIdentifier(viewName), ignoreIfNotExists = true)
   }
 
@@ -302,7 +302,7 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
    * @since 2.0.0
    */
   override def isCached(tableName: String): Boolean = {
-    sparkSession.cacheManager.lookupCachedData(sparkSession.table(tableName)).nonEmpty
+    sparkSession.sharedState.cacheManager.lookupCachedData(sparkSession.table(tableName)).nonEmpty
   }
 
   /**
@@ -312,7 +312,7 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
    * @since 2.0.0
    */
   override def cacheTable(tableName: String): Unit = {
-    sparkSession.cacheManager.cacheQuery(sparkSession.table(tableName), Some(tableName))
+    sparkSession.sharedState.cacheManager.cacheQuery(sparkSession.table(tableName), Some(tableName))
   }
 
   /**
@@ -322,7 +322,7 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
    * @since 2.0.0
    */
   override def uncacheTable(tableName: String): Unit = {
-    sparkSession.cacheManager.uncacheQuery(sparkSession.table(tableName))
+    sparkSession.sharedState.cacheManager.uncacheQuery(sparkSession.table(tableName))
   }
 
   /**
@@ -332,7 +332,7 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
    * @since 2.0.0
    */
   override def clearCache(): Unit = {
-    sparkSession.cacheManager.clearCache()
+    sparkSession.sharedState.cacheManager.clearCache()
   }
 
   /**
@@ -342,7 +342,7 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
    * @since 2.0.0
    */
   protected[sql] def isCached(qName: Dataset[_]): Boolean = {
-    sparkSession.cacheManager.lookupCachedData(qName).nonEmpty
+    sparkSession.sharedState.cacheManager.lookupCachedData(qName).nonEmpty
   }
 
   /**
@@ -360,15 +360,15 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
     // cached version and make the new version cached lazily.
     val logicalPlan = sparkSession.sessionState.catalog.lookupRelation(tableIdent)
     // Use lookupCachedData directly since RefreshTable also takes databaseName.
-    val isCached = sparkSession.cacheManager.lookupCachedData(logicalPlan).nonEmpty
+    val isCached = sparkSession.sharedState.cacheManager.lookupCachedData(logicalPlan).nonEmpty
     if (isCached) {
       // Create a data frame to represent the table.
       // TODO: Use uncacheTable once it supports database name.
       val df = Dataset.ofRows(sparkSession, logicalPlan)
       // Uncache the logicalPlan.
-      sparkSession.cacheManager.tryUncacheQuery(df, blocking = true)
+      sparkSession.sharedState.cacheManager.tryUncacheQuery(df, blocking = true)
       // Cache it again.
-      sparkSession.cacheManager.cacheQuery(df, Some(tableIdent.table))
+      sparkSession.sharedState.cacheManager.cacheQuery(df, Some(tableIdent.table))
     }
   }
 
@@ -383,7 +383,7 @@ private[sql] object CatalogImpl {
     val enc = ExpressionEncoder[T]()
     val encoded = data.map(d => enc.toRow(d).copy())
     val plan = new LocalRelation(enc.schema.toAttributes, encoded)
-    val queryExecution = sparkSession.executePlan(plan)
+    val queryExecution = sparkSession.sessionState.executePlan(plan)
     new Dataset[T](sparkSession, queryExecution, enc)
   }
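
(Aside: the user-facing Catalog methods touched above keep their behavior; a small spark-shell style usage sketch with an arbitrary temp view name.)

// Public API path; under the hood these now go through spark.sharedState.cacheManager.
spark.range(100).createOrReplaceTempView("nums")
spark.catalog.cacheTable("nums")
assert(spark.catalog.isCached("nums"))
spark.catalog.uncacheTable("nums")
spark.catalog.clearCache()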
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
index c9cc2ba04a4131878e25b98bef8f95081c8c55f4..4c7bbf04bc72a5d1964005d8e37e428884b6eeec 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
@@ -92,7 +92,7 @@ private[sql] class SessionState(sparkSession: SparkSession) {
    * Internal catalog for managing table and database states.
    */
   lazy val catalog = new SessionCatalog(
-    sparkSession.externalCatalog,
+    sparkSession.sharedState.externalCatalog,
     functionResourceLoader,
     functionRegistry,
     conf,
@@ -161,6 +161,8 @@ private[sql] class SessionState(sparkSession: SparkSession) {
   //  Helper methods, partially leftover from pre-2.0 days
   // ------------------------------------------------------
 
+  def executeSql(sql: String): QueryExecution = executePlan(sqlParser.parsePlan(sql))
+
   def executePlan(plan: LogicalPlan): QueryExecution = new QueryExecution(sparkSession, plan)
 
   def invalidateTable(tableName: String): Unit = {
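
(Aside: a sketch, assuming code in the org.apache.spark.sql package with access to the private[sql] sessionState, of how the two helpers are used: executeSql parses and plans in one step, executePlan takes an already-built LogicalPlan, and both return a QueryExecution.)

package org.apache.spark.sql

// Illustrative only; mirrors how the call sites in this patch were rewritten.
object SessionStateHelpersSketch {
  def demo(spark: SparkSession): Unit = {
    val qe1 = spark.sessionState.executeSql("SELECT 1 AS one")
    println(qe1.executedPlan)

    val parsed = spark.sessionState.sqlParser.parsePlan("SELECT 2 AS two")
    val qe2 = spark.sessionState.executePlan(parsed)
    qe2.toRdd.collect()
  }
}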
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
index 1c96bdc05cfcd27ca34bb698abdd936d9e57cf64..e08a9ab7e6914d8cc42fbdb1fde6b28eef8fff4d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
@@ -79,17 +79,17 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext
   }
 
   test("unpersist an uncached table will not raise exception") {
-    assert(None == spark.cacheManager.lookupCachedData(testData))
+    assert(None == spark.sharedState.cacheManager.lookupCachedData(testData))
     testData.unpersist(blocking = true)
-    assert(None == spark.cacheManager.lookupCachedData(testData))
+    assert(None == spark.sharedState.cacheManager.lookupCachedData(testData))
     testData.unpersist(blocking = false)
-    assert(None == spark.cacheManager.lookupCachedData(testData))
+    assert(None == spark.sharedState.cacheManager.lookupCachedData(testData))
     testData.persist()
-    assert(None != spark.cacheManager.lookupCachedData(testData))
+    assert(None != spark.sharedState.cacheManager.lookupCachedData(testData))
     testData.unpersist(blocking = true)
-    assert(None == spark.cacheManager.lookupCachedData(testData))
+    assert(None == spark.sharedState.cacheManager.lookupCachedData(testData))
     testData.unpersist(blocking = false)
-    assert(None == spark.cacheManager.lookupCachedData(testData))
+    assert(None == spark.sharedState.cacheManager.lookupCachedData(testData))
   }
 
   test("cache table as select") {
@@ -311,14 +311,14 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext
     spark.catalog.cacheTable("t1")
     spark.catalog.cacheTable("t2")
     spark.catalog.clearCache()
-    assert(spark.cacheManager.isEmpty)
+    assert(spark.sharedState.cacheManager.isEmpty)
 
     sql("SELECT key FROM testData LIMIT 10").createOrReplaceTempView("t1")
     sql("SELECT key FROM testData LIMIT 5").createOrReplaceTempView("t2")
     spark.catalog.cacheTable("t1")
     spark.catalog.cacheTable("t2")
     sql("Clear CACHE")
-    assert(spark.cacheManager.isEmpty)
+    assert(spark.sharedState.cacheManager.isEmpty)
   }
 
   test("Clear accumulators when uncacheTable to prevent memory leaking") {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFramePivotSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFramePivotSuite.scala
index fa8fa0690721bd1da4ef706e8f3168216eb6ddda..d5cb5e15688e880b6e7bc418b42462daf8d94e9d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFramePivotSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFramePivotSuite.scala
@@ -104,7 +104,7 @@ class DataFramePivotSuite extends QueryTest with SharedSQLContext{
       // pivot with extra columns to trigger optimization
       .pivot("course", Seq("dotNET", "Java") ++ (1 to 10).map(_.toString))
       .agg(sum($"earnings"))
-    val queryExecution = spark.executePlan(df.queryExecution.logical)
+    val queryExecution = spark.sessionState.executePlan(df.queryExecution.logical)
     assert(queryExecution.simpleString.contains("pivotfirst"))
   }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetCacheSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetCacheSuite.scala
index 8c0906b74600dbaac3ad0c56a9718600eb6c16eb..ac9f6c2f38537a5967853289c5e430716a2057a4 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetCacheSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetCacheSuite.scala
@@ -39,7 +39,8 @@ class DatasetCacheSuite extends QueryTest with SharedSQLContext {
       2, 3, 4)
     // Drop the cache.
     cached.unpersist()
-    assert(spark.cacheManager.lookupCachedData(cached).isEmpty, "The Dataset should not be cached.")
+    assert(spark.sharedState.cacheManager.lookupCachedData(cached).isEmpty,
+      "The Dataset should not be cached.")
   }
 
   test("persist and then rebind right encoder when join 2 datasets") {
@@ -56,10 +57,10 @@ class DatasetCacheSuite extends QueryTest with SharedSQLContext {
     assertCached(joined, 2)
 
     ds1.unpersist()
-    assert(spark.cacheManager.lookupCachedData(ds1).isEmpty,
+    assert(spark.sharedState.cacheManager.lookupCachedData(ds1).isEmpty,
       "The Dataset ds1 should not be cached.")
     ds2.unpersist()
-    assert(spark.cacheManager.lookupCachedData(ds2).isEmpty,
+    assert(spark.sharedState.cacheManager.lookupCachedData(ds2).isEmpty,
       "The Dataset ds2 should not be cached.")
   }
 
@@ -75,9 +76,10 @@ class DatasetCacheSuite extends QueryTest with SharedSQLContext {
     assertCached(agged.filter(_._1 == "b"))
 
     ds.unpersist()
-    assert(spark.cacheManager.lookupCachedData(ds).isEmpty, "The Dataset ds should not be cached.")
+    assert(spark.sharedState.cacheManager.lookupCachedData(ds).isEmpty,
+      "The Dataset ds should not be cached.")
     agged.unpersist()
-    assert(spark.cacheManager.lookupCachedData(agged).isEmpty,
+    assert(spark.sharedState.cacheManager.lookupCachedData(agged).isEmpty,
       "The Dataset agged should not be cached.")
   }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
index 55836737083de658a527316f180525b7b38c3fd5..cbf4a8a6125940a2d5578471f7a5a5c037acccb5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
@@ -60,7 +60,7 @@ class JoinSuite extends QueryTest with SharedSQLContext {
   }
 
   test("join operator selection") {
-    spark.cacheManager.clearCache()
+    spark.sharedState.cacheManager.clearCache()
 
     withSQLConf("spark.sql.autoBroadcastJoinThreshold" -> "0",
       SQLConf.CROSS_JOINS_ENABLED.key -> "true") {
@@ -113,7 +113,7 @@ class JoinSuite extends QueryTest with SharedSQLContext {
 //  }
 
   test("broadcasted hash join operator selection") {
-    spark.cacheManager.clearCache()
+    spark.sharedState.cacheManager.clearCache()
     sql("CACHE TABLE testData")
     Seq(
       ("SELECT * FROM testData join testData2 ON key = a",
@@ -127,7 +127,7 @@ class JoinSuite extends QueryTest with SharedSQLContext {
   }
 
   test("broadcasted hash outer join operator selection") {
-    spark.cacheManager.clearCache()
+    spark.sharedState.cacheManager.clearCache()
     sql("CACHE TABLE testData")
     sql("CACHE TABLE testData2")
     Seq(
@@ -450,7 +450,7 @@ class JoinSuite extends QueryTest with SharedSQLContext {
   }
 
   test("broadcasted existence join operator selection") {
-    spark.cacheManager.clearCache()
+    spark.sharedState.cacheManager.clearCache()
     sql("CACHE TABLE testData")
 
     withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> Long.MaxValue.toString) {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala
index e2fb91352d58e6969de3c15733a04e5703e1c7ea..af3ed14c122d2ec05478d525bde775b93bfc835c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala
@@ -33,7 +33,7 @@ class InMemoryColumnarQuerySuite extends QueryTest with SharedSQLContext {
   setupTestData()
 
   test("simple columnar query") {
-    val plan = spark.executePlan(testData.logicalPlan).sparkPlan
+    val plan = spark.sessionState.executePlan(testData.logicalPlan).sparkPlan
     val scan = InMemoryRelation(useCompression = true, 5, MEMORY_ONLY, plan, None)
 
     checkAnswer(scan, testData.collect().toSeq)
@@ -50,7 +50,7 @@ class InMemoryColumnarQuerySuite extends QueryTest with SharedSQLContext {
   }
 
   test("projection") {
-    val plan = spark.executePlan(testData.select('value, 'key).logicalPlan).sparkPlan
+    val plan = spark.sessionState.executePlan(testData.select('value, 'key).logicalPlan).sparkPlan
     val scan = InMemoryRelation(useCompression = true, 5, MEMORY_ONLY, plan, None)
 
     checkAnswer(scan, testData.collect().map {
@@ -59,7 +59,7 @@ class InMemoryColumnarQuerySuite extends QueryTest with SharedSQLContext {
   }
 
   test("SPARK-1436 regression: in-memory columns must be able to be accessed multiple times") {
-    val plan = spark.executePlan(testData.logicalPlan).sparkPlan
+    val plan = spark.sessionState.executePlan(testData.logicalPlan).sparkPlan
     val scan = InMemoryRelation(useCompression = true, 5, MEMORY_ONLY, plan, None)
 
     checkAnswer(scan, testData.collect().toSeq)
@@ -202,7 +202,7 @@ class InMemoryColumnarQuerySuite extends QueryTest with SharedSQLContext {
     cached.count()
 
     // Make sure, the DataFrame is indeed cached.
-    assert(spark.cacheManager.lookupCachedData(cached).nonEmpty)
+    assert(spark.sharedState.cacheManager.lookupCachedData(cached).nonEmpty)
 
     // Check result.
     checkAnswer(
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
index 12940c86fe6506f7577e95b80cb1ebf01b832bfc..7e9160febdec7927bba124a5277a50c284087182 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
@@ -71,21 +71,22 @@ class SQLMetricsSuite extends SparkFunSuite with SharedSQLContext {
       df: DataFrame,
       expectedNumOfJobs: Int,
       expectedMetrics: Map[Long, (String, Map[String, Any])]): Unit = {
-    val previousExecutionIds = spark.listener.executionIdToData.keySet
+    val previousExecutionIds = spark.sharedState.listener.executionIdToData.keySet
     withSQLConf("spark.sql.codegen.wholeStage" -> "false") {
       df.collect()
     }
     sparkContext.listenerBus.waitUntilEmpty(10000)
-    val executionIds = spark.listener.executionIdToData.keySet.diff(previousExecutionIds)
+    val executionIds =
+      spark.sharedState.listener.executionIdToData.keySet.diff(previousExecutionIds)
     assert(executionIds.size === 1)
     val executionId = executionIds.head
-    val jobs = spark.listener.getExecution(executionId).get.jobs
+    val jobs = spark.sharedState.listener.getExecution(executionId).get.jobs
     // Use "<=" because there is a race condition that we may miss some jobs
     // TODO Change it to "=" once we fix the race condition that missing the JobStarted event.
     assert(jobs.size <= expectedNumOfJobs)
     if (jobs.size == expectedNumOfJobs) {
       // If we can track all jobs, check the metric values
-      val metricValues = spark.listener.getExecutionMetrics(executionId)
+      val metricValues = spark.sharedState.listener.getExecutionMetrics(executionId)
       val actualMetrics = SparkPlanGraph(SparkPlanInfo.fromSparkPlan(
         df.queryExecution.executedPlan)).allNodes.filter { node =>
         expectedMetrics.contains(node.id)
@@ -283,19 +284,20 @@ class SQLMetricsSuite extends SparkFunSuite with SharedSQLContext {
 
   test("save metrics") {
     withTempPath { file =>
-      val previousExecutionIds = spark.listener.executionIdToData.keySet
+      val previousExecutionIds = spark.sharedState.listener.executionIdToData.keySet
       // Assume the execution plan is
       // PhysicalRDD(nodeId = 0)
       person.select('name).write.format("json").save(file.getAbsolutePath)
       sparkContext.listenerBus.waitUntilEmpty(10000)
-      val executionIds = spark.listener.executionIdToData.keySet.diff(previousExecutionIds)
+      val executionIds =
+        spark.sharedState.listener.executionIdToData.keySet.diff(previousExecutionIds)
       assert(executionIds.size === 1)
       val executionId = executionIds.head
-      val jobs = spark.listener.getExecution(executionId).get.jobs
+      val jobs = spark.sharedState.listener.getExecution(executionId).get.jobs
       // Use "<=" because there is a race condition that we may miss some jobs
       // TODO Change "<=" to "=" once we fix the race condition that missing the JobStarted event.
       assert(jobs.size <= 1)
-      val metricValues = spark.listener.getExecutionMetrics(executionId)
+      val metricValues = spark.sharedState.listener.getExecutionMetrics(executionId)
       // Because "save" will create a new DataFrame internally, we cannot get the real metric id.
       // However, we still can check the value.
       assert(metricValues.values.toSeq.exists(_ === "2"))
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
index 2374ffaaa5036b605ad31947943613c8326a78fb..cf7e976acc65f1c2e0b3b213634f348dd38e67c0 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
@@ -340,16 +340,16 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext {
   }
 
   test("SPARK-11126: no memory leak when running non SQL jobs") {
-    val previousStageNumber = spark.listener.stageIdToStageMetrics.size
+    val previousStageNumber = spark.sharedState.listener.stageIdToStageMetrics.size
     spark.sparkContext.parallelize(1 to 10).foreach(i => ())
     spark.sparkContext.listenerBus.waitUntilEmpty(10000)
     // listener should ignore the non SQL stage
-    assert(spark.listener.stageIdToStageMetrics.size == previousStageNumber)
+    assert(spark.sharedState.listener.stageIdToStageMetrics.size == previousStageNumber)
 
     spark.sparkContext.parallelize(1 to 10).toDF().foreach(i => ())
     spark.sparkContext.listenerBus.waitUntilEmpty(10000)
     // listener should save the SQL stage
-    assert(spark.listener.stageIdToStageMetrics.size == previousStageNumber + 1)
+    assert(spark.sharedState.listener.stageIdToStageMetrics.size == previousStageNumber + 1)
   }
 
   test("SPARK-13055: history listener only tracks SQL metrics") {
@@ -418,12 +418,12 @@ class SQLListenerMemoryLeakSuite extends SparkFunSuite {
           }
         }
         sc.listenerBus.waitUntilEmpty(10000)
-        assert(spark.listener.getCompletedExecutions.size <= 50)
-        assert(spark.listener.getFailedExecutions.size <= 50)
+        assert(spark.sharedState.listener.getCompletedExecutions.size <= 50)
+        assert(spark.sharedState.listener.getFailedExecutions.size <= 50)
         // 50 for successful executions and 50 for failed executions
-        assert(spark.listener.executionIdToData.size <= 100)
-        assert(spark.listener.jobIdToExecutionId.size <= 100)
-        assert(spark.listener.stageIdToStageMetrics.size <= 100)
+        assert(spark.sharedState.listener.executionIdToData.size <= 100)
+        assert(spark.sharedState.listener.jobIdToExecutionId.size <= 100)
+        assert(spark.sharedState.listener.stageIdToStageMetrics.size <= 100)
       } finally {
         sc.stop()
       }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
index 9c9abfeb2a6372a8b700d22970ebe6b2478eed97..abb7918ae607b6069731e64fba65de09d0fac5b9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
@@ -639,7 +639,7 @@ class JDBCSuite extends SparkFunSuite
   test("test credentials in the properties are not in plan output") {
     val df = sql("SELECT * FROM parts")
     val explain = ExplainCommand(df.queryExecution.logical, extended = true)
-    spark.executePlan(explain).executedPlan.executeCollect().foreach {
+    spark.sessionState.executePlan(explain).executedPlan.executeCollect().foreach {
       r => assert(!List("testPass", "testUser").exists(r.toString.contains))
     }
     // test the JdbcRelation toString output
@@ -651,7 +651,7 @@ class JDBCSuite extends SparkFunSuite
   test("test credentials in the connection url are not in the plan output") {
     val df = spark.read.jdbc(urlWithUserAndPass, "TEST.PEOPLE", new Properties)
     val explain = ExplainCommand(df.queryExecution.logical, extended = true)
-    spark.executePlan(explain).executedPlan.executeCollect().foreach {
+    spark.sessionState.executePlan(explain).executedPlan.executeCollect().foreach {
       r => assert(!List("testPass", "testUser").exists(r.toString.contains))
     }
   }
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLDriver.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLDriver.scala
index c24e474d9ca461746e67978515953c23681d21d5..0d5dc7af5f5222577bb5eb44e5fa1c1bcf0ec694 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLDriver.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLDriver.scala
@@ -59,7 +59,7 @@ private[hive] class SparkSQLDriver(val context: SQLContext = SparkSQLEnv.sqlCont
     // TODO unify the error code
     try {
       context.sparkContext.setJobDescription(command)
-      val execution = context.executePlan(context.sql(command).logicalPlan)
+      val execution = context.sessionState.executePlan(context.sql(command).logicalPlan)
       hiveResponse = execution.hiveResultString()
       tableSchema = getResultSetSchema(execution)
       new CommandProcessorResponse(0)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index b377a20e3943be505d31a56163b11d3e79df27f2..ea721e4d9b6ece0a02d6754635aaa09ca8ab364c 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -177,8 +177,11 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
       alias match {
         // because hive use things like `_c0` to build the expanded text
         // currently we cannot support view from "create view v1(c1) as ..."
-        case None => SubqueryAlias(table.identifier.table, sparkSession.parseSql(viewText))
-        case Some(aliasText) => SubqueryAlias(aliasText, sparkSession.parseSql(viewText))
+        case None =>
+          SubqueryAlias(table.identifier.table,
+            sparkSession.sessionState.sqlParser.parsePlan(viewText))
+        case Some(aliasText) =>
+          SubqueryAlias(aliasText, sessionState.sqlParser.parsePlan(viewText))
       }
     } else {
       MetastoreRelation(
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelectCommand.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelectCommand.scala
index 3fc900961e645bcfd92bbf1a75f7c00d3b3697fc..cfe614909532b77b51de1b4f2e579c21b0e4f9b7 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelectCommand.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelectCommand.scala
@@ -86,7 +86,7 @@ case class CreateTableAsSelectCommand(
         throw new AnalysisException(s"$tableIdentifier already exists.")
       }
     } else {
-      sparkSession.executePlan(InsertIntoTable(
+      sparkSession.sessionState.executePlan(InsertIntoTable(
         metastoreRelation, Map(), query, overwrite = true, ifNotExists = false)).toRdd
     }
 
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
index 3805674d39589d77aa20332e0e27fa34c6f1c9b1..9e8ff9317c1080b41f91c39312743194aa337e73 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
@@ -296,7 +296,7 @@ case class InsertIntoHiveTable(
     }
 
     // Invalidate the cache.
-    sqlContext.cacheManager.invalidateCache(table)
+    sqlContext.sharedState.cacheManager.invalidateCache(table)
 
     // It would be nice to just return the childRdd unchanged so insert operations could be chained,
     // however for now we return an empty list to simplify compatibility checks with hive, which
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/ErrorPositionSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/ErrorPositionSuite.scala
index 7c74a0308d4833a4857ed874b13a758c04672d99..dc8f374eb178f36e6b5ab277cc4b3e83b5206866 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/ErrorPositionSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/ErrorPositionSuite.scala
@@ -130,7 +130,7 @@ class ErrorPositionSuite extends QueryTest with TestHiveSingleton with BeforeAnd
    * @param token a unique token in the string that should be indicated by the exception
    */
   def positionTest(name: String, query: String, token: String): Unit = {
-    def ast = hiveContext.parseSql(query)
+    def ast = hiveContext.sessionState.sqlParser.parsePlan(query)
     def parseTree = Try(quietly(ast.treeString)).getOrElse("<failed to parse>")
 
     test(name) {
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/ShowCreateTableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/ShowCreateTableSuite.scala
index dedc8f55f01ba16dd13d5f5db14de18ca59dea34..f789d88d5dd4adec3d84f93e75a9ae22525c9c0a 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/ShowCreateTableSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/ShowCreateTableSuite.scala
@@ -279,13 +279,13 @@ class ShowCreateTableSuite extends QueryTest with SQLTestUtils with TestHiveSing
 
   private def checkCreateTableOrView(table: TableIdentifier, checkType: String): Unit = {
     val db = table.database.getOrElse("default")
-    val expected = spark.externalCatalog.getTable(db, table.table)
+    val expected = spark.sharedState.externalCatalog.getTable(db, table.table)
     val shownDDL = sql(s"SHOW CREATE TABLE ${table.quotedString}").head().getString(0)
     sql(s"DROP $checkType ${table.quotedString}")
 
     try {
       sql(shownDDL)
-      val actual = spark.externalCatalog.getTable(db, table.table)
+      val actual = spark.sharedState.externalCatalog.getTable(db, table.table)
       checkCatalogTables(expected, actual)
     } finally {
       sql(s"DROP $checkType IF EXISTS ${table.table}")
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
index f8e00a35a31e2fb35a117da66ca132006b97a02d..73b1a7850d6fb2ee821675b0adc263956017f268 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
@@ -34,7 +34,7 @@ class StatisticsSuite extends QueryTest with TestHiveSingleton with SQLTestUtils
 
   test("parse analyze commands") {
     def assertAnalyzeCommand(analyzeCommand: String, c: Class[_]) {
-      val parsed = hiveContext.parseSql(analyzeCommand)
+      val parsed = hiveContext.sessionState.sqlParser.parsePlan(analyzeCommand)
       val operators = parsed.collect {
         case a: AnalyzeTableCommand => a
         case o => o
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ConcurrentHiveSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ConcurrentHiveSuite.scala
index f5cd73d45ed75128289a12fb649dee7a3890619f..1583a448efaf763ed84b28d372b0687c03a20a6a 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ConcurrentHiveSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ConcurrentHiveSuite.scala
@@ -30,9 +30,9 @@ class ConcurrentHiveSuite extends SparkFunSuite with BeforeAndAfterAll {
         conf.set("spark.ui.enabled", "false")
         val ts =
           new TestHiveContext(new SparkContext("local", s"TestSQLContext$i", conf))
-        ts.executeSql("SHOW TABLES").toRdd.collect()
-        ts.executeSql("SELECT * FROM src").toRdd.collect()
-        ts.executeSql("SHOW TABLES").toRdd.collect()
+        ts.sessionState.executeSql("SHOW TABLES").toRdd.collect()
+        ts.sessionState.executeSql("SELECT * FROM src").toRdd.collect()
+        ts.sessionState.executeSql("SHOW TABLES").toRdd.collect()
       }
     }
   }