Skip to content
Snippets Groups Projects
Commit 3080f995 authored by gatorsmile's avatar gatorsmile Committed by Wenchen Fan
Browse files

[SPARK-18703][SPARK-18675][SQL][BACKPORT-2.1] CTAS for hive serde table should...

[SPARK-18703][SPARK-18675][SQL][BACKPORT-2.1] CTAS for hive serde table should work for all hive versions AND Drop Staging Directories and Data Files

### What changes were proposed in this pull request?

This PR is to backport https://github.com/apache/spark/pull/16104 and https://github.com/apache/spark/pull/16134.

----------
[[SPARK-18675][SQL] CTAS for hive serde table should work for all hive versions](https://github.com/apache/spark/pull/16104)

Before hive 1.1, when inserting into a table, hive will create the staging directory under a common scratch directory. After the writing is finished, hive will simply empty the table directory and move the staging directory to it.

After hive 1.1, hive will create the staging directory under the table directory, and when moving staging directory to table directory, hive will still empty the table directory, but will exclude the staging directory there.

In `InsertIntoHiveTable`, we simply copy the code from hive 1.2, which means we will always create the staging directory under the table directory, no matter what the hive version is. This causes problems if the hive version is prior to 1.1, because the staging directory will be removed by hive when hive is trying to empty the table directory.

This PR copies the code from hive 0.13, so that we have 2 branches to create staging directory. If hive version is prior to 1.1, we'll go to the old style branch(i.e. create the staging directory under a common scratch directory), else, go to the new style branch(i.e. create the staging directory under the table directory)

----------
[[SPARK-18703] [SQL] Drop Staging Directories and Data Files After each Insertion/CTAS of Hive serde Tables](https://github.com/apache/spark/pull/16134)

Below are the files/directories generated for three inserts againsts a Hive table:
```
/private/var/folders/4b/sgmfldk15js406vk7lw5llzw0000gn/T/spark-41eaa5ce-0288-471e-bba1-09cc482813ff/.hive-staging_hive_2016-12-03_20-56-29_149_4298858301766472202-1
/private/var/folders/4b/sgmfldk15js406vk7lw5llzw0000gn/T/spark-41eaa5ce-0288-471e-bba1-09cc482813ff/.hive-staging_hive_2016-12-03_20-56-29_149_4298858301766472202-1/-ext-10000
/private/var/folders/4b/sgmfldk15js406vk7lw5llzw0000gn/T/spark-41eaa5ce-0288-471e-bba1-09cc482813ff/.hive-staging_hive_2016-12-03_20-56-29_149_4298858301766472202-1/-ext-10000/._SUCCESS.crc
/private/var/folders/4b/sgmfldk15js406vk7lw5llzw0000gn/T/spark-41eaa5ce-0288-471e-bba1-09cc482813ff/.hive-staging_hive_2016-12-03_20-56-29_149_4298858301766472202-1/-ext-10000/.part-00000.crc
/private/var/folders/4b/sgmfldk15js406vk7lw5llzw0000gn/T/spark-41eaa5ce-0288-471e-bba1-09cc482813ff/.hive-staging_hive_2016-12-03_20-56-29_149_4298858301766472202-1/-ext-10000/_SUCCESS
/private/var/folders/4b/sgmfldk15js406vk7lw5llzw0000gn/T/spark-41eaa5ce-0288-471e-bba1-09cc482813ff/.hive-staging_hive_2016-12-03_20-56-29_149_4298858301766472202-1/-ext-10000/part-00000
/private/var/folders/4b/sgmfldk15js406vk7lw5llzw0000gn/T/spark-41eaa5ce-0288-471e-bba1-09cc482813ff/.hive-staging_hive_2016-12-03_20-56-30_454_6445008511655931341-1
/private/var/folders/4b/sgmfldk15js406vk7lw5llzw0000gn/T/spark-41eaa5ce-0288-471e-bba1-09cc482813ff/.hive-staging_hive_2016-12-03_20-56-30_454_6445008511655931341-1/-ext-10000
/private/var/folders/4b/sgmfldk15js406vk7lw5llzw0000gn/T/spark-41eaa5ce-0288-471e-bba1-09cc482813ff/.hive-staging_hive_2016-12-03_20-56-30_454_6445008511655931341-1/-ext-10000/._SUCCESS.crc
/private/var/folders/4b/sgmfldk15js406vk7lw5llzw0000gn/T/spark-41eaa5ce-0288-471e-bba1-09cc482813ff/.hive-staging_hive_2016-12-03_20-56-30_454_6445008511655931341-1/-ext-10000/.part-00000.crc
/private/var/folders/4b/sgmfldk15js406vk7lw5llzw0000gn/T/spark-41eaa5ce-0288-471e-bba1-09cc482813ff/.hive-staging_hive_2016-12-03_20-56-30_454_6445008511655931341-1/-ext-10000/_SUCCESS
/private/var/folders/4b/sgmfldk15js406vk7lw5llzw0000gn/T/spark-41eaa5ce-0288-471e-bba1-09cc482813ff/.hive-staging_hive_2016-12-03_20-56-30_454_6445008511655931341-1/-ext-10000/part-00000
/private/var/folders/4b/sgmfldk15js406vk7lw5llzw0000gn/T/spark-41eaa5ce-0288-471e-bba1-09cc482813ff/.hive-staging_hive_2016-12-03_20-56-30_722_3388423608658711001-1
/private/var/folders/4b/sgmfldk15js406vk7lw5llzw0000gn/T/spark-41eaa5ce-0288-471e-bba1-09cc482813ff/.hive-staging_hive_2016-12-03_20-56-30_722_3388423608658711001-1/-ext-10000
/private/var/folders/4b/sgmfldk15js406vk7lw5llzw0000gn/T/spark-41eaa5ce-0288-471e-bba1-09cc482813ff/.hive-staging_hive_2016-12-03_20-56-30_722_3388423608658711001-1/-ext-10000/._SUCCESS.crc
/private/var/folders/4b/sgmfldk15js406vk7lw5llzw0000gn/T/spark-41eaa5ce-0288-471e-bba1-09cc482813ff/.hive-staging_hive_2016-12-03_20-56-30_722_3388423608658711001-1/-ext-10000/.part-00000.crc
/private/var/folders/4b/sgmfldk15js406vk7lw5llzw0000gn/T/spark-41eaa5ce-0288-471e-bba1-09cc482813ff/.hive-staging_hive_2016-12-03_20-56-30_722_3388423608658711001-1/-ext-10000/_SUCCESS
/private/var/folders/4b/sgmfldk15js406vk7lw5llzw0000gn/T/spark-41eaa5ce-0288-471e-bba1-09cc482813ff/.hive-staging_hive_2016-12-03_20-56-30_722_3388423608658711001-1/-ext-10000/part-00000
/private/var/folders/4b/sgmfldk15js406vk7lw5llzw0000gn/T/spark-41eaa5ce-0288-471e-bba1-09cc482813ff/.part-00000.crc
/private/var/folders/4b/sgmfldk15js406vk7lw5llzw0000gn/T/spark-41eaa5ce-0288-471e-bba1-09cc482813ff/part-00000
```

The first 18 files are temporary. We do not drop it until the end of JVM termination. If JVM does not appropriately terminate, these temporary files/directories will not be dropped.

Only the last two files are needed, as shown below.
```
/private/var/folders/4b/sgmfldk15js406vk7lw5llzw0000gn/T/spark-41eaa5ce-0288-471e-bba1-09cc482813ff/.part-00000.crc
/private/var/folders/4b/sgmfldk15js406vk7lw5llzw0000gn/T/spark-41eaa5ce-0288-471e-bba1-09cc482813ff/part-00000
```
The temporary files/directories could accumulate a lot when we issue many inserts, since each insert generats at least six files. This could eat a lot of spaces and slow down the JVM termination. When the JVM does not terminates approprately, the files might not be dropped.

This PR is to drop the created staging files and temporary data files after each insert/CTAS.

### How was this patch tested?
Added test cases.

Author: gatorsmile <gatorsmile@gmail.com>

Closes #16325 from gatorsmile/backport-18703&18675.
parent a5da8db8
No related branches found
No related tags found
No related merge requests found
......@@ -22,7 +22,8 @@ import java.net.URI
import java.text.SimpleDateFormat
import java.util.{Date, Locale, Random}
import org.apache.hadoop.conf.Configuration
import scala.util.control.NonFatal
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.hive.common.FileUtils
import org.apache.hadoop.hive.ql.exec.TaskRunner
......@@ -85,7 +86,9 @@ case class InsertIntoHiveTable(
def output: Seq[Attribute] = Seq.empty
val hadoopConf = sessionState.newHadoopConf()
var createdTempDir: Option[Path] = None
val stagingDir = hadoopConf.get("hive.exec.stagingdir", ".hive-staging")
val scratchDir = hadoopConf.get("hive.exec.scratchdir", "/tmp/hive")
private def executionId: String = {
val rand: Random = new Random
......@@ -93,7 +96,7 @@ case class InsertIntoHiveTable(
"hive_" + format.format(new Date) + "_" + Math.abs(rand.nextLong)
}
private def getStagingDir(inputPath: Path, hadoopConf: Configuration): Path = {
private def getStagingDir(inputPath: Path): Path = {
val inputPathUri: URI = inputPath.toUri
val inputPathName: String = inputPathUri.getPath
val fs: FileSystem = inputPath.getFileSystem(hadoopConf)
......@@ -111,31 +114,79 @@ case class InsertIntoHiveTable(
if (!FileUtils.mkdir(fs, dir, true, hadoopConf)) {
throw new IllegalStateException("Cannot create staging directory '" + dir.toString + "'")
}
createdTempDir = Some(dir)
fs.deleteOnExit(dir)
} catch {
case e: IOException =>
throw new RuntimeException(
"Cannot create staging directory '" + dir.toString + "': " + e.getMessage, e)
}
return dir
}
private def getExternalScratchDir(extURI: URI, hadoopConf: Configuration): Path = {
getStagingDir(new Path(extURI.getScheme, extURI.getAuthority, extURI.getPath), hadoopConf)
private def getExternalScratchDir(extURI: URI): Path = {
getStagingDir(new Path(extURI.getScheme, extURI.getAuthority, extURI.getPath))
}
def getExternalTmpPath(path: Path): Path = {
import org.apache.spark.sql.hive.client.hive._
val hiveVersion = externalCatalog.asInstanceOf[HiveExternalCatalog].client.version
// Before Hive 1.1, when inserting into a table, Hive will create the staging directory under
// a common scratch directory. After the writing is finished, Hive will simply empty the table
// directory and move the staging directory to it.
// After Hive 1.1, Hive will create the staging directory under the table directory, and when
// moving staging directory to table directory, Hive will still empty the table directory, but
// will exclude the staging directory there.
// We have to follow the Hive behavior here, to avoid troubles. For example, if we create
// staging directory under the table director for Hive prior to 1.1, the staging directory will
// be removed by Hive when Hive is trying to empty the table directory.
if (hiveVersion == v12 || hiveVersion == v13 || hiveVersion == v14 || hiveVersion == v1_0) {
oldVersionExternalTempPath(path)
} else if (hiveVersion == v1_1 || hiveVersion == v1_2) {
newVersionExternalTempPath(path)
} else {
throw new IllegalStateException("Unsupported hive version: " + hiveVersion.fullVersion)
}
}
// Mostly copied from Context.java#getExternalTmpPath of Hive 0.13
def oldVersionExternalTempPath(path: Path): Path = {
val extURI: URI = path.toUri
val scratchPath = new Path(scratchDir, executionId)
var dirPath = new Path(
extURI.getScheme,
extURI.getAuthority,
scratchPath.toUri.getPath + "-" + TaskRunner.getTaskRunnerID())
try {
val fs: FileSystem = dirPath.getFileSystem(hadoopConf)
dirPath = new Path(fs.makeQualified(dirPath).toString())
if (!FileUtils.mkdir(fs, dirPath, true, hadoopConf)) {
throw new IllegalStateException("Cannot create staging directory: " + dirPath.toString)
}
createdTempDir = Some(dirPath)
fs.deleteOnExit(dirPath)
} catch {
case e: IOException =>
throw new RuntimeException("Cannot create staging directory: " + dirPath.toString, e)
}
dirPath
}
def getExternalTmpPath(path: Path, hadoopConf: Configuration): Path = {
// Mostly copied from Context.java#getExternalTmpPath of Hive 1.2
def newVersionExternalTempPath(path: Path): Path = {
val extURI: URI = path.toUri
if (extURI.getScheme == "viewfs") {
getExtTmpPathRelTo(path.getParent, hadoopConf)
getExtTmpPathRelTo(path.getParent)
} else {
new Path(getExternalScratchDir(extURI, hadoopConf), "-ext-10000")
new Path(getExternalScratchDir(extURI), "-ext-10000")
}
}
def getExtTmpPathRelTo(path: Path, hadoopConf: Configuration): Path = {
new Path(getStagingDir(path, hadoopConf), "-ext-10000") // Hive uses 10000
def getExtTmpPathRelTo(path: Path): Path = {
new Path(getStagingDir(path), "-ext-10000") // Hive uses 10000
}
private def saveAsHiveFile(
......@@ -172,7 +223,7 @@ case class InsertIntoHiveTable(
// instances within the closure, since Serializer is not serializable while TableDesc is.
val tableDesc = table.tableDesc
val tableLocation = table.hiveQlTable.getDataLocation
val tmpLocation = getExternalTmpPath(tableLocation, hadoopConf)
val tmpLocation = getExternalTmpPath(tableLocation)
val fileSinkConf = new FileSinkDesc(tmpLocation.toString, tableDesc, false)
val isCompressed = hadoopConf.get("hive.exec.compress.output", "false").toBoolean
......@@ -328,6 +379,15 @@ case class InsertIntoHiveTable(
holdDDLTime)
}
// Attempt to delete the staging directory and the inclusive files. If failed, the files are
// expected to be dropped at the normal termination of VM since deleteOnExit is used.
try {
createdTempDir.foreach { path => path.getFileSystem(hadoopConf).delete(path, true) }
} catch {
case NonFatal(e) =>
logWarning(s"Unable to delete staging directory: $stagingDir.\n" + e)
}
// Invalidate the cache.
sqlContext.sharedState.cacheManager.invalidateCache(table)
sqlContext.sessionState.catalog.refreshTable(table.catalogTable.identifier)
......
......@@ -26,13 +26,15 @@ import org.apache.hadoop.mapred.TextInputFormat
import org.apache.spark.SparkFunSuite
import org.apache.spark.internal.Logging
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.{AnalysisException, Row}
import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
import org.apache.spark.sql.catalyst.analysis.NoSuchPermanentFunctionException
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, Literal}
import org.apache.spark.sql.catalyst.util.quietly
import org.apache.spark.sql.hive.HiveUtils
import org.apache.spark.sql.hive.test.TestHiveSingleton
import org.apache.spark.sql.test.SQLTestUtils
import org.apache.spark.sql.types.IntegerType
import org.apache.spark.sql.types.StructType
import org.apache.spark.tags.ExtendedHiveTest
......@@ -45,7 +47,7 @@ import org.apache.spark.util.{MutableURLClassLoader, Utils}
* is not fully tested.
*/
@ExtendedHiveTest
class VersionsSuite extends SparkFunSuite with Logging {
class VersionsSuite extends SparkFunSuite with SQLTestUtils with TestHiveSingleton with Logging {
private val clientBuilder = new HiveClientBuilder
import clientBuilder.buildClient
......@@ -530,5 +532,42 @@ class VersionsSuite extends SparkFunSuite with Logging {
client.reset()
assert(client.listTables("default").isEmpty)
}
///////////////////////////////////////////////////////////////////////////
// End-To-End tests
///////////////////////////////////////////////////////////////////////////
test(s"$version: CREATE TABLE AS SELECT") {
withTable("tbl") {
spark.sql("CREATE TABLE tbl AS SELECT 1 AS a")
assert(spark.table("tbl").collect().toSeq == Seq(Row(1)))
}
}
test(s"$version: Delete the temporary staging directory and files after each insert") {
withTempDir { tmpDir =>
withTable("tab") {
spark.sql(
s"""
|CREATE TABLE tab(c1 string)
|location '${tmpDir.toURI.toString}'
""".stripMargin)
(1 to 3).map { i =>
spark.sql(s"INSERT OVERWRITE TABLE tab SELECT '$i'")
}
def listFiles(path: File): List[String] = {
val dir = path.listFiles()
val folders = dir.filter(_.isDirectory).toList
val filePaths = dir.map(_.getName).toList
folders.flatMap(listFiles) ++: filePaths
}
val expectedFiles = ".part-00000.crc" :: "part-00000" :: Nil
assert(listFiles(tmpDir).sorted == expectedFiles)
}
}
}
// TODO: add more tests.
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment