diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
index 46991fbd68cdea70472a8ab601e1eaceaca348ef..7c6a7df2bd01ecd7b92c27d4ff7b621138ced9b5 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -181,7 +181,11 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
           val tableFullName =
             relation.hiveQlTable.getDbName + "." + relation.hiveQlTable.getTableName
 
-          catalog.client.alterTable(tableFullName, new Table(hiveTTable))
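+          // The Hive metastore client is shared and not thread-safe, so
+          // serialize this call on the catalog lock (SPARK-6618).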
+          catalog.synchronized {
+            catalog.client.alterTable(tableFullName, new Table(hiveTTable))
+          }
         }
       case otherRelation =>
         throw new UnsupportedOperationException(
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 14cdb420731cd8889b7bdcff996ff082f5c1bd70..bbd920a4051de24f230bd1e81a2d15e541391cd1 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -67,7 +67,9 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
     val cacheLoader = new CacheLoader[QualifiedTableName, LogicalPlan]() {
       override def load(in: QualifiedTableName): LogicalPlan = {
         logDebug(s"Creating new cached data source for $in")
-        val table = synchronized {
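+        // A bare `synchronized` here locks the anonymous CacheLoader instance;
+        // qualify the receiver so the call takes the catalog's lock instead.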
+        val table = HiveMetastoreCatalog.this.synchronized {
           client.getTable(in.database, in.name)
         }
 
@@ -183,12 +185,18 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
 
   def lookupRelation(
       tableIdentifier: Seq[String],
-      alias: Option[String]): LogicalPlan = synchronized {
+      alias: Option[String]): LogicalPlan = {
     val tableIdent = processTableIdentifier(tableIdentifier)
     val databaseName = tableIdent.lift(tableIdent.size - 2).getOrElse(
       hive.sessionState.getCurrentDatabase)
     val tblName = tableIdent.last
-    val table = try client.getTable(databaseName, tblName) catch {
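+    // Lock only around the Hive client call instead of the whole method,
+    // so lookups of different tables can build their plans concurrently.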
+    val table = try {
+      synchronized {
+        client.getTable(databaseName, tblName)
+      }
+    } catch {
       case te: org.apache.hadoop.hive.ql.metadata.InvalidTableException =>
         throw new NoSuchTableException
     }
@@ -210,7 +218,11 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
     } else {
       val partitions: Seq[Partition] =
         if (table.isPartitioned) {
-          HiveShim.getAllPartitionsOf(client, table).toSeq
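+          // Partition metadata also comes through the shared Hive client,
+          // so it needs the same lock.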
+          synchronized {
+            HiveShim.getAllPartitionsOf(client, table).toSeq
+          }
         } else {
           Nil
         }
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
index cdf012b5117be3950c9378598b271bbd3e4e0e9b..6c9674743968395c0993a53dcca2fc179aea64e1 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
@@ -50,7 +50,7 @@ case class InsertIntoHiveTable(
   @transient val sc: HiveContext = sqlContext.asInstanceOf[HiveContext]
   @transient lazy val outputClass = newSerializer(table.tableDesc).getSerializedClass
   @transient private lazy val hiveContext = new Context(sc.hiveconf)
-  @transient private lazy val db = Hive.get(sc.hiveconf)
+  @transient private lazy val catalog = sc.catalog
 
   private def newSerializer(tableDesc: TableDesc): Serializer = {
     val serializer = tableDesc.getDeserializerClass.newInstance().asInstanceOf[Serializer]
@@ -199,38 +199,47 @@ case class InsertIntoHiveTable(
           orderedPartitionSpec.put(entry.getName,partitionSpec.get(entry.getName).getOrElse(""))
       }
       val partVals = MetaStoreUtils.getPvals(table.hiveQlTable.getPartCols, partitionSpec)
-      db.validatePartitionNameCharacters(partVals)
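+      // Every metastore write below goes through catalog.client under
+      // catalog.synchronized, since the client itself is not thread-safe.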
+      catalog.synchronized {
+        catalog.client.validatePartitionNameCharacters(partVals)
+      }
       // inheritTableSpecs is set to true. It should be set to false for a IMPORT query
       // which is currently considered as a Hive native command.
       val inheritTableSpecs = true
       // TODO: Correctly set isSkewedStoreAsSubdir.
       val isSkewedStoreAsSubdir = false
       if (numDynamicPartitions > 0) {
-        db.loadDynamicPartitions(
-          outputPath,
-          qualifiedTableName,
-          orderedPartitionSpec,
-          overwrite,
-          numDynamicPartitions,
-          holdDDLTime,
-          isSkewedStoreAsSubdir
-        )
+        catalog.synchronized {
+          catalog.client.loadDynamicPartitions(
+            outputPath,
+            qualifiedTableName,
+            orderedPartitionSpec,
+            overwrite,
+            numDynamicPartitions,
+            holdDDLTime,
+            isSkewedStoreAsSubdir)
+        }
       } else {
-        db.loadPartition(
+        catalog.synchronized {
+          catalog.client.loadPartition(
+            outputPath,
+            qualifiedTableName,
+            orderedPartitionSpec,
+            overwrite,
+            holdDDLTime,
+            inheritTableSpecs,
+            isSkewedStoreAsSubdir)
+        }
+      }
+    } else {
+      catalog.synchronized {
+        catalog.client.loadTable(
           outputPath,
           qualifiedTableName,
-          orderedPartitionSpec,
           overwrite,
-          holdDDLTime,
-          inheritTableSpecs,
-          isSkewedStoreAsSubdir)
+          holdDDLTime)
       }
-    } else {
-      db.loadTable(
-        outputPath,
-        qualifiedTableName,
-        overwrite,
-        holdDDLTime)
     }
 
     // Invalidate the cache.
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index 310c2bfdf1011b7b265b165153ab5f61d15a813b..2065f0d60d92ffa2f5782da5c1611017c712642a 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -457,4 +457,18 @@ class SQLQuerySuite extends QueryTest {
     dropTempTable("data")
     setConf("spark.sql.hive.convertCTAS", originalConf)
   }
+
+  test("sanity test for SPARK-6618") {
+    (1 to 100).par.foreach { i =>
+      val tableName = s"SPARK_6618_table_$i"
+      sql(s"CREATE TABLE $tableName (col1 string)")
+      catalog.lookupRelation(Seq(tableName))
+      table(tableName)
+      tables()
+      sql(s"DROP TABLE $tableName")
+    }
+  }
 }