Skip to content
Snippets Groups Projects
Commit 778f3ca8 authored by navis.ryu's avatar navis.ryu Committed by Reynold Xin
Browse files

[SPARK-7792] [SQL] HiveContext registerTempTable not thread safe

Just replaced mutable.HashMap to ConcurrentHashMap

Author: navis.ryu <navis@apache.org>

Closes #6699 from navis/SPARK-7792 and squashes the following commits:

f03654a [navis.ryu] [SPARK-7792] [SQL] HiveContext registerTempTable not thread safe
parent 6e4fb0c9
No related branches found
No related tags found
No related merge requests found
...@@ -17,7 +17,11 @@ ...@@ -17,7 +17,11 @@
package org.apache.spark.sql.catalyst.analysis package org.apache.spark.sql.catalyst.analysis
import java.util.concurrent.ConcurrentHashMap
import scala.collection.JavaConversions._
import scala.collection.mutable import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.sql.catalyst.CatalystConf import org.apache.spark.sql.catalyst.CatalystConf
import org.apache.spark.sql.catalyst.EmptyConf import org.apache.spark.sql.catalyst.EmptyConf
...@@ -81,18 +85,18 @@ trait Catalog { ...@@ -81,18 +85,18 @@ trait Catalog {
} }
class SimpleCatalog(val conf: CatalystConf) extends Catalog { class SimpleCatalog(val conf: CatalystConf) extends Catalog {
val tables = new mutable.HashMap[String, LogicalPlan]() val tables = new ConcurrentHashMap[String, LogicalPlan]
override def registerTable( override def registerTable(
tableIdentifier: Seq[String], tableIdentifier: Seq[String],
plan: LogicalPlan): Unit = { plan: LogicalPlan): Unit = {
val tableIdent = processTableIdentifier(tableIdentifier) val tableIdent = processTableIdentifier(tableIdentifier)
tables += ((getDbTableName(tableIdent), plan)) tables.put(getDbTableName(tableIdent), plan)
} }
override def unregisterTable(tableIdentifier: Seq[String]): Unit = { override def unregisterTable(tableIdentifier: Seq[String]): Unit = {
val tableIdent = processTableIdentifier(tableIdentifier) val tableIdent = processTableIdentifier(tableIdentifier)
tables -= getDbTableName(tableIdent) tables.remove(getDbTableName(tableIdent))
} }
override def unregisterAllTables(): Unit = { override def unregisterAllTables(): Unit = {
...@@ -101,10 +105,7 @@ class SimpleCatalog(val conf: CatalystConf) extends Catalog { ...@@ -101,10 +105,7 @@ class SimpleCatalog(val conf: CatalystConf) extends Catalog {
override def tableExists(tableIdentifier: Seq[String]): Boolean = { override def tableExists(tableIdentifier: Seq[String]): Boolean = {
val tableIdent = processTableIdentifier(tableIdentifier) val tableIdent = processTableIdentifier(tableIdentifier)
tables.get(getDbTableName(tableIdent)) match { tables.containsKey(getDbTableName(tableIdent))
case Some(_) => true
case None => false
}
} }
override def lookupRelation( override def lookupRelation(
...@@ -112,7 +113,10 @@ class SimpleCatalog(val conf: CatalystConf) extends Catalog { ...@@ -112,7 +113,10 @@ class SimpleCatalog(val conf: CatalystConf) extends Catalog {
alias: Option[String] = None): LogicalPlan = { alias: Option[String] = None): LogicalPlan = {
val tableIdent = processTableIdentifier(tableIdentifier) val tableIdent = processTableIdentifier(tableIdentifier)
val tableFullName = getDbTableName(tableIdent) val tableFullName = getDbTableName(tableIdent)
val table = tables.getOrElse(tableFullName, sys.error(s"Table Not Found: $tableFullName")) val table = tables.get(tableFullName)
if (table == null) {
sys.error(s"Table Not Found: $tableFullName")
}
val tableWithQualifiers = Subquery(tableIdent.last, table) val tableWithQualifiers = Subquery(tableIdent.last, table)
// If an alias was specified by the lookup, wrap the plan in a subquery so that attributes are // If an alias was specified by the lookup, wrap the plan in a subquery so that attributes are
...@@ -121,9 +125,11 @@ class SimpleCatalog(val conf: CatalystConf) extends Catalog { ...@@ -121,9 +125,11 @@ class SimpleCatalog(val conf: CatalystConf) extends Catalog {
} }
override def getTables(databaseName: Option[String]): Seq[(String, Boolean)] = { override def getTables(databaseName: Option[String]): Seq[(String, Boolean)] = {
tables.map { val result = ArrayBuffer.empty[(String, Boolean)]
case (name, _) => (name, true) for (name <- tables.keySet()) {
}.toSeq result += ((name, true))
}
result
} }
override def refreshTable(databaseName: String, tableName: String): Unit = { override def refreshTable(databaseName: String, tableName: String): Unit = {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment