diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
index 5f5c8e2432d6cc5ed3f89b35a279758268f1ac97..09d1abfa8c7a2259c31061b6b1fa9921db7f31c4 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
@@ -22,7 +22,8 @@ import java.net.URI
 import java.text.SimpleDateFormat
 import java.util.{Date, Locale, Random}
 
-import org.apache.hadoop.conf.Configuration
+import scala.util.control.NonFatal
+
 import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.hadoop.hive.common.FileUtils
 import org.apache.hadoop.hive.ql.exec.TaskRunner
@@ -85,7 +86,9 @@ case class InsertIntoHiveTable(
   def output: Seq[Attribute] = Seq.empty
 
   val hadoopConf = sessionState.newHadoopConf()
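+  // Tracks the staging directory actually created for this query, so it can be deleted
+  // right after the insert instead of only at JVM exit (via deleteOnExit).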
+  var createdTempDir: Option[Path] = None
   val stagingDir = hadoopConf.get("hive.exec.stagingdir", ".hive-staging")
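+  // Hive's common scratch directory; staging directories go here for Hive versions before 1.1.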
+  val scratchDir = hadoopConf.get("hive.exec.scratchdir", "/tmp/hive")
 
   private def executionId: String = {
     val rand: Random = new Random
@@ -93,7 +96,7 @@ case class InsertIntoHiveTable(
     "hive_" + format.format(new Date) + "_" + Math.abs(rand.nextLong)
   }
 
-  private def getStagingDir(inputPath: Path, hadoopConf: Configuration): Path = {
+  private def getStagingDir(inputPath: Path): Path = {
     val inputPathUri: URI = inputPath.toUri
     val inputPathName: String = inputPathUri.getPath
     val fs: FileSystem = inputPath.getFileSystem(hadoopConf)
@@ -111,31 +114,79 @@ case class InsertIntoHiveTable(
       if (!FileUtils.mkdir(fs, dir, true, hadoopConf)) {
         throw new IllegalStateException("Cannot create staging directory  '" + dir.toString + "'")
       }
+      createdTempDir = Some(dir)
       fs.deleteOnExit(dir)
     } catch {
       case e: IOException =>
         throw new RuntimeException(
           "Cannot create staging directory '" + dir.toString + "': " + e.getMessage, e)
-
     }
     return dir
   }
 
-  private def getExternalScratchDir(extURI: URI, hadoopConf: Configuration): Path = {
-    getStagingDir(new Path(extURI.getScheme, extURI.getAuthority, extURI.getPath), hadoopConf)
+  private def getExternalScratchDir(extURI: URI): Path = {
+    getStagingDir(new Path(extURI.getScheme, extURI.getAuthority, extURI.getPath))
+  }
+
+  def getExternalTmpPath(path: Path): Path = {
+    import org.apache.spark.sql.hive.client.hive._
+
+    val hiveVersion = externalCatalog.asInstanceOf[HiveExternalCatalog].client.version
+    // Before Hive 1.1, when inserting into a table, Hive creates the staging directory under
+    // a common scratch directory. After the write finishes, Hive simply empties the table
+    // directory and moves the staging directory into it.
+    // Since Hive 1.1, Hive creates the staging directory under the table directory instead, and
+    // when moving the staging directory into the table directory, it still empties the table
+    // directory but excludes the staging directory from the cleanup.
+    // We have to follow Hive's behavior here to avoid trouble. For example, if we created the
+    // staging directory under the table directory for a Hive version prior to 1.1, Hive would
+    // remove the staging directory while emptying the table directory.
+    if (hiveVersion == v12 || hiveVersion == v13 || hiveVersion == v14 || hiveVersion == v1_0) {
+      oldVersionExternalTempPath(path)
+    } else if (hiveVersion == v1_1 || hiveVersion == v1_2) {
+      newVersionExternalTempPath(path)
+    } else {
+      throw new IllegalStateException("Unsupported Hive version: " + hiveVersion.fullVersion)
+    }
+  }
+
+  // Mostly copied from Context.java#getExternalTmpPath of Hive 0.13
+  def oldVersionExternalTempPath(path: Path): Path = {
+    val extURI: URI = path.toUri
+    val scratchPath = new Path(scratchDir, executionId)
+    var dirPath = new Path(
+      extURI.getScheme,
+      extURI.getAuthority,
+      scratchPath.toUri.getPath + "-" + TaskRunner.getTaskRunnerID())
+
+    try {
+      val fs: FileSystem = dirPath.getFileSystem(hadoopConf)
+      dirPath = new Path(fs.makeQualified(dirPath).toString())
+
+      if (!FileUtils.mkdir(fs, dirPath, true, hadoopConf)) {
+        throw new IllegalStateException("Cannot create staging directory: " + dirPath.toString)
+      }
+      createdTempDir = Some(dirPath)
+      fs.deleteOnExit(dirPath)
+    } catch {
+      case e: IOException =>
+        throw new RuntimeException("Cannot create staging directory: " + dirPath.toString, e)
+    }
+    dirPath
   }
 
-  def getExternalTmpPath(path: Path, hadoopConf: Configuration): Path = {
+  // Mostly copied from Context.java#getExternalTmpPath of Hive 1.2
+  def newVersionExternalTempPath(path: Path): Path = {
     val extURI: URI = path.toUri
     if (extURI.getScheme == "viewfs") {
-      getExtTmpPathRelTo(path.getParent, hadoopConf)
+      getExtTmpPathRelTo(path.getParent)
     } else {
-      new Path(getExternalScratchDir(extURI, hadoopConf), "-ext-10000")
+      new Path(getExternalScratchDir(extURI), "-ext-10000")
     }
   }
 
-  def getExtTmpPathRelTo(path: Path, hadoopConf: Configuration): Path = {
-    new Path(getStagingDir(path, hadoopConf), "-ext-10000") // Hive uses 10000
+  def getExtTmpPathRelTo(path: Path): Path = {
+    new Path(getStagingDir(path), "-ext-10000") // Hive uses 10000
   }
 
   private def saveAsHiveFile(
@@ -172,7 +223,7 @@ case class InsertIntoHiveTable(
     // instances within the closure, since Serializer is not serializable while TableDesc is.
     val tableDesc = table.tableDesc
     val tableLocation = table.hiveQlTable.getDataLocation
-    val tmpLocation = getExternalTmpPath(tableLocation, hadoopConf)
+    val tmpLocation = getExternalTmpPath(tableLocation)
     val fileSinkConf = new FileSinkDesc(tmpLocation.toString, tableDesc, false)
     val isCompressed = hadoopConf.get("hive.exec.compress.output", "false").toBoolean
 
@@ -328,6 +379,15 @@ case class InsertIntoHiveTable(
         holdDDLTime)
     }
 
+    // Attempt to delete the staging directory along with the files inside it. If the deletion
+    // fails, the files are expected to be removed at normal VM termination via deleteOnExit.
+    try {
+      createdTempDir.foreach { path => path.getFileSystem(hadoopConf).delete(path, true) }
+    } catch {
+      case NonFatal(e) =>
+        logWarning(s"Unable to delete staging directory: $createdTempDir", e)
+    }
+
     // Invalidate the cache.
     sqlContext.sharedState.cacheManager.invalidateCache(table)
     sqlContext.sessionState.catalog.refreshTable(table.catalogTable.identifier)
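
Note: the cleanup added above follows a create / register-deleteOnExit / eager-delete lifecycle.
Below is a minimal standalone sketch of that pattern against the Hadoop FileSystem API; it is
illustrative only — the object name and the demo path are made up and are not part of the patch:

```scala
import scala.util.control.NonFatal

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// Illustrative sketch of the staging-directory lifecycle this patch implements.
object StagingDirLifecycleDemo {
  def main(args: Array[String]): Unit = {
    val hadoopConf = new Configuration()
    val dir = new Path("/tmp/hive-staging-demo") // stand-in for the real staging path
    val fs: FileSystem = dir.getFileSystem(hadoopConf)

    // 1. Create the staging directory, failing loudly as getStagingDir does.
    if (!fs.mkdirs(dir)) {
      throw new IllegalStateException("Cannot create staging directory: " + dir)
    }
    // 2. Safety net: the filesystem deletes the path at JVM exit if step 3 never runs.
    fs.deleteOnExit(dir)

    // ... write output under `dir`, then move the results into the table directory ...

    // 3. Eager cleanup once the insert has finished; a failure here is only logged,
    //    because deleteOnExit still covers it.
    try fs.delete(dir, true) catch {
      case NonFatal(e) => Console.err.println(s"Unable to delete staging directory: $dir\n$e")
    }
  }
}
```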
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
index 79e76b3134c2a7cea2f4d9accfdd007ca24c35d4..bfec43070a797bc8cea661c70f04da2545930755 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
@@ -26,13 +26,15 @@ import org.apache.hadoop.mapred.TextInputFormat
 
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.{AnalysisException, Row}
 import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
 import org.apache.spark.sql.catalyst.analysis.NoSuchPermanentFunctionException
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, Literal}
 import org.apache.spark.sql.catalyst.util.quietly
 import org.apache.spark.sql.hive.HiveUtils
+import org.apache.spark.sql.hive.test.TestHiveSingleton
+import org.apache.spark.sql.test.SQLTestUtils
 import org.apache.spark.sql.types.IntegerType
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.tags.ExtendedHiveTest
@@ -45,7 +47,7 @@ import org.apache.spark.util.{MutableURLClassLoader, Utils}
  * is not fully tested.
  */
 @ExtendedHiveTest
-class VersionsSuite extends SparkFunSuite with Logging {
+class VersionsSuite extends SparkFunSuite with SQLTestUtils with TestHiveSingleton with Logging {
 
   private val clientBuilder = new HiveClientBuilder
   import clientBuilder.buildClient
@@ -530,5 +532,42 @@ class VersionsSuite extends SparkFunSuite with Logging {
       client.reset()
       assert(client.listTables("default").isEmpty)
     }
+
+    ///////////////////////////////////////////////////////////////////////////
+    // End-To-End tests
+    ///////////////////////////////////////////////////////////////////////////
+
+    test(s"$version: CREATE TABLE AS SELECT") {
+      withTable("tbl") {
+        spark.sql("CREATE TABLE tbl AS SELECT 1 AS a")
+        assert(spark.table("tbl").collect().toSeq == Seq(Row(1)))
+      }
+    }
+
+    test(s"$version: Delete the temporary staging directory and files after each insert") {
+      withTempDir { tmpDir =>
+        withTable("tab") {
+          spark.sql(
+            s"""
+               |CREATE TABLE tab(c1 STRING)
+               |LOCATION '${tmpDir.toURI.toString}'
+             """.stripMargin)
+
+          (1 to 3).foreach { i =>
+            spark.sql(s"INSERT OVERWRITE TABLE tab SELECT '$i'")
+          }
+          // Recursively collect the names of all entries (files and directories) under `path`.
+          def listFiles(path: File): List[String] = {
+            val entries = path.listFiles()
+            val folders = entries.filter(_.isDirectory).toList
+            val names = entries.map(_.getName).toList
+            folders.flatMap(listFiles) ++: names
+          }
+          val expectedFiles = ".part-00000.crc" :: "part-00000" :: Nil
+          assert(listFiles(tmpDir).sorted == expectedFiles)
+        }
+      }
+    }
+
+    // TODO: add more tests.
   }
 }
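
Note: for context on what the new regression test checks — after three INSERT OVERWRITE runs,
the table directory should contain only the final data file and its checksum; a leaked
`.hive-staging*` directory would show up in the recursive listing and fail the assertion.
A self-contained sketch of that check (the object name and the leaked-directory name are
invented for illustration):

```scala
import java.io.File
import java.nio.file.Files

// Illustrative harness for the test's recursive listing: a leaked staging
// directory makes the listing longer than the expected two entries.
object ListFilesDemo {
  def listFiles(path: File): List[String] = {
    val entries = path.listFiles()
    val folders = entries.filter(_.isDirectory).toList
    val names = entries.map(_.getName).toList
    folders.flatMap(listFiles) ++: names
  }

  def main(args: Array[String]): Unit = {
    val root = Files.createTempDirectory("tab").toFile
    new File(root, "part-00000").createNewFile()
    new File(root, ".part-00000.crc").createNewFile()

    // Simulate a staging directory that was never cleaned up.
    val leaked = new File(root, ".hive-staging_hive_2017-01-01")
    leaked.mkdir()
    new File(leaked, "-ext-10000").createNewFile()

    // Prints four names instead of the expected
    // List(".part-00000.crc", "part-00000"), so the test's assert would fail.
    println(listFiles(root).sorted)
  }
}
```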