From 778f3ca81f8d90faec0775509632fe68f1399dc4 Mon Sep 17 00:00:00 2001
From: "navis.ryu" <navis@apache.org>
Date: Tue, 9 Jun 2015 19:33:00 -0700
Subject: [PATCH] [SPARK-7792] [SQL] HiveContext registerTempTable not thread
 safe

Just replaced mutable.HashMap to ConcurrentHashMap

Author: navis.ryu <navis@apache.org>

Closes #6699 from navis/SPARK-7792 and squashes the following commits:

f03654a [navis.ryu] [SPARK-7792] [SQL] HiveContext registerTempTable not thread safe
---
 .../spark/sql/catalyst/analysis/Catalog.scala | 28 +++++++++++--------
 1 file changed, 17 insertions(+), 11 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala
index 3e240fd55e..1541491608 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala
@@ -17,7 +17,11 @@
 
 package org.apache.spark.sql.catalyst.analysis
 
+import java.util.concurrent.ConcurrentHashMap
+
+import scala.collection.JavaConversions._
 import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
 
 import org.apache.spark.sql.catalyst.CatalystConf
 import org.apache.spark.sql.catalyst.EmptyConf
@@ -81,18 +85,18 @@ trait Catalog {
 }
 
 class SimpleCatalog(val conf: CatalystConf) extends Catalog {
-  val tables = new mutable.HashMap[String, LogicalPlan]()
+  val tables = new ConcurrentHashMap[String, LogicalPlan]
 
   override def registerTable(
       tableIdentifier: Seq[String],
       plan: LogicalPlan): Unit = {
     val tableIdent = processTableIdentifier(tableIdentifier)
-    tables += ((getDbTableName(tableIdent), plan))
+    tables.put(getDbTableName(tableIdent), plan)
   }
 
   override def unregisterTable(tableIdentifier: Seq[String]): Unit = {
     val tableIdent = processTableIdentifier(tableIdentifier)
-    tables -= getDbTableName(tableIdent)
+    tables.remove(getDbTableName(tableIdent))
   }
 
   override def unregisterAllTables(): Unit = {
@@ -101,10 +105,7 @@ class SimpleCatalog(val conf: CatalystConf) extends Catalog {
 
   override def tableExists(tableIdentifier: Seq[String]): Boolean = {
     val tableIdent = processTableIdentifier(tableIdentifier)
-    tables.get(getDbTableName(tableIdent)) match {
-      case Some(_) => true
-      case None => false
-    }
+    tables.containsKey(getDbTableName(tableIdent))
   }
 
   override def lookupRelation(
@@ -112,7 +113,10 @@ class SimpleCatalog(val conf: CatalystConf) extends Catalog {
       alias: Option[String] = None): LogicalPlan = {
     val tableIdent = processTableIdentifier(tableIdentifier)
     val tableFullName = getDbTableName(tableIdent)
-    val table = tables.getOrElse(tableFullName, sys.error(s"Table Not Found: $tableFullName"))
+    val table = tables.get(tableFullName)
+    if (table == null) {
+      sys.error(s"Table Not Found: $tableFullName")
+    }
     val tableWithQualifiers = Subquery(tableIdent.last, table)
 
     // If an alias was specified by the lookup, wrap the plan in a subquery so that attributes are
@@ -121,9 +125,11 @@ class SimpleCatalog(val conf: CatalystConf) extends Catalog {
   }
 
   override def getTables(databaseName: Option[String]): Seq[(String, Boolean)] = {
-    tables.map {
-      case (name, _) => (name, true)
-    }.toSeq
+    val result = ArrayBuffer.empty[(String, Boolean)]
+    for (name <- tables.keySet()) {
+      result += ((name, true))
+    }
+    result
   }
 
   override def refreshTable(databaseName: String, tableName: String): Unit = {
-- 
GitLab