Skip to content
Snippets Groups Projects
Commit 35e97407 authored by gatorsmile's avatar gatorsmile Committed by Wenchen Fan
Browse files

[SPARK-19028][SQL] Fixed non-thread-safe functions used in SessionCatalog

### What changes were proposed in this pull request?
Fixed non-thread-safe functions used in SessionCatalog:
- refreshTable
- lookupRelation

### How was this patch tested?
N/A

Author: gatorsmile <gatorsmile@gmail.com>

Closes #16437 from gatorsmile/addSyncToLookUpTable.
parent 871f6114
No related branches found
No related tags found
No related merge requests found
......@@ -634,7 +634,7 @@ class SessionCatalog(
/**
* Refresh the cache entry for a metastore table, if any.
*/
def refreshTable(name: TableIdentifier): Unit = {
def refreshTable(name: TableIdentifier): Unit = synchronized {
// Go through temporary tables and invalidate them.
// If the database is defined, this is definitely not a temp table.
// If the database is not defined, there is a good chance this is a temp table.
......
......@@ -56,23 +56,25 @@ private[sql] class HiveSessionCatalog(
hadoopConf) {
override def lookupRelation(name: TableIdentifier, alias: Option[String]): LogicalPlan = {
val table = formatTableName(name.table)
val db = formatDatabaseName(name.database.getOrElse(currentDb))
if (db == globalTempViewManager.database) {
val relationAlias = alias.getOrElse(table)
globalTempViewManager.get(table).map { viewDef =>
SubqueryAlias(relationAlias, viewDef, Some(name))
}.getOrElse(throw new NoSuchTableException(db, table))
} else if (name.database.isDefined || !tempTables.contains(table)) {
val database = name.database.map(formatDatabaseName)
val newName = name.copy(database = database, table = table)
metastoreCatalog.lookupRelation(newName, alias)
} else {
val relation = tempTables(table)
val tableWithQualifiers = SubqueryAlias(table, relation, None)
// If an alias was specified by the lookup, wrap the plan in a subquery so that
// attributes are properly qualified with this alias.
alias.map(a => SubqueryAlias(a, tableWithQualifiers, None)).getOrElse(tableWithQualifiers)
synchronized {
val table = formatTableName(name.table)
val db = formatDatabaseName(name.database.getOrElse(currentDb))
if (db == globalTempViewManager.database) {
val relationAlias = alias.getOrElse(table)
globalTempViewManager.get(table).map { viewDef =>
SubqueryAlias(relationAlias, viewDef, Some(name))
}.getOrElse(throw new NoSuchTableException(db, table))
} else if (name.database.isDefined || !tempTables.contains(table)) {
val database = name.database.map(formatDatabaseName)
val newName = name.copy(database = database, table = table)
metastoreCatalog.lookupRelation(newName, alias)
} else {
val relation = tempTables(table)
val tableWithQualifiers = SubqueryAlias(table, relation, None)
// If an alias was specified by the lookup, wrap the plan in a subquery so that
// attributes are properly qualified with this alias.
alias.map(a => SubqueryAlias(a, tableWithQualifiers, None)).getOrElse(tableWithQualifiers)
}
}
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment