Skip to content
Snippets Groups Projects
Commit 8db4d95c authored by gatorsmile's avatar gatorsmile Committed by Wenchen Fan
Browse files

[SPARK-18703][SQL] Drop Staging Directories and Data Files After each...

[SPARK-18703][SQL] Drop Staging Directories and Data Files After each Insertion/CTAS of Hive serde Tables

### What changes were proposed in this pull request?
Below are the files/directories generated for three inserts againsts a Hive table:
```
/private/var/folders/4b/sgmfldk15js406vk7lw5llzw0000gn/T/spark-41eaa5ce-0288-471e-bba1-09cc482813ff/.hive-staging_hive_2016-12-03_20-56-29_149_4298858301766472202-1
/private/var/folders/4b/sgmfldk15js406vk7lw5llzw0000gn/T/spark-41eaa5ce-0288-471e-bba1-09cc482813ff/.hive-staging_hive_2016-12-03_20-56-29_149_4298858301766472202-1/-ext-10000
/private/var/folders/4b/sgmfldk15js406vk7lw5llzw0000gn/T/spark-41eaa5ce-0288-471e-bba1-09cc482813ff/.hive-staging_hive_2016-12-03_20-56-29_149_4298858301766472202-1/-ext-10000/._SUCCESS.crc
/private/var/folders/4b/sgmfldk15js406vk7lw5llzw0000gn/T/spark-41eaa5ce-0288-471e-bba1-09cc482813ff/.hive-staging_hive_2016-12-03_20-56-29_149_4298858301766472202-1/-ext-10000/.part-00000.crc
/private/var/folders/4b/sgmfldk15js406vk7lw5llzw0000gn/T/spark-41eaa5ce-0288-471e-bba1-09cc482813ff/.hive-staging_hive_2016-12-03_20-56-29_149_4298858301766472202-1/-ext-10000/_SUCCESS
/private/var/folders/4b/sgmfldk15js406vk7lw5llzw0000gn/T/spark-41eaa5ce-0288-471e-bba1-09cc482813ff/.hive-staging_hive_2016-12-03_20-56-29_149_4298858301766472202-1/-ext-10000/part-00000
/private/var/folders/4b/sgmfldk15js406vk7lw5llzw0000gn/T/spark-41eaa5ce-0288-471e-bba1-09cc482813ff/.hive-staging_hive_2016-12-03_20-56-30_454_6445008511655931341-1
/private/var/folders/4b/sgmfldk15js406vk7lw5llzw0000gn/T/spark-41eaa5ce-0288-471e-bba1-09cc482813ff/.hive-staging_hive_2016-12-03_20-56-30_454_6445008511655931341-1/-ext-10000
/private/var/folders/4b/sgmfldk15js406vk7lw5llzw0000gn/T/spark-41eaa5ce-0288-471e-bba1-09cc482813ff/.hive-staging_hive_2016-12-03_20-56-30_454_6445008511655931341-1/-ext-10000/._SUCCESS.crc
/private/var/folders/4b/sgmfldk15js406vk7lw5llzw0000gn/T/spark-41eaa5ce-0288-471e-bba1-09cc482813ff/.hive-staging_hive_2016-12-03_20-56-30_454_6445008511655931341-1/-ext-10000/.part-00000.crc
/private/var/folders/4b/sgmfldk15js406vk7lw5llzw0000gn/T/spark-41eaa5ce-0288-471e-bba1-09cc482813ff/.hive-staging_hive_2016-12-03_20-56-30_454_6445008511655931341-1/-ext-10000/_SUCCESS
/private/var/folders/4b/sgmfldk15js406vk7lw5llzw0000gn/T/spark-41eaa5ce-0288-471e-bba1-09cc482813ff/.hive-staging_hive_2016-12-03_20-56-30_454_6445008511655931341-1/-ext-10000/part-00000
/private/var/folders/4b/sgmfldk15js406vk7lw5llzw0000gn/T/spark-41eaa5ce-0288-471e-bba1-09cc482813ff/.hive-staging_hive_2016-12-03_20-56-30_722_3388423608658711001-1
/private/var/folders/4b/sgmfldk15js406vk7lw5llzw0000gn/T/spark-41eaa5ce-0288-471e-bba1-09cc482813ff/.hive-staging_hive_2016-12-03_20-56-30_722_3388423608658711001-1/-ext-10000
/private/var/folders/4b/sgmfldk15js406vk7lw5llzw0000gn/T/spark-41eaa5ce-0288-471e-bba1-09cc482813ff/.hive-staging_hive_2016-12-03_20-56-30_722_3388423608658711001-1/-ext-10000/._SUCCESS.crc
/private/var/folders/4b/sgmfldk15js406vk7lw5llzw0000gn/T/spark-41eaa5ce-0288-471e-bba1-09cc482813ff/.hive-staging_hive_2016-12-03_20-56-30_722_3388423608658711001-1/-ext-10000/.part-00000.crc
/private/var/folders/4b/sgmfldk15js406vk7lw5llzw0000gn/T/spark-41eaa5ce-0288-471e-bba1-09cc482813ff/.hive-staging_hive_2016-12-03_20-56-30_722_3388423608658711001-1/-ext-10000/_SUCCESS
/private/var/folders/4b/sgmfldk15js406vk7lw5llzw0000gn/T/spark-41eaa5ce-0288-471e-bba1-09cc482813ff/.hive-staging_hive_2016-12-03_20-56-30_722_3388423608658711001-1/-ext-10000/part-00000
/private/var/folders/4b/sgmfldk15js406vk7lw5llzw0000gn/T/spark-41eaa5ce-0288-471e-bba1-09cc482813ff/.part-00000.crc
/private/var/folders/4b/sgmfldk15js406vk7lw5llzw0000gn/T/spark-41eaa5ce-0288-471e-bba1-09cc482813ff/part-00000
```

The first 18 files are temporary. We do not drop it until the end of JVM termination. If JVM does not appropriately terminate, these temporary files/directories will not be dropped.

Only the last two files are needed, as shown below.
```
/private/var/folders/4b/sgmfldk15js406vk7lw5llzw0000gn/T/spark-41eaa5ce-0288-471e-bba1-09cc482813ff/.part-00000.crc
/private/var/folders/4b/sgmfldk15js406vk7lw5llzw0000gn/T/spark-41eaa5ce-0288-471e-bba1-09cc482813ff/part-00000
```
The temporary files/directories could accumulate a lot when we issue many inserts, since each insert generats at least six files. This could eat a lot of spaces and slow down the JVM termination. When the JVM does not terminates approprately, the files might not be dropped.

This PR is to drop the created staging files and temporary data files after each insert/CTAS.

### How was this patch tested?
Added a test case

Author: gatorsmile <gatorsmile@gmail.com>

Closes #16134 from gatorsmile/deleteFiles.
parent 32438853
No related branches found
No related tags found
No related merge requests found
......@@ -22,6 +22,8 @@ import java.net.URI
import java.text.SimpleDateFormat
import java.util.{Date, Locale, Random}
import scala.util.control.NonFatal
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.hive.common.FileUtils
import org.apache.hadoop.hive.ql.exec.TaskRunner
......@@ -84,6 +86,7 @@ case class InsertIntoHiveTable(
def output: Seq[Attribute] = Seq.empty
val hadoopConf = sessionState.newHadoopConf()
var createdTempDir: Option[Path] = None
val stagingDir = hadoopConf.get("hive.exec.stagingdir", ".hive-staging")
val scratchDir = hadoopConf.get("hive.exec.scratchdir", "/tmp/hive")
......@@ -111,12 +114,12 @@ case class InsertIntoHiveTable(
if (!FileUtils.mkdir(fs, dir, true, hadoopConf)) {
throw new IllegalStateException("Cannot create staging directory '" + dir.toString + "'")
}
createdTempDir = Some(dir)
fs.deleteOnExit(dir)
} catch {
case e: IOException =>
throw new RuntimeException(
"Cannot create staging directory '" + dir.toString + "': " + e.getMessage, e)
}
return dir
}
......@@ -163,11 +166,11 @@ case class InsertIntoHiveTable(
if (!FileUtils.mkdir(fs, dirPath, true, hadoopConf)) {
throw new IllegalStateException("Cannot create staging directory: " + dirPath.toString)
}
createdTempDir = Some(dirPath)
fs.deleteOnExit(dirPath)
} catch {
case e: IOException =>
throw new RuntimeException("Cannot create staging directory: " + dirPath.toString, e)
}
dirPath
}
......@@ -378,6 +381,15 @@ case class InsertIntoHiveTable(
isSrcLocal = false)
}
// Attempt to delete the staging directory and the inclusive files. If failed, the files are
// expected to be dropped at the normal termination of VM since deleteOnExit is used.
try {
createdTempDir.foreach { path => path.getFileSystem(hadoopConf).delete(path, true) }
} catch {
case NonFatal(e) =>
logWarning(s"Unable to delete staging directory: $stagingDir.\n" + e)
}
// Invalidate the cache.
sqlContext.sharedState.cacheManager.invalidateCache(table)
sqlContext.sessionState.catalog.refreshTable(table.catalogTable.identifier)
......
......@@ -546,6 +546,30 @@ class VersionsSuite extends SparkFunSuite with SQLTestUtils with TestHiveSinglet
}
}
test(s"$version: Delete the temporary staging directory and files after each insert") {
withTempDir { tmpDir =>
withTable("tab") {
spark.sql(
s"""
|CREATE TABLE tab(c1 string)
|location '${tmpDir.toURI.toString}'
""".stripMargin)
(1 to 3).map { i =>
spark.sql(s"INSERT OVERWRITE TABLE tab SELECT '$i'")
}
def listFiles(path: File): List[String] = {
val dir = path.listFiles()
val folders = dir.filter(_.isDirectory).toList
val filePaths = dir.map(_.getName).toList
folders.flatMap(listFiles) ++: filePaths
}
val expectedFiles = ".part-00000.crc" :: "part-00000" :: Nil
assert(listFiles(tmpDir).sorted == expectedFiles)
}
}
}
// TODO: add more tests.
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment