Skip to content
Snippets Groups Projects
Commit a2b91379 authored by Michael Armbrust's avatar Michael Armbrust
Browse files

[SPARK-5952][SQL] Lock when using hive metastore client

Author: Michael Armbrust <michael@databricks.com>

Closes #4746 from marmbrus/hiveLock and squashes the following commits:

8b871cf [Michael Armbrust] [SPARK-5952][SQL] Lock when using hive metastore client
parent c5ba975e
No related branches found
No related tags found
No related merge requests found
...@@ -52,6 +52,7 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with ...@@ -52,6 +52,7 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
/** Connection to hive metastore. Usages should lock on `this`. */ /** Connection to hive metastore. Usages should lock on `this`. */
protected[hive] val client = Hive.get(hive.hiveconf) protected[hive] val client = Hive.get(hive.hiveconf)
/** Usages should lock on `this`. */
protected[hive] lazy val hiveWarehouse = new Warehouse(hive.hiveconf) protected[hive] lazy val hiveWarehouse = new Warehouse(hive.hiveconf)
// TODO: Use this everywhere instead of tuples or databaseName, tableName,. // TODO: Use this everywhere instead of tuples or databaseName, tableName,.
...@@ -65,7 +66,9 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with ...@@ -65,7 +66,9 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
val cacheLoader = new CacheLoader[QualifiedTableName, LogicalPlan]() { val cacheLoader = new CacheLoader[QualifiedTableName, LogicalPlan]() {
override def load(in: QualifiedTableName): LogicalPlan = { override def load(in: QualifiedTableName): LogicalPlan = {
logDebug(s"Creating new cached data source for $in") logDebug(s"Creating new cached data source for $in")
val table = client.getTable(in.database, in.name) val table = synchronized {
client.getTable(in.database, in.name)
}
val schemaString = table.getProperty("spark.sql.sources.schema") val schemaString = table.getProperty("spark.sql.sources.schema")
val userSpecifiedSchema = val userSpecifiedSchema =
if (schemaString == null) { if (schemaString == null) {
...@@ -134,15 +137,18 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with ...@@ -134,15 +137,18 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
} }
} }
def hiveDefaultTableFilePath(tableName: String): String = { def hiveDefaultTableFilePath(tableName: String): String = synchronized {
val currentDatabase = client.getDatabase(hive.sessionState.getCurrentDatabase) val currentDatabase = client.getDatabase(hive.sessionState.getCurrentDatabase)
hiveWarehouse.getTablePath(currentDatabase, tableName).toString hiveWarehouse.getTablePath(currentDatabase, tableName).toString
} }
def tableExists(tableIdentifier: Seq[String]): Boolean = { def tableExists(tableIdentifier: Seq[String]): Boolean = synchronized {
val tableIdent = processTableIdentifier(tableIdentifier) val tableIdent = processTableIdentifier(tableIdentifier)
val databaseName = tableIdent.lift(tableIdent.size - 2).getOrElse( val databaseName =
hive.sessionState.getCurrentDatabase) tableIdent
.lift(tableIdent.size - 2)
.getOrElse(hive.sessionState.getCurrentDatabase)
val tblName = tableIdent.last val tblName = tableIdent.last
client.getTable(databaseName, tblName, false) != null client.getTable(databaseName, tblName, false) != null
} }
...@@ -219,7 +225,7 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with ...@@ -219,7 +225,7 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
} }
} }
override def getTables(databaseName: Option[String]): Seq[(String, Boolean)] = { override def getTables(databaseName: Option[String]): Seq[(String, Boolean)] = synchronized {
val dbName = if (!caseSensitive) { val dbName = if (!caseSensitive) {
if (databaseName.isDefined) Some(databaseName.get.toLowerCase) else None if (databaseName.isDefined) Some(databaseName.get.toLowerCase) else None
} else { } else {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment