diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
index 7f892047c7075ab58aced8199d79cfb7b5756c86..fbacd59fd1028a8a965dfaf044c9783ecc3efd56 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
@@ -54,6 +54,7 @@ object TestHive
         .set("spark.sql.test", "")
         .set("spark.sql.hive.metastore.barrierPrefixes",
           "org.apache.spark.sql.hive.execution.PairSerDe")
+        .set("spark.sql.warehouse.dir", TestHiveContext.makeWarehouseDir().toURI.getPath)
         // SPARK-8910
         .set("spark.ui.enabled", "false")))
@@ -111,7 +112,6 @@ class TestHiveContext(
  * A [[SparkSession]] used in [[TestHiveContext]].
  *
  * @param sc SparkContext
- * @param warehousePath path to the Hive warehouse directory
  * @param scratchDirPath scratch directory used by Hive's metastore client
  * @param metastoreTemporaryConf configuration options for Hive's metastore
  * @param existingSharedState optional [[TestHiveSharedState]]
@@ -120,23 +120,15 @@ class TestHiveContext(
  */
 private[hive] class TestHiveSparkSession(
     @transient private val sc: SparkContext,
-    val warehousePath: File,
     scratchDirPath: File,
     metastoreTemporaryConf: Map[String, String],
     @transient private val existingSharedState: Option[TestHiveSharedState],
     private val loadTestTables: Boolean)
   extends SparkSession(sc) with Logging { self =>
 
-  // TODO: We need to set the temp warehouse path to sc's conf.
-  // Right now, In SparkSession, we will set the warehouse path to the default one
-  // instead of the temp one. Then, we override the setting in TestHiveSharedState
-  // when we creating metadataHive. This flow is not easy to follow and can introduce
-  // confusion when a developer is debugging an issue. We need to refactor this part
-  // to just set the temp warehouse path in sc's conf.
   def this(sc: SparkContext, loadTestTables: Boolean) {
     this(
       sc,
-      Utils.createTempDir(namePrefix = "warehouse"),
       TestHiveContext.makeScratchDir(),
       HiveUtils.newTemporaryConfiguration(useInMemoryDerby = false),
       None,
@@ -151,16 +143,16 @@ private[hive] class TestHiveSparkSession(
   @transient
   override lazy val sharedState: TestHiveSharedState = {
     existingSharedState.getOrElse(
-      new TestHiveSharedState(sc, warehousePath, scratchDirPath, metastoreTemporaryConf))
+      new TestHiveSharedState(sc, scratchDirPath, metastoreTemporaryConf))
   }
 
   @transient
   override lazy val sessionState: TestHiveSessionState =
-    new TestHiveSessionState(self, warehousePath)
+    new TestHiveSessionState(self)
 
   override def newSession(): TestHiveSparkSession = {
     new TestHiveSparkSession(
-      sc, warehousePath, scratchDirPath, metastoreTemporaryConf, Some(sharedState), loadTestTables)
+      sc, scratchDirPath, metastoreTemporaryConf, Some(sharedState), loadTestTables)
   }
 
   private var cacheTables: Boolean = false
@@ -199,6 +191,12 @@ private[hive] class TestHiveSparkSession(
     new File(Thread.currentThread().getContextClassLoader.getResource(path).getFile)
   }
 
+  def getWarehousePath(): String = {
+    val tempConf = new SQLConf
+    sc.conf.getAll.foreach { case (k, v) => tempConf.setConfString(k, v) }
+    tempConf.warehousePath
+  }
+
   val describedTable = "DESCRIBE (\\w+)".r
 
   case class TestTable(name: String, commands: (() => Unit)*)
@@ -509,21 +507,19 @@ private[hive] class TestHiveFunctionRegistry extends SimpleFunctionRegistry {
 
 private[hive] class TestHiveSharedState(
     sc: SparkContext,
-    warehousePath: File,
     scratchDirPath: File,
     metastoreTemporaryConf: Map[String, String])
   extends HiveSharedState(sc) {
 
   override lazy val metadataHive: HiveClient = {
     TestHiveContext.newClientForMetadata(
-      sc.conf, sc.hadoopConfiguration, warehousePath, scratchDirPath, metastoreTemporaryConf)
+      sc.conf, sc.hadoopConfiguration, scratchDirPath, metastoreTemporaryConf)
   }
 }
 
 private[hive] class TestHiveSessionState(
-    sparkSession: TestHiveSparkSession,
-    warehousePath: File)
+    sparkSession: TestHiveSparkSession)
   extends HiveSessionState(sparkSession) { self =>
 
   override lazy val conf: SQLConf = {
@@ -533,7 +529,6 @@ private[hive] class TestHiveSessionState(
     override def clear(): Unit = {
       super.clear()
       TestHiveContext.overrideConfs.foreach { case (k, v) => setConfString(k, v) }
-      setConfString("hive.metastore.warehouse.dir", self.warehousePath.toURI.toString)
     }
   }
 }
@@ -571,13 +566,12 @@ private[hive] object TestHiveContext {
   def newClientForMetadata(
       conf: SparkConf,
       hadoopConf: Configuration,
-      warehousePath: File,
       scratchDirPath: File,
       metastoreTemporaryConf: Map[String, String]): HiveClient = {
     HiveUtils.newClientForMetadata(
       conf,
       hadoopConf,
-      hiveClientConfigurations(hadoopConf, warehousePath, scratchDirPath, metastoreTemporaryConf))
+      hiveClientConfigurations(hadoopConf, scratchDirPath, metastoreTemporaryConf))
   }
 
   /**
@@ -585,18 +579,20 @@
    */
   def hiveClientConfigurations(
       hadoopConf: Configuration,
-      warehousePath: File,
       scratchDirPath: File,
       metastoreTemporaryConf: Map[String, String]): Map[String, String] = {
     HiveUtils.hiveClientConfigurations(hadoopConf) ++ metastoreTemporaryConf ++ Map(
-      // Override WAREHOUSE_PATH and METASTOREWAREHOUSE to use the given path.
-      SQLConf.WAREHOUSE_PATH.key -> warehousePath.toURI.toString,
-      ConfVars.METASTOREWAREHOUSE.varname -> warehousePath.toURI.toString,
       ConfVars.METASTORE_INTEGER_JDO_PUSHDOWN.varname -> "true",
       ConfVars.SCRATCHDIR.varname -> scratchDirPath.toURI.toString,
       ConfVars.METASTORE_CLIENT_CONNECT_RETRY_DELAY.varname -> "1")
   }
 
+  def makeWarehouseDir(): File = {
+    val warehouseDir = Utils.createTempDir(namePrefix = "warehouse")
+    warehouseDir.delete()
+    warehouseDir
+  }
+
   def makeScratchDir(): File = {
     val scratchDir = Utils.createTempDir(namePrefix = "scratch")
     scratchDir.delete()
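The getWarehousePath() method added above no longer hands out a cached File. It recomputes the effective warehouse location on demand: every entry of the SparkContext's SparkConf is copied into a throwaway SQLConf, and SQLConf.warehousePath then resolves spark.sql.warehouse.dir (falling back to SQLConf's default if the key was never set). Below is a minimal standalone sketch of that lookup, assuming Spark 2.x's org.apache.spark.sql.internal.SQLConf; the helper name resolveWarehousePath is illustrative, not part of the patch.

    // Illustrative sketch of the lookup getWarehousePath() performs;
    // resolveWarehousePath is a hypothetical name, not part of the patch.
    import org.apache.spark.SparkConf
    import org.apache.spark.sql.internal.SQLConf

    def resolveWarehousePath(conf: SparkConf): String = {
      val sqlConf = new SQLConf
      // Mirror every SparkConf entry so SQLConf sees spark.sql.warehouse.dir.
      conf.getAll.foreach { case (k, v) => sqlConf.setConfString(k, v) }
      // Resolves spark.sql.warehouse.dir, or SQLConf's default when unset.
      sqlConf.warehousePath
    }

Because the value is derived purely from configuration, every caller now observes the temporary directory that TestHive injects via spark.sql.warehouse.dir when the SparkContext is built; that is why the warehousePath parameters could be stripped from TestHiveSharedState, TestHiveSessionState, and the TestHiveContext helpers above, and why the two suites below switch to getWarehousePath.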
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
index 31283b9fd6ef21d880f44e1fcce39fa988502b76..6785167d3dfbae302fa73bba3bbb3a9e2cb519be 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
@@ -964,7 +964,7 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter {
       .mkString("/")
 
     // Loads partition data to a temporary table to verify contents
-    val path = s"${sparkSession.warehousePath}/dynamic_part_table/$partFolder/part-00000"
+    val path = s"${sparkSession.getWarehousePath}/dynamic_part_table/$partFolder/part-00000"
 
     sql("DROP TABLE IF EXISTS dp_verify")
     sql("CREATE TABLE dp_verify(intcol INT)")
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
index e461490310910ca1efc0c8d6294ec68e9e29b6e9..8d161a3c46b338a492007af5ba8361e516e76965 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
@@ -353,7 +353,7 @@ class BucketedReadSuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
     withTable("bucketed_table") {
       df1.write.format("parquet").bucketBy(8, "i").saveAsTable("bucketed_table")
       val tableDir = new File(hiveContext
-        .sparkSession.warehousePath, "bucketed_table")
+        .sparkSession.getWarehousePath, "bucketed_table")
       Utils.deleteRecursively(tableDir)
       df1.write.parquet(tableDir.getAbsolutePath)
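One detail of the new helpers worth spelling out: makeWarehouseDir(), like the existing makeScratchDir(), creates a temporary directory and then immediately deletes it. Utils.createTempDir reserves a unique name by actually creating the directory, and the delete() hands back a path that is known to be unused but does not yet exist, presumably so that the metastore can create the warehouse directory itself on first use. A self-contained sketch of the same pattern using only the JDK follows; makeFreshDir is an illustrative name, not part of the patch.

    // Illustrative JDK-only sketch of the create-then-delete pattern shared by
    // makeWarehouseDir() and makeScratchDir(); makeFreshDir is a hypothetical name.
    import java.io.File
    import java.nio.file.Files

    def makeFreshDir(prefix: String): File = {
      // Reserve a unique path by actually creating a temp directory...
      val dir = Files.createTempDirectory(prefix).toFile
      // ...then remove it so the consumer (here, the Hive metastore) can
      // create the directory itself at a path known to be unused.
      dir.delete()
      dir
    }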