Commit e1585cc7 authored by Yin Huai, committed by Reynold Xin

[SPARK-15959][SQL] Add the support of hive.metastore.warehouse.dir back

## What changes were proposed in this pull request?
This PR adds back support for the conf `hive.metastore.warehouse.dir`. With this patch, the warehouse dir is determined as follows (a minimal sketch of the rule follows the list):
* If `spark.sql.warehouse.dir` is set, `hive.metastore.warehouse.dir` will be automatically set to the value of `spark.sql.warehouse.dir`. The warehouse dir is effectively set to the value of `spark.sql.warehouse.dir`.
* If `spark.sql.warehouse.dir` is not set but `hive.metastore.warehouse.dir` is set, `spark.sql.warehouse.dir` will be automatically set to the value of `hive.metastore.warehouse.dir`. The warehouse dir is effectively set to the value of `hive.metastore.warehouse.dir`.
* If neither `spark.sql.warehouse.dir` nor `hive.metastore.warehouse.dir` is set, `hive.metastore.warehouse.dir` will be automatically set to the default value of `spark.sql.warehouse.dir`. The warehouse dir is effectively set to the default value of `spark.sql.warehouse.dir`.
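
For illustration, here is a minimal Scala sketch of the precedence rule above (the function and parameter names are hypothetical stand-ins; the actual patch implements this inside `SharedState` via `SQLConf` and `SparkContext.conf`, as shown in the diff below):

```scala
// Hedged sketch of the warehouse-dir precedence rule; names are
// hypothetical, not the real fields used by SharedState.
def resolveWarehouseDir(
    sparkWarehouseDir: Option[String], // spark.sql.warehouse.dir, if set
    hiveWarehouseDir: Option[String],  // hive.metastore.warehouse.dir, if set
    defaultWarehouseDir: String        // default value of spark.sql.warehouse.dir
): String = (sparkWarehouseDir, hiveWarehouseDir) match {
  case (Some(sparkDir), _) =>
    // spark.sql.warehouse.dir wins; hive.metastore.warehouse.dir is
    // overwritten with this value.
    sparkDir
  case (None, Some(hiveDir)) =>
    // Only hive.metastore.warehouse.dir is set; respect it and propagate
    // it to spark.sql.warehouse.dir.
    hiveDir
  case (None, None) =>
    // Neither is set; both fall back to the default of spark.sql.warehouse.dir.
    defaultWarehouseDir
}
```

Whichever branch is taken, the two confs end up agreeing on the same effective warehouse dir.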

## How was this patch tested?
Added the test `set hive.metastore.warehouse.dir` to `HiveSparkSubmitSuite`.
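
For illustration, a hedged user-facing sketch of the restored behavior (the warehouse path is made up; the actual test in the diff below goes through `hive-site.xml` and spark-submit instead):

```scala
import org.apache.spark.sql.SparkSession

// Assumes a Hive-enabled build. Setting only hive.metastore.warehouse.dir
// (and not spark.sql.warehouse.dir) should make it the effective warehouse.
val spark = SparkSession.builder()
  .config("hive.metastore.warehouse.dir", "/tmp/hive-warehouse") // hypothetical path
  .enableHiveSupport()
  .getOrCreate()

// Per the rule above, spark.sql.warehouse.dir should now mirror
// hive.metastore.warehouse.dir.
assert(spark.conf.get("spark.sql.warehouse.dir") == "/tmp/hive-warehouse")
```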

JIRA: https://issues.apache.org/jira/browse/SPARK-15959

Author: Yin Huai <yhuai@databricks.com>

Closes #13679 from yhuai/hiveWarehouseDir.
parent 9a507199
@@ -20,6 +20,7 @@ package org.apache.spark.sql.internal
import org.apache.hadoop.conf.Configuration
import org.apache.spark.SparkContext
import org.apache.spark.internal.Logging
import org.apache.spark.sql.{SparkSession, SQLContext}
import org.apache.spark.sql.catalyst.catalog.{ExternalCatalog, InMemoryCatalog}
import org.apache.spark.sql.execution.CacheManager
@@ -30,7 +31,7 @@ import org.apache.spark.util.{MutableURLClassLoader, Utils}
/**
* A class that holds all state shared across sessions in a given [[SQLContext]].
*/
private[sql] class SharedState(val sparkContext: SparkContext) {
private[sql] class SharedState(val sparkContext: SparkContext) extends Logging {
/**
* Class for caching query results reused in future executions.
@@ -46,7 +47,7 @@ private[sql] class SharedState(val sparkContext: SparkContext) {
* The base hadoop configuration which is shared among all spark sessions. It is based on the
* default hadoop configuration of Spark, with custom configurations inside `hive-site.xml`.
*/
lazy val hadoopConf: Configuration = {
val hadoopConf: Configuration = {
val conf = new Configuration(sparkContext.hadoopConfiguration)
val configFile = Utils.getContextOrSparkClassLoader.getResource("hive-site.xml")
if (configFile != null) {
@@ -66,6 +67,30 @@ private[sql] class SharedState(val sparkContext: SparkContext) {
val jarClassLoader = new NonClosableMutableURLClassLoader(
org.apache.spark.util.Utils.getContextOrSparkClassLoader)
{
// Set the Hive metastore warehouse path to the one we use
val tempConf = new SQLConf
sparkContext.conf.getAll.foreach { case (k, v) => tempConf.setConfString(k, v) }
val hiveWarehouseDir = hadoopConf.get("hive.metastore.warehouse.dir")
if (hiveWarehouseDir != null && !tempConf.contains(SQLConf.WAREHOUSE_PATH.key)) {
// If hive.metastore.warehouse.dir is set and spark.sql.warehouse.dir is not set,
// we will respect the value of hive.metastore.warehouse.dir.
tempConf.setConfString(SQLConf.WAREHOUSE_PATH.key, hiveWarehouseDir)
sparkContext.conf.set(SQLConf.WAREHOUSE_PATH.key, hiveWarehouseDir)
logInfo(s"${SQLConf.WAREHOUSE_PATH.key} is not set, but hive.metastore.warehouse.dir " +
s"is set. Setting ${SQLConf.WAREHOUSE_PATH.key} to the value of " +
s"hive.metastore.warehouse.dir ('$hiveWarehouseDir').")
} else {
// If spark.sql.warehouse.dir is set, we will override hive.metastore.warehouse.dir using
// the value of spark.sql.warehouse.dir.
// When neither spark.sql.warehouse.dir nor hive.metastore.warehouse.dir is set,
// we will set hive.metastore.warehouse.dir to the default value of spark.sql.warehouse.dir.
sparkContext.conf.set("hive.metastore.warehouse.dir", tempConf.warehousePath)
}
logInfo(s"Warehouse path is '${tempConf.warehousePath}'.")
}
/**
* Create a SQLListener then add it into SparkContext, and create a SQLTab if there is SparkUI.
*/
...
@@ -18,9 +18,8 @@
package org.apache.spark.sql.hive
import org.apache.spark.SparkContext
import org.apache.spark.internal.Logging
import org.apache.spark.sql.hive.client.HiveClient
import org.apache.spark.sql.internal.{SharedState, SQLConf}
import org.apache.spark.sql.internal.SharedState
/**
@@ -28,18 +27,10 @@ import org.apache.spark.sql.internal.{SharedState, SQLConf}
* [[org.apache.spark.sql.SparkSession]] backed by Hive.
*/
private[hive] class HiveSharedState(override val sparkContext: SparkContext)
extends SharedState(sparkContext) with Logging {
extends SharedState(sparkContext) {
// TODO: just share the IsolatedClientLoader instead of the client instance itself
{
// Set the Hive metastore warehouse path to the one we use
val tempConf = new SQLConf
sparkContext.conf.getAll.foreach { case (k, v) => tempConf.setConfString(k, v) }
sparkContext.conf.set("hive.metastore.warehouse.dir", tempConf.warehousePath)
logInfo(s"Setting Hive metastore warehouse path to '${tempConf.warehousePath}'")
}
/**
* A Hive client used to interact with the metastore.
*/
...
@@ -17,7 +17,7 @@
package org.apache.spark.sql.hive
import java.io.File
import java.io.{BufferedWriter, File, FileWriter}
import java.sql.Timestamp
import java.util.Date
@@ -205,7 +205,7 @@ class HiveSparkSubmitSuite
val unusedJar = TestUtils.createJarWithClasses(Seq.empty)
val args = Seq(
"--class", SetWarehouseLocationTest.getClass.getName.stripSuffix("$"),
"--name", "SetWarehouseLocationTest",
"--name", "SetSparkWarehouseLocationTest",
"--master", "local-cluster[2,1,1024]",
"--conf", "spark.ui.enabled=false",
"--conf", "spark.master.rest.enabled=false",
@@ -214,6 +214,45 @@ class HiveSparkSubmitSuite
runSparkSubmit(args)
}
test("set hive.metastore.warehouse.dir") {
// In this test, we set hive.metastore.warehouse.dir in hive-site.xml but
// do not set spark.sql.warehouse.dir. So, the warehouse dir should be
// the value of hive.metastore.warehouse.dir. Also, the value of
// spark.sql.warehouse.dir should be set to the value of hive.metastore.warehouse.dir.
val hiveWarehouseLocation = Utils.createTempDir()
hiveWarehouseLocation.delete()
val hiveSiteXmlContent =
s"""
|<configuration>
| <property>
| <name>hive.metastore.warehouse.dir</name>
| <value>$hiveWarehouseLocation</value>
| </property>
|</configuration>
""".stripMargin
// Write a hive-site.xml containing a setting of hive.metastore.warehouse.dir.
val hiveSiteDir = Utils.createTempDir()
val file = new File(hiveSiteDir.getCanonicalPath, "hive-site.xml")
val bw = new BufferedWriter(new FileWriter(file))
bw.write(hiveSiteXmlContent)
bw.close()
val unusedJar = TestUtils.createJarWithClasses(Seq.empty)
val args = Seq(
"--class", SetWarehouseLocationTest.getClass.getName.stripSuffix("$"),
"--name", "SetHiveWarehouseLocationTest",
"--master", "local-cluster[2,1,1024]",
"--conf", "spark.ui.enabled=false",
"--conf", "spark.master.rest.enabled=false",
"--conf", s"spark.sql.test.expectedWarehouseDir=$hiveWarehouseLocation",
"--conf", s"spark.driver.extraClassPath=${hiveSiteDir.getCanonicalPath}",
"--driver-java-options", "-Dderby.system.durability=test",
unusedJar.toString)
runSparkSubmit(args)
}
// NOTE: This is an expensive operation in terms of time (10 seconds+). Use sparingly.
// This is copied from org.apache.spark.deploy.SparkSubmitSuite
private def runSparkSubmit(args: Seq[String]): Unit = {
@@ -277,19 +316,43 @@ class HiveSparkSubmitSuite
object SetWarehouseLocationTest extends Logging {
def main(args: Array[String]): Unit = {
Utils.configTestLog4j("INFO")
val warehouseLocation = Utils.createTempDir()
warehouseLocation.delete()
val hiveWarehouseLocation = Utils.createTempDir()
hiveWarehouseLocation.delete()
// We will use the value of spark.sql.warehouse.dir to override the
// value of hive.metastore.warehouse.dir.
val sparkSession = SparkSession.builder()
val sparkConf = new SparkConf(loadDefaults = true)
val builder = SparkSession.builder()
.config(sparkConf)
.config("spark.ui.enabled", "false")
.config("spark.sql.warehouse.dir", warehouseLocation.toString)
.config("hive.metastore.warehouse.dir", hiveWarehouseLocation.toString)
.enableHiveSupport()
.getOrCreate()
val providedExpectedWarehouseLocation =
sparkConf.getOption("spark.sql.test.expectedWarehouseDir")
val (sparkSession, expectedWarehouseLocation) = providedExpectedWarehouseLocation match {
case Some(warehouseDir) =>
// If spark.sql.test.expectedWarehouseDir is set, the warehouse dir is set
// through spark-submit. So, neither spark.sql.warehouse.dir nor
// hive.metastore.warehouse.dir is set here.
(builder.getOrCreate(), warehouseDir)
case None =>
val warehouseLocation = Utils.createTempDir()
warehouseLocation.delete()
val hiveWarehouseLocation = Utils.createTempDir()
hiveWarehouseLocation.delete()
// If spark.sql.test.expectedWarehouseDir is not set, we will set
// spark.sql.warehouse.dir and hive.metastore.warehouse.dir.
// We expect the value of spark.sql.warehouse.dir to override the
// value of hive.metastore.warehouse.dir.
val session = builder
.config("spark.sql.warehouse.dir", warehouseLocation.toString)
.config("hive.metastore.warehouse.dir", hiveWarehouseLocation.toString)
.getOrCreate()
(session, warehouseLocation.toString)
}
if (sparkSession.conf.get("spark.sql.warehouse.dir") != expectedWarehouseLocation) {
throw new Exception(
"spark.sql.warehouse.dir is not set to the expected warehouse location " +
s"$expectedWarehouseLocation.")
}
val catalog = sparkSession.sessionState.catalog
@@ -301,7 +364,7 @@ object SetWarehouseLocationTest extends Logging {
val tableMetadata =
catalog.getTableMetadata(TableIdentifier("testLocation", Some("default")))
val expectedLocation =
"file:" + warehouseLocation.toString + "/testlocation"
"file:" + expectedWarehouseLocation.toString + "/testlocation"
val actualLocation = tableMetadata.storage.locationUri.get
if (actualLocation != expectedLocation) {
throw new Exception(
@@ -317,7 +380,7 @@
val tableMetadata =
catalog.getTableMetadata(TableIdentifier("testLocation", Some("testLocationDB")))
val expectedLocation =
"file:" + warehouseLocation.toString + "/testlocationdb.db/testlocation"
"file:" + expectedWarehouseLocation.toString + "/testlocationdb.db/testlocation"
val actualLocation = tableMetadata.storage.locationUri.get
if (actualLocation != expectedLocation) {
throw new Exception(
...