diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index 9259ff40625c9c425f4a82e50dd7338f32ba0c8b..781d6998190b2a0edb17b42336aab5ae3a2dc08a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -41,7 +41,7 @@ import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.command.ShowTablesCommand
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.execution.ui.{SQLListener, SQLTab}
-import org.apache.spark.sql.internal.{SessionState, SQLConf}
+import org.apache.spark.sql.internal.{SessionState, SharedState, SQLConf}
 import org.apache.spark.sql.sources.BaseRelation
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.util.ExecutionListenerManager
@@ -63,17 +63,14 @@ import org.apache.spark.util.Utils
  * @since 1.0.0
  */
 class SQLContext private[sql](
-    @transient val sparkContext: SparkContext,
-    @transient protected[sql] val cacheManager: CacheManager,
-    @transient private[sql] val listener: SQLListener,
-    val isRootContext: Boolean,
-    @transient private[sql] val externalCatalog: ExternalCatalog)
+    @transient protected[sql] val sharedState: SharedState,
+    val isRootContext: Boolean)
   extends Logging with Serializable {
 
   self =>
 
   def this(sc: SparkContext) = {
-    this(sc, new CacheManager, SQLContext.createListenerAndUI(sc), true, new InMemoryCatalog)
+    this(new SharedState(sc), true)
   }
 
   def this(sparkContext: JavaSparkContext) = this(sparkContext.sc)
@@ -100,20 +97,20 @@ class SQLContext private[sql](
     }
   }
 
+  def sparkContext: SparkContext = sharedState.sparkContext
+
+  protected[sql] def cacheManager: CacheManager = sharedState.cacheManager
+  protected[sql] def listener: SQLListener = sharedState.listener
+  protected[sql] def externalCatalog: ExternalCatalog = sharedState.externalCatalog
+
   /**
-   * Returns a SQLContext as new session, with separated SQL configurations, temporary tables,
-   * registered functions, but sharing the same SparkContext, CacheManager, SQLListener and SQLTab.
+   * Returns a [[SQLContext]] as a new session, with separated SQL configurations, temporary
+   * tables, and registered functions, but sharing the same [[SparkContext]], cached data, and
+   * other shared state.
    *
    * @since 1.6.0
    */
-  def newSession(): SQLContext = {
-    new SQLContext(
-      sparkContext = sparkContext,
-      cacheManager = cacheManager,
-      listener = listener,
-      isRootContext = false,
-      externalCatalog = externalCatalog)
-  }
+  def newSession(): SQLContext = new SQLContext(sharedState, isRootContext = false)
 
   /**
    * Per-session state, e.g. configuration, functions, temporary tables etc.
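Reviewer note: with this change, `newSession()` simply hands the same `SharedState` to the child context, which makes the shared/per-session split explicit. Below is a minimal sketch (not part of this patch) of the session semantics the new constructor preserves, assuming local mode and only the public `SQLContext` API; the object and table names are illustrative.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

object SharedStateDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("shared-state"))
    val root = new SQLContext(sc)        // builds one SharedState internally
    val session = root.newSession()      // reuses root's SharedState

    // Per-session state: a config set on one session is not visible to the other.
    root.setConf("spark.sql.shuffle.partitions", "4")
    println(session.getConf("spark.sql.shuffle.partitions"))  // still the default, "200"

    // Per-session state: temp tables registered in `root` do not appear in `session`.
    root.range(0, 100).registerTempTable("nums")
    println(session.tableNames().contains("nums"))            // false

    // Shared state: the CacheManager is common to both sessions.
    root.cacheTable("nums")
    println(root.isCached("nums"))                            // true

    sc.stop()
  }
}
```
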
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
index c30f879dedaa17b6c5dce66545b2992d102d21b5..d404a7c0aef595b54de07cff5624a6b7f6cc8029 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
@@ -22,10 +22,8 @@ import org.apache.spark.sql.catalyst.analysis.{Analyzer, FunctionRegistry}
 import org.apache.spark.sql.catalyst.catalog.SessionCatalog
 import org.apache.spark.sql.catalyst.optimizer.Optimizer
 import org.apache.spark.sql.catalyst.parser.ParserInterface
-import org.apache.spark.sql.catalyst.rules.RuleExecutor
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.datasources.{DataSourceAnalysis, PreInsertCastAndRename, ResolveDataSource}
-import org.apache.spark.sql.execution.exchange.{EnsureRequirements, ReuseExchange}
 import org.apache.spark.sql.util.ExecutionListenerManager
 
 /**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
new file mode 100644
index 0000000000000000000000000000000000000000..9a30c7de1f8f299066484dc0388b78547d6ef0f0
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.internal
+
+import org.apache.spark.SparkContext
+import org.apache.spark.sql.SQLContext
+import org.apache.spark.sql.catalyst.catalog.{ExternalCatalog, InMemoryCatalog}
+import org.apache.spark.sql.execution.CacheManager
+import org.apache.spark.sql.execution.ui.SQLListener
+
+
+/**
+ * A class that holds all state shared across sessions in a given [[SQLContext]].
+ */
+private[sql] class SharedState(val sparkContext: SparkContext) {
+
+  /**
+   * Class for caching query results reused in future executions.
+   */
+  val cacheManager: CacheManager = new CacheManager
+
+  /**
+   * A listener for SQL-specific [[org.apache.spark.scheduler.SparkListenerEvent]]s.
+   */
+  val listener: SQLListener = SQLContext.createListenerAndUI(sparkContext)
+
+  /**
+   * A catalog that interacts with external systems.
+   */
+  lazy val externalCatalog: ExternalCatalog = new InMemoryCatalog
+
+}
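Reviewer note: `externalCatalog` is declared `lazy` so that a subclass (such as `HiveSharedState` below) can override it without the base `InMemoryCatalog` ever being constructed. A self-contained sketch of that Scala lazy-val override behavior, with hypothetical names:

```scala
// When a subclass overrides a lazy val, the base initializer is never evaluated,
// so no throwaway in-memory catalog is built when the Hive variant supplies its own.
object LazyOverrideDemo extends App {
  class Base {
    lazy val catalog: String = { println("building in-memory catalog"); "in-memory" }
  }
  class Derived extends Base {
    override lazy val catalog: String = { println("building hive catalog"); "hive" }
  }

  println(new Derived().catalog)
  // prints:
  //   building hive catalog
  //   hive
  // The Base initializer never runs.
}
```
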
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
index 7ebdad1a68e4092c5efd5d8f2001835e6c8b1ecd..e36674311875a2265a4bb57f7434a5c702a7f356 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -49,12 +49,10 @@ import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.expressions.{Expression, LeafExpression}
 import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
 import org.apache.spark.sql.catalyst.plans.logical._
-import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.command.{ExecutedCommand, SetCommand}
-import org.apache.spark.sql.execution.ui.SQLListener
 import org.apache.spark.sql.hive.client._
 import org.apache.spark.sql.hive.execution.{DescribeHiveTableCommand, HiveNativeCommand}
-import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.internal.{SharedState, SQLConf}
 import org.apache.spark.sql.internal.SQLConf._
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
@@ -67,32 +65,14 @@ import org.apache.spark.util.Utils
  * @since 1.0.0
  */
 class HiveContext private[hive](
-    sc: SparkContext,
-    cacheManager: CacheManager,
-    listener: SQLListener,
-    @transient private[hive] val executionHive: HiveClientImpl,
-    @transient private[hive] val metadataHive: HiveClient,
-    isRootContext: Boolean,
-    @transient private[sql] val hiveCatalog: HiveExternalCatalog)
-  extends SQLContext(sc, cacheManager, listener, isRootContext, hiveCatalog) with Logging {
-  self =>
+    @transient protected[hive] val hiveSharedState: HiveSharedState,
+    override val isRootContext: Boolean)
+  extends SQLContext(hiveSharedState, isRootContext) with Logging {
 
-  private def this(sc: SparkContext, execHive: HiveClientImpl, metaHive: HiveClient) {
-    this(
-      sc,
-      new CacheManager,
-      SQLContext.createListenerAndUI(sc),
-      execHive,
-      metaHive,
-      true,
-      new HiveExternalCatalog(metaHive))
-  }
+  self =>
 
   def this(sc: SparkContext) = {
-    this(
-      sc,
-      HiveContext.newClientForExecution(sc.conf, sc.hadoopConfiguration),
-      HiveContext.newClientForMetadata(sc.conf, sc.hadoopConfiguration))
+    this(new HiveSharedState(sc), true)
   }
 
   def this(sc: JavaSparkContext) = this(sc.sc)
@@ -107,19 +87,16 @@ class HiveContext private[hive](
    * and Hive client (both of execution and metadata) with existing HiveContext.
    */
   override def newSession(): HiveContext = {
-    new HiveContext(
-      sc = sc,
-      cacheManager = cacheManager,
-      listener = listener,
-      executionHive = executionHive.newSession(),
-      metadataHive = metadataHive.newSession(),
-      isRootContext = false,
-      hiveCatalog = hiveCatalog)
+    new HiveContext(hiveSharedState, isRootContext = false)
   }
 
   @transient
   protected[sql] override lazy val sessionState = new HiveSessionState(self)
 
+  protected[hive] def hiveCatalog: HiveExternalCatalog = hiveSharedState.externalCatalog
+  protected[hive] def executionHive: HiveClientImpl = sessionState.executionHive
+  protected[hive] def metadataHive: HiveClient = sessionState.metadataHive
+
   /**
    * When true, enables an experimental feature where metastore tables that use the parquet SerDe
    * are automatically converted to use the Spark SQL parquet table scan, instead of the Hive
@@ -163,7 +140,7 @@ class HiveContext private[hive](
   protected[hive] def hiveThriftServerAsync: Boolean = getConf(HIVE_THRIFT_SERVER_ASYNC)
 
   protected[hive] def hiveThriftServerSingleSession: Boolean =
-    sc.conf.get("spark.sql.hive.thriftServer.singleSession", "false").toBoolean
+    sparkContext.conf.getBoolean("spark.sql.hive.thriftServer.singleSession", defaultValue = false)
 
   @transient
   protected[sql] lazy val substitutor = new VariableSubstitution()
@@ -601,7 +578,9 @@ private[hive] object HiveContext extends Logging {
    * The version of the Hive client that is used here must match the metastore that is configured
    * in the hive-site.xml file.
    */
-  private def newClientForMetadata(conf: SparkConf, hadoopConf: Configuration): HiveClient = {
+  protected[hive] def newClientForMetadata(
+      conf: SparkConf,
+      hadoopConf: Configuration): HiveClient = {
     val hiveConf = new HiveConf(hadoopConf, classOf[HiveConf])
     val configurations = hiveClientConfigurations(hiveConf)
     newClientForMetadata(conf, hiveConf, hadoopConf, configurations)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
index b992fda18cef7e751d6fa9ad6f8c433fff437ccc..bc28b55d06d9e7a9f8deaf5bd7508966f7a02480 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
@@ -18,10 +18,11 @@
 package org.apache.spark.sql.hive
 
 import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.analysis.{Analyzer, FunctionRegistry}
+import org.apache.spark.sql.catalyst.analysis.Analyzer
 import org.apache.spark.sql.catalyst.parser.ParserInterface
-import org.apache.spark.sql.execution.{python, SparkPlanner}
+import org.apache.spark.sql.execution.SparkPlanner
 import org.apache.spark.sql.execution.datasources._
+import org.apache.spark.sql.hive.client.{HiveClient, HiveClientImpl}
 import org.apache.spark.sql.hive.execution.HiveSqlParser
 import org.apache.spark.sql.internal.{SessionState, SQLConf}
 
@@ -31,6 +32,16 @@ import org.apache.spark.sql.internal.{SessionState, SQLConf}
  */
 private[hive] class HiveSessionState(ctx: HiveContext) extends SessionState(ctx) {
 
+  /**
+   * A Hive client used for execution.
+   */
+  val executionHive: HiveClientImpl = ctx.hiveSharedState.executionHive.newSession()
+
+  /**
+   * A Hive client used for interacting with the metastore.
+   */
+  val metadataHive: HiveClient = ctx.hiveSharedState.metadataHive.newSession()
+
   override lazy val conf: SQLConf = new SQLConf {
     override def caseSensitiveAnalysis: Boolean = getConf(SQLConf.CASE_SENSITIVE, false)
   }
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSharedState.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSharedState.scala
new file mode 100644
index 0000000000000000000000000000000000000000..11097c33df2d5d6061c8bf2e71fba751993e41e6
--- /dev/null
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSharedState.scala
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import org.apache.spark.SparkContext
+import org.apache.spark.sql.hive.client.{HiveClient, HiveClientImpl}
+import org.apache.spark.sql.internal.SharedState
+
+
+/**
+ * A class that holds all state shared across sessions in a given [[HiveContext]].
+ */
+private[hive] class HiveSharedState(override val sparkContext: SparkContext)
+  extends SharedState(sparkContext) {
+
+  // TODO: just share the IsolatedClientLoader instead of the client instances themselves
+
+  /**
+   * A Hive client used for execution.
+   */
+  val executionHive: HiveClientImpl = {
+    HiveContext.newClientForExecution(sparkContext.conf, sparkContext.hadoopConfiguration)
+  }
+
+  /**
+   * A Hive client used to interact with the metastore.
+   */
+  // This needs to be a lazy val here because TestHiveSharedState overrides it.
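+  // (A plain val would be evaluated when HiveSharedState is constructed, before a
+  // test subclass could redirect the metastore to its temp warehouse.)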
+  lazy val metadataHive: HiveClient = {
+    HiveContext.newClientForMetadata(sparkContext.conf, sparkContext.hadoopConfiguration)
+  }
+
+  /**
+   * A catalog that interacts with the Hive metastore.
+   */
+  override lazy val externalCatalog = new HiveExternalCatalog(metadataHive)
+
+}
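Reviewer note: with the root Hive clients living in `HiveSharedState` and per-session clients derived in `HiveSessionState`, the resulting behavior is easiest to see from the outside. A hedged sketch, assuming a Hive-enabled build (the metastore client is created lazily on first catalog access); not runnable without a Hive deployment, and the table name is illustrative:

```scala
import org.apache.spark.SparkContext
import org.apache.spark.sql.hive.HiveContext

object HiveSharedStateDemo {
  def demo(sc: SparkContext): Unit = {
    val root = new HiveContext(sc)     // constructs one HiveSharedState
    val session = root.newSession()    // reuses root.hiveSharedState

    // Shared: both sessions talk to the same HiveExternalCatalog / metastore.
    root.sql("CREATE TABLE IF NOT EXISTS demo_src (key INT, value STRING)")
    println(session.tableNames().contains("demo_src"))   // true

    // Per-session: each HiveSessionState derives its own executionHive/metadataHive
    // via newSession(), so SET commands and temp functions stay isolated.
    root.sql("SET hive.exec.dynamic.partition=true")
  }
}
```
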
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
index 7f6ca21782da46e37842e6c83a25b4ff7b341bd0..d56d36fe32e77b6b59ad3327b647fe9dc4d29dfa 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
@@ -72,63 +72,24 @@ object TestHive
  * test cases that rely on TestHive must be serialized.
  */
 class TestHiveContext private[hive](
-    sc: SparkContext,
-    cacheManager: CacheManager,
-    listener: SQLListener,
-    executionHive: HiveClientImpl,
-    metadataHive: HiveClient,
-    isRootContext: Boolean,
-    hiveCatalog: HiveExternalCatalog,
+    testHiveSharedState: TestHiveSharedState,
     val warehousePath: File,
     val scratchDirPath: File,
-    metastoreTemporaryConf: Map[String, String])
-  extends HiveContext(
-    sc,
-    cacheManager,
-    listener,
-    executionHive,
-    metadataHive,
-    isRootContext,
-    hiveCatalog) { self =>
-
-  // Unfortunately, due to the complex interactions between the construction parameters
-  // and the limitations in scala constructors, we need many of these constructors to
-  // provide a shorthand to create a new TestHiveContext with only a SparkContext.
-  // This is not a great design pattern but it's necessary here.
+    metastoreTemporaryConf: Map[String, String],
+    isRootContext: Boolean)
+  extends HiveContext(testHiveSharedState, isRootContext) { self =>
 
   private def this(
       sc: SparkContext,
-      executionHive: HiveClientImpl,
-      metadataHive: HiveClient,
       warehousePath: File,
       scratchDirPath: File,
       metastoreTemporaryConf: Map[String, String]) {
     this(
-      sc,
-      new CacheManager,
-      SQLContext.createListenerAndUI(sc),
-      executionHive,
-      metadataHive,
-      true,
-      new HiveExternalCatalog(metadataHive),
+      new TestHiveSharedState(sc, warehousePath, scratchDirPath, metastoreTemporaryConf),
       warehousePath,
       scratchDirPath,
-      metastoreTemporaryConf)
-  }
-
-  private def this(
-      sc: SparkContext,
-      warehousePath: File,
-      scratchDirPath: File,
-      metastoreTemporaryConf: Map[String, String]) {
-    this(
-      sc,
-      HiveContext.newClientForExecution(sc.conf, sc.hadoopConfiguration),
-      TestHiveContext.newClientForMetadata(
-        sc.conf, sc.hadoopConfiguration, warehousePath, scratchDirPath, metastoreTemporaryConf),
-      warehousePath,
-      scratchDirPath,
-      metastoreTemporaryConf)
+      metastoreTemporaryConf,
+      true)
   }
 
   def this(sc: SparkContext) {
@@ -141,16 +102,11 @@ class TestHiveContext private[hive](
 
   override def newSession(): HiveContext = {
     new TestHiveContext(
-      sc = sc,
-      cacheManager = cacheManager,
-      listener = listener,
-      executionHive = executionHive.newSession(),
-      metadataHive = metadataHive.newSession(),
-      isRootContext = false,
-      hiveCatalog = hiveCatalog,
-      warehousePath = warehousePath,
-      scratchDirPath = scratchDirPath,
-      metastoreTemporaryConf = metastoreTemporaryConf)
+      testHiveSharedState,
+      warehousePath,
+      scratchDirPath,
+      metastoreTemporaryConf,
+      isRootContext = false)
   }
 
   // By clearing the port we force Spark to pick a new one.  This allows us to rerun tests
@@ -549,6 +505,22 @@ private[hive] class TestHiveFunctionRegistry extends SimpleFunctionRegistry {
   }
 }
 
+
+private[hive] class TestHiveSharedState(
+    sc: SparkContext,
+    warehousePath: File,
+    scratchDirPath: File,
+    metastoreTemporaryConf: Map[String, String])
+  extends HiveSharedState(sc) {
+
+  override lazy val metadataHive: HiveClient = {
+    TestHiveContext.newClientForMetadata(
+      sc.conf, sc.hadoopConfiguration, warehousePath, scratchDirPath, metastoreTemporaryConf)
+  }
+
+}
+
+
 private[hive] object TestHiveContext {
 
   /**
@@ -563,7 +535,7 @@ private[hive] object TestHiveContext {
   /**
    * Create a [[HiveClient]] used to retrieve metadata from the Hive MetaStore.
    */
-  private def newClientForMetadata(
+  def newClientForMetadata(
       conf: SparkConf,
       hadoopConf: Configuration,
       warehousePath: File,
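Reviewer note: on the test side, `TestHiveSharedState` only needs to redirect the metastore client; everything else still comes from `HiveSharedState`. A hedged sketch of how a suite typically consumes the resulting `TestHive` singleton (suite and table names are hypothetical):

```scala
import org.scalatest.FunSuite
import org.apache.spark.sql.hive.test.TestHive

class TempWarehouseSuite extends FunSuite {
  test("tables land in the temp warehouse, not a real metastore") {
    TestHive.sql("CREATE TABLE IF NOT EXISTS demo_kv (key INT, value STRING)")
    assert(TestHive.tableNames().contains("demo_kv"))
    TestHive.sql("DROP TABLE demo_kv")
  }
}
```
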
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala
index 3334c16f0be8777bc230e413ec76d465f1474634..84285b7f40832cfbf6956de53e67f7c3276951aa 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala
@@ -18,12 +18,10 @@
 package org.apache.spark.sql.hive
 
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.util.VersionInfo
 
 import org.apache.spark.SparkConf
 import org.apache.spark.sql.catalyst.catalog._
-import org.apache.spark.sql.hive.client.{HiveClient, IsolatedClientLoader}
-import org.apache.spark.util.Utils
+import org.apache.spark.sql.hive.client.HiveClient
 
 /**
  * Test suite for the [[HiveExternalCatalog]].
@@ -31,11 +29,9 @@ import org.apache.spark.util.Utils
 class HiveExternalCatalogSuite extends CatalogTestCases {
 
   private val client: HiveClient = {
-    IsolatedClientLoader.forVersion(
-      hiveMetastoreVersion = HiveContext.hiveExecutionVersion,
-      hadoopVersion = VersionInfo.getVersion,
-      sparkConf = new SparkConf(),
-      hadoopConf = new Configuration()).createClient()
+    // We create a metastore at a temp location to avoid potential
+    // conflicts from multiple connections to a single Derby instance.
+    HiveContext.newClientForExecution(new SparkConf, new Configuration)
   }
 
   protected override val utils: CatalogTestUtils = new CatalogTestUtils {