From 7de06a646dff7ede520d2e982ac0996d8c184650 Mon Sep 17 00:00:00 2001
From: Andrew Or <andrew@databricks.com>
Date: Sun, 17 Apr 2016 17:35:41 -0700
Subject: [PATCH] Revert "[SPARK-14647][SQL] Group SQLContext/HiveContext state
 into SharedState"

This reverts commit 5cefecc95a5b8418713516802c416cfde5a94a2d.
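
This restores the pre-SPARK-14647 shape: SQLContext and HiveContext once again
receive their cross-session state (SparkContext, CacheManager, SQLListener,
ExternalCatalog, and, for Hive, the execution and metadata clients) as explicit
constructor parameters; SharedState.scala and HiveSharedState.scala are
deleted; TestHiveContext regains its chain of auxiliary constructors; and
HiveExternalCatalogSuite goes back to building its HiveClient directly through
IsolatedClientLoader.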
---
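Reviewer note (git am ignores this region between the "---" marker and the
diffstat): the sketch below contrasts the two constructor shapes this revert
moves between. It is a minimal, self-contained Scala illustration with stand-in
types, not the real Spark SQL classes; the two SQLContext stand-ins are renamed
GroupedSQLContext and FlatSQLContext here so both shapes can coexist in one file.

    // Stand-in types only; the real classes live in the files changed below.
    class SparkContext
    class CacheManager
    class SQLListener
    trait ExternalCatalog
    class InMemoryCatalog extends ExternalCatalog

    // Shape introduced by SPARK-14647 (removed by this revert): one holder
    // object owns everything shared across sessions.
    class SharedState(val sparkContext: SparkContext) {
      val cacheManager = new CacheManager
      val listener = new SQLListener
      lazy val externalCatalog: ExternalCatalog = new InMemoryCatalog
    }

    class GroupedSQLContext(
        val sharedState: SharedState,
        val isRootContext: Boolean) {
      def this(sc: SparkContext) = this(new SharedState(sc), true)
      // Sharing is automatic: a new session just reuses the same holder.
      def newSession(): GroupedSQLContext =
        new GroupedSQLContext(sharedState, isRootContext = false)
    }

    // Shape restored by this revert: every shared component is an explicit
    // constructor parameter.
    class FlatSQLContext(
        val sparkContext: SparkContext,
        val cacheManager: CacheManager,
        val listener: SQLListener,
        val isRootContext: Boolean,
        val externalCatalog: ExternalCatalog) {
      def this(sc: SparkContext) =
        this(sc, new CacheManager, new SQLListener, true, new InMemoryCatalog)
      // Sharing is manual: newSession() must thread each component through.
      def newSession(): FlatSQLContext =
        new FlatSQLContext(
          sparkContext = sparkContext,
          cacheManager = cacheManager,
          listener = listener,
          isRootContext = false,
          externalCatalog = externalCatalog)
    }
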
 .../org/apache/spark/sql/SQLContext.scala     | 31 ++++---
 .../spark/sql/internal/SessionState.scala     |  2 +
 .../spark/sql/internal/SharedState.scala      | 47 ----------
 .../apache/spark/sql/hive/HiveContext.scala   | 51 +++++++----
 .../spark/sql/hive/HiveSessionState.scala     | 15 +---
 .../spark/sql/hive/HiveSharedState.scala      | 53 ------------
 .../apache/spark/sql/hive/test/TestHive.scala | 86 ++++++++++++-------
 .../sql/hive/HiveExternalCatalogSuite.scala   | 12 ++-
 8 files changed, 122 insertions(+), 175 deletions(-)
 delete mode 100644 sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
 delete mode 100644 sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSharedState.scala

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index 781d699819..9259ff4062 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -41,7 +41,7 @@ import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.command.ShowTablesCommand
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.execution.ui.{SQLListener, SQLTab}
-import org.apache.spark.sql.internal.{SessionState, SharedState, SQLConf}
+import org.apache.spark.sql.internal.{SessionState, SQLConf}
 import org.apache.spark.sql.sources.BaseRelation
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.util.ExecutionListenerManager
@@ -63,14 +63,17 @@ import org.apache.spark.util.Utils
  * @since 1.0.0
  */
 class SQLContext private[sql](
-    @transient protected[sql] val sharedState: SharedState,
-    val isRootContext: Boolean)
+    @transient val sparkContext: SparkContext,
+    @transient protected[sql] val cacheManager: CacheManager,
+    @transient private[sql] val listener: SQLListener,
+    val isRootContext: Boolean,
+    @transient private[sql] val externalCatalog: ExternalCatalog)
   extends Logging with Serializable {
 
   self =>
 
   def this(sc: SparkContext) = {
-    this(new SharedState(sc), true)
+    this(sc, new CacheManager, SQLContext.createListenerAndUI(sc), true, new InMemoryCatalog)
   }
 
   def this(sparkContext: JavaSparkContext) = this(sparkContext.sc)
@@ -97,20 +100,20 @@ class SQLContext private[sql](
     }
   }
 
-  def sparkContext: SparkContext = sharedState.sparkContext
-
-  protected[sql] def cacheManager: CacheManager = sharedState.cacheManager
-  protected[sql] def listener: SQLListener = sharedState.listener
-  protected[sql] def externalCatalog: ExternalCatalog = sharedState.externalCatalog
-
   /**
-   * Returns a [[SQLContext]] as new session, with separated SQL configurations, temporary
-   * tables, registered functions, but sharing the same [[SparkContext]], cached data and
-   * other things.
+   * Returns a SQLContext as a new session, with separated SQL configurations, temporary tables,
+   * registered functions, but sharing the same SparkContext, CacheManager, SQLListener and SQLTab.
    *
    * @since 1.6.0
    */
-  def newSession(): SQLContext = new SQLContext(sharedState, isRootContext = false)
+  def newSession(): SQLContext = {
+    new SQLContext(
+      sparkContext = sparkContext,
+      cacheManager = cacheManager,
+      listener = listener,
+      isRootContext = false,
+      externalCatalog = externalCatalog)
+  }
 
   /**
    * Per-session state, e.g. configuration, functions, temporary tables etc.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
index d404a7c0ae..c30f879ded 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
@@ -22,8 +22,10 @@ import org.apache.spark.sql.catalyst.analysis.{Analyzer, FunctionRegistry}
 import org.apache.spark.sql.catalyst.catalog.SessionCatalog
 import org.apache.spark.sql.catalyst.optimizer.Optimizer
 import org.apache.spark.sql.catalyst.parser.ParserInterface
+import org.apache.spark.sql.catalyst.rules.RuleExecutor
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.datasources.{DataSourceAnalysis, PreInsertCastAndRename, ResolveDataSource}
+import org.apache.spark.sql.execution.exchange.{EnsureRequirements, ReuseExchange}
 import org.apache.spark.sql.util.ExecutionListenerManager
 
 /**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
deleted file mode 100644
index 9a30c7de1f..0000000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.internal
-
-import org.apache.spark.SparkContext
-import org.apache.spark.sql.SQLContext
-import org.apache.spark.sql.catalyst.catalog.{ExternalCatalog, InMemoryCatalog}
-import org.apache.spark.sql.execution.CacheManager
-import org.apache.spark.sql.execution.ui.SQLListener
-
-
-/**
- * A class that holds all state shared across sessions in a given [[SQLContext]].
- */
-private[sql] class SharedState(val sparkContext: SparkContext) {
-
-  /**
-   * Class for caching query results reused in future executions.
-   */
-  val cacheManager: CacheManager = new CacheManager
-
-  /**
-   * A listener for SQL-specific [[org.apache.spark.scheduler.SparkListenerEvent]]s.
-   */
-  val listener: SQLListener = SQLContext.createListenerAndUI(sparkContext)
-
-  /**
-   * A catalog that interacts with external systems.
-   */
-  lazy val externalCatalog: ExternalCatalog = new InMemoryCatalog
-
-}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
index 71ef99a6a9..42cda0be16 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -45,10 +45,12 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{Expression, LeafExpression}
 import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
 import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.command.{ExecutedCommand, SetCommand}
+import org.apache.spark.sql.execution.ui.SQLListener
 import org.apache.spark.sql.hive.client._
 import org.apache.spark.sql.hive.execution.{AnalyzeTable, DescribeHiveTableCommand, HiveNativeCommand}
-import org.apache.spark.sql.internal.{SharedState, SQLConf}
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.internal.SQLConf._
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
@@ -61,14 +63,32 @@ import org.apache.spark.util.Utils
  * @since 1.0.0
  */
 class HiveContext private[hive](
-    @transient protected[hive] val hiveSharedState: HiveSharedState,
-    override val isRootContext: Boolean)
-  extends SQLContext(hiveSharedState, isRootContext) with Logging {
-
+    sc: SparkContext,
+    cacheManager: CacheManager,
+    listener: SQLListener,
+    @transient private[hive] val executionHive: HiveClientImpl,
+    @transient private[hive] val metadataHive: HiveClient,
+    isRootContext: Boolean,
+    @transient private[sql] val hiveCatalog: HiveExternalCatalog)
+  extends SQLContext(sc, cacheManager, listener, isRootContext, hiveCatalog) with Logging {
   self =>
 
+  private def this(sc: SparkContext, execHive: HiveClientImpl, metaHive: HiveClient) {
+    this(
+      sc,
+      new CacheManager,
+      SQLContext.createListenerAndUI(sc),
+      execHive,
+      metaHive,
+      true,
+      new HiveExternalCatalog(metaHive))
+  }
+
   def this(sc: SparkContext) = {
-    this(new HiveSharedState(sc), true)
+    this(
+      sc,
+      HiveContext.newClientForExecution(sc.conf, sc.hadoopConfiguration),
+      HiveContext.newClientForMetadata(sc.conf, sc.hadoopConfiguration))
   }
 
   def this(sc: JavaSparkContext) = this(sc.sc)
@@ -83,16 +103,19 @@ class HiveContext private[hive](
 * and Hive client (both execution and metadata) with the existing HiveContext.
    */
   override def newSession(): HiveContext = {
-    new HiveContext(hiveSharedState, isRootContext = false)
+    new HiveContext(
+      sc = sc,
+      cacheManager = cacheManager,
+      listener = listener,
+      executionHive = executionHive.newSession(),
+      metadataHive = metadataHive.newSession(),
+      isRootContext = false,
+      hiveCatalog = hiveCatalog)
   }
 
   @transient
   protected[sql] override lazy val sessionState = new HiveSessionState(self)
 
-  protected[hive] def hiveCatalog: HiveExternalCatalog = hiveSharedState.externalCatalog
-  protected[hive] def executionHive: HiveClientImpl = sessionState.executionHive
-  protected[hive] def metadataHive: HiveClient = sessionState.metadataHive
-
   /**
    * When true, enables an experimental feature where metastore tables that use the parquet SerDe
    * are automatically converted to use the Spark SQL parquet table scan, instead of the Hive
@@ -136,7 +159,7 @@ class HiveContext private[hive](
   protected[hive] def hiveThriftServerAsync: Boolean = getConf(HIVE_THRIFT_SERVER_ASYNC)
 
   protected[hive] def hiveThriftServerSingleSession: Boolean =
-    sparkContext.conf.getBoolean("spark.sql.hive.thriftServer.singleSession", defaultValue = false)
+    sc.conf.get("spark.sql.hive.thriftServer.singleSession", "false").toBoolean
 
   @transient
   protected[sql] lazy val substitutor = new VariableSubstitution()
@@ -504,9 +527,7 @@ private[hive] object HiveContext extends Logging {
    * The version of the Hive client that is used here must match the metastore that is configured
    * in the hive-site.xml file.
    */
-  protected[hive] def newClientForMetadata(
-      conf: SparkConf,
-      hadoopConf: Configuration): HiveClient = {
+  private def newClientForMetadata(conf: SparkConf, hadoopConf: Configuration): HiveClient = {
     val hiveConf = new HiveConf(hadoopConf, classOf[HiveConf])
     val configurations = hiveClientConfigurations(hiveConf)
     newClientForMetadata(conf, hiveConf, hadoopConf, configurations)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
index bc28b55d06..b992fda18c 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
@@ -18,11 +18,10 @@
 package org.apache.spark.sql.hive
 
 import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.analysis.Analyzer
+import org.apache.spark.sql.catalyst.analysis.{Analyzer, FunctionRegistry}
 import org.apache.spark.sql.catalyst.parser.ParserInterface
-import org.apache.spark.sql.execution.SparkPlanner
+import org.apache.spark.sql.execution.{python, SparkPlanner}
 import org.apache.spark.sql.execution.datasources._
-import org.apache.spark.sql.hive.client.{HiveClient, HiveClientImpl}
 import org.apache.spark.sql.hive.execution.HiveSqlParser
 import org.apache.spark.sql.internal.{SessionState, SQLConf}
 
@@ -32,16 +31,6 @@ import org.apache.spark.sql.internal.{SessionState, SQLConf}
  */
 private[hive] class HiveSessionState(ctx: HiveContext) extends SessionState(ctx) {
 
-  /**
-   * A Hive client used for execution.
-   */
-  val executionHive: HiveClientImpl = ctx.hiveSharedState.executionHive.newSession()
-
-  /**
-   * A Hive client used for interacting with the metastore.
-   */
-  val metadataHive: HiveClient = ctx.hiveSharedState.metadataHive.newSession()
-
   override lazy val conf: SQLConf = new SQLConf {
     override def caseSensitiveAnalysis: Boolean = getConf(SQLConf.CASE_SENSITIVE, false)
   }
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSharedState.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSharedState.scala
deleted file mode 100644
index 11097c33df..0000000000
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSharedState.scala
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.hive
-
-import org.apache.spark.SparkContext
-import org.apache.spark.sql.hive.client.{HiveClient, HiveClientImpl}
-import org.apache.spark.sql.internal.SharedState
-
-
-/**
- * A class that holds all state shared across sessions in a given [[HiveContext]].
- */
-private[hive] class HiveSharedState(override val sparkContext: SparkContext)
-  extends SharedState(sparkContext) {
-
-  // TODO: just share the IsolatedClientLoader instead of the client instances themselves
-
-  /**
-   * A Hive client used for execution.
-   */
-  val executionHive: HiveClientImpl = {
-    HiveContext.newClientForExecution(sparkContext.conf, sparkContext.hadoopConfiguration)
-  }
-
-  /**
-   * A Hive client used to interact with the metastore.
-   */
-  // This needs to be a lazy val at here because TestHiveSharedState is overriding it.
-  lazy val metadataHive: HiveClient = {
-    HiveContext.newClientForMetadata(sparkContext.conf, sparkContext.hadoopConfiguration)
-  }
-
-  /**
-   * A catalog that interacts with the Hive metastore.
-   */
-  override lazy val externalCatalog = new HiveExternalCatalog(metadataHive)
-
-}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
index d56d36fe32..7f6ca21782 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
@@ -72,24 +72,63 @@ object TestHive
  * test cases that rely on TestHive must be serialized.
  */
 class TestHiveContext private[hive](
-    testHiveSharedState: TestHiveSharedState,
+    sc: SparkContext,
+    cacheManager: CacheManager,
+    listener: SQLListener,
+    executionHive: HiveClientImpl,
+    metadataHive: HiveClient,
+    isRootContext: Boolean,
+    hiveCatalog: HiveExternalCatalog,
     val warehousePath: File,
     val scratchDirPath: File,
-    metastoreTemporaryConf: Map[String, String],
-    isRootContext: Boolean)
-  extends HiveContext(testHiveSharedState, isRootContext) { self =>
+    metastoreTemporaryConf: Map[String, String])
+  extends HiveContext(
+    sc,
+    cacheManager,
+    listener,
+    executionHive,
+    metadataHive,
+    isRootContext,
+    hiveCatalog) { self =>
+
+  // Unfortunately, due to the complex interactions between the construction parameters
+  // and the limitations of Scala constructors, we need this chain of auxiliary
+  // constructors to provide a shorthand for creating a new TestHiveContext with only
+  // a SparkContext. This is not a great design pattern, but it is necessary here.
 
   private def this(
       sc: SparkContext,
+      executionHive: HiveClientImpl,
+      metadataHive: HiveClient,
       warehousePath: File,
       scratchDirPath: File,
       metastoreTemporaryConf: Map[String, String]) {
     this(
-      new TestHiveSharedState(sc, warehousePath, scratchDirPath, metastoreTemporaryConf),
+      sc,
+      new CacheManager,
+      SQLContext.createListenerAndUI(sc),
+      executionHive,
+      metadataHive,
+      true,
+      new HiveExternalCatalog(metadataHive),
       warehousePath,
       scratchDirPath,
-      metastoreTemporaryConf,
-      true)
+      metastoreTemporaryConf)
+  }
+
+  private def this(
+      sc: SparkContext,
+      warehousePath: File,
+      scratchDirPath: File,
+      metastoreTemporaryConf: Map[String, String]) {
+    this(
+      sc,
+      HiveContext.newClientForExecution(sc.conf, sc.hadoopConfiguration),
+      TestHiveContext.newClientForMetadata(
+        sc.conf, sc.hadoopConfiguration, warehousePath, scratchDirPath, metastoreTemporaryConf),
+      warehousePath,
+      scratchDirPath,
+      metastoreTemporaryConf)
   }
 
   def this(sc: SparkContext) {
@@ -102,11 +141,16 @@ class TestHiveContext private[hive](
 
   override def newSession(): HiveContext = {
     new TestHiveContext(
-      testHiveSharedState,
-      warehousePath,
-      scratchDirPath,
-      metastoreTemporaryConf,
-      isRootContext = false)
+      sc = sc,
+      cacheManager = cacheManager,
+      listener = listener,
+      executionHive = executionHive.newSession(),
+      metadataHive = metadataHive.newSession(),
+      isRootContext = false,
+      hiveCatalog = hiveCatalog,
+      warehousePath = warehousePath,
+      scratchDirPath = scratchDirPath,
+      metastoreTemporaryConf = metastoreTemporaryConf)
   }
 
   // By clearing the port we force Spark to pick a new one.  This allows us to rerun tests
@@ -505,22 +549,6 @@ private[hive] class TestHiveFunctionRegistry extends SimpleFunctionRegistry {
   }
 }
 
-
-private[hive] class TestHiveSharedState(
-    sc: SparkContext,
-    warehousePath: File,
-    scratchDirPath: File,
-    metastoreTemporaryConf: Map[String, String])
-  extends HiveSharedState(sc) {
-
-  override lazy val metadataHive: HiveClient = {
-    TestHiveContext.newClientForMetadata(
-      sc.conf, sc.hadoopConfiguration, warehousePath, scratchDirPath, metastoreTemporaryConf)
-  }
-
-}
-
-
 private[hive] object TestHiveContext {
 
   /**
@@ -535,7 +563,7 @@ private[hive] object TestHiveContext {
   /**
    * Create a [[HiveClient]] used to retrieve metadata from the Hive MetaStore.
    */
-  def newClientForMetadata(
+  private def newClientForMetadata(
       conf: SparkConf,
       hadoopConf: Configuration,
       warehousePath: File,
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala
index 84285b7f40..3334c16f0b 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala
@@ -18,10 +18,12 @@
 package org.apache.spark.sql.hive
 
 import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.util.VersionInfo
 
 import org.apache.spark.SparkConf
 import org.apache.spark.sql.catalyst.catalog._
-import org.apache.spark.sql.hive.client.HiveClient
+import org.apache.spark.sql.hive.client.{HiveClient, IsolatedClientLoader}
+import org.apache.spark.util.Utils
 
 /**
  * Test suite for the [[HiveExternalCatalog]].
@@ -29,9 +31,11 @@ import org.apache.spark.sql.hive.client.HiveClient
 class HiveExternalCatalogSuite extends CatalogTestCases {
 
   private val client: HiveClient = {
-    // We create a metastore at a temp location to avoid any potential
-    // conflict of having multiple connections to a single derby instance.
-    HiveContext.newClientForExecution(new SparkConf, new Configuration)
+    IsolatedClientLoader.forVersion(
+      hiveMetastoreVersion = HiveContext.hiveExecutionVersion,
+      hadoopVersion = VersionInfo.getVersion,
+      sparkConf = new SparkConf(),
+      hadoopConf = new Configuration()).createClient()
   }
 
   protected override val utils: CatalogTestUtils = new CatalogTestUtils {
-- 
GitLab