diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index f7ad2efc9544e3d063739ef6853e40c6d7011fd8..2cc8d65d3cb79e5a1a03ddae5a3f3e6548cf8ef3 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -52,6 +52,7 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
   /** Connection to hive metastore. Usages should lock on `this`. */
   protected[hive] val client = Hive.get(hive.hiveconf)
 
+  /** Usages should lock on `this`. */
   protected[hive] lazy val hiveWarehouse = new Warehouse(hive.hiveconf)
 
   // TODO: Use this everywhere instead of tuples or databaseName, tableName,.
@@ -65,7 +66,9 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
     val cacheLoader = new CacheLoader[QualifiedTableName, LogicalPlan]() {
       override def load(in: QualifiedTableName): LogicalPlan = {
         logDebug(s"Creating new cached data source for $in")
-        val table = client.getTable(in.database, in.name)
+        val table = synchronized {
+          client.getTable(in.database, in.name)
+        }
         val schemaString = table.getProperty("spark.sql.sources.schema")
         val userSpecifiedSchema =
           if (schemaString == null) {
@@ -134,15 +137,18 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
     }
   }
 
-  def hiveDefaultTableFilePath(tableName: String): String = {
+  def hiveDefaultTableFilePath(tableName: String): String = synchronized {
     val currentDatabase = client.getDatabase(hive.sessionState.getCurrentDatabase)
+
     hiveWarehouse.getTablePath(currentDatabase, tableName).toString
   }
 
-  def tableExists(tableIdentifier: Seq[String]): Boolean = {
+  def tableExists(tableIdentifier: Seq[String]): Boolean = synchronized {
     val tableIdent = processTableIdentifier(tableIdentifier)
-    val databaseName = tableIdent.lift(tableIdent.size - 2).getOrElse(
-      hive.sessionState.getCurrentDatabase)
+    val databaseName =
+      tableIdent
+        .lift(tableIdent.size - 2)
+        .getOrElse(hive.sessionState.getCurrentDatabase)
     val tblName = tableIdent.last
     client.getTable(databaseName, tblName, false) != null
   }
@@ -219,7 +225,7 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
     }
   }
 
-  override def getTables(databaseName: Option[String]): Seq[(String, Boolean)] = {
+  override def getTables(databaseName: Option[String]): Seq[(String, Boolean)] = synchronized {
     val dbName = if (!caseSensitive) {
       if (databaseName.isDefined) Some(databaseName.get.toLowerCase) else None
     } else {