Commit 8c7e19a3 authored by Jane Wang, committed by gatorsmile

[SPARK-4131] Merge HiveTmpFile.scala to SaveAsHiveFile.scala

## What changes were proposed in this pull request?

The code has already been merged to master:
https://github.com/apache/spark/pull/18975

This is a follow-up PR to merge HiveTmpFile.scala into SaveAsHiveFile.scala.
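
For context, here is a minimal, hypothetical sketch of the shape of this refactoring (simplified names, not the real Spark classes): the staging-path helpers that used to live in the separate `HiveTmpPath` mixin now sit directly on `SaveAsHiveFile`, so each insert command keeps a single parent trait.

```scala
// Hypothetical, simplified illustration of moving helpers from a mixin into a base trait.
// Before the merge, commands mixed in two traits:
//   class InsertCommand extends SaveAsHiveFileLike with HiveTmpPathLike
// After the merge, the base trait carries the helper itself:
trait SaveAsHiveFileLike {
  // helper formerly provided by the separate HiveTmpPathLike mixin
  def externalTmpPath(table: String): String = s"/tmp/hive/$table/.hive-staging"

  def save(table: String): Unit = println(s"writing via ${externalTmpPath(table)}")
}

class InsertCommand extends SaveAsHiveFileLike // single parent trait now

object Demo extends App {
  new InsertCommand().save("sales") // prints: writing via /tmp/hive/sales/.hive-staging
}
```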

## How was this patch tested?

Built successfully.

Author: Jane Wang <janewang@fb.com>

Closes #19221 from janewangfb/merge_savehivefile_hivetmpfile.
parent 21c4450f
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.hive.execution

import java.io.{File, IOException}
import java.net.URI
import java.text.SimpleDateFormat
import java.util.{Date, Locale, Random}

import scala.util.control.NonFatal

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.hive.common.FileUtils
import org.apache.hadoop.hive.ql.exec.TaskRunner

import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.hive.HiveExternalCatalog
import org.apache.spark.sql.hive.client.HiveVersion

// Base trait for getting a temporary location for writing data
private[hive] trait HiveTmpPath extends Logging {

  var createdTempDir: Option[Path] = None

  def getExternalTmpPath(
      sparkSession: SparkSession,
      hadoopConf: Configuration,
      path: Path): Path = {
    import org.apache.spark.sql.hive.client.hive._

    // Before Hive 1.1, when inserting into a table, Hive will create the staging directory under
    // a common scratch directory. After the writing is finished, Hive will simply empty the table
    // directory and move the staging directory to it.
    // After Hive 1.1, Hive will create the staging directory under the table directory, and when
    // moving the staging directory to the table directory, Hive will still empty the table
    // directory, but will exclude the staging directory there.
    // We have to follow the Hive behavior here, to avoid trouble. For example, if we created the
    // staging directory under the table directory for Hive prior to 1.1, the staging directory
    // would be removed by Hive when Hive tries to empty the table directory.
    val hiveVersionsUsingOldExternalTempPath: Set[HiveVersion] = Set(v12, v13, v14, v1_0)
    val hiveVersionsUsingNewExternalTempPath: Set[HiveVersion] = Set(v1_1, v1_2, v2_0, v2_1)

    // Ensure all the supported versions are considered here.
    assert(hiveVersionsUsingNewExternalTempPath ++ hiveVersionsUsingOldExternalTempPath ==
      allSupportedHiveVersions)

    val externalCatalog = sparkSession.sharedState.externalCatalog
    val hiveVersion = externalCatalog.asInstanceOf[HiveExternalCatalog].client.version
    val stagingDir = hadoopConf.get("hive.exec.stagingdir", ".hive-staging")
    val scratchDir = hadoopConf.get("hive.exec.scratchdir", "/tmp/hive")

    if (hiveVersionsUsingOldExternalTempPath.contains(hiveVersion)) {
      oldVersionExternalTempPath(path, hadoopConf, scratchDir)
    } else if (hiveVersionsUsingNewExternalTempPath.contains(hiveVersion)) {
      newVersionExternalTempPath(path, hadoopConf, stagingDir)
    } else {
      throw new IllegalStateException("Unsupported hive version: " + hiveVersion.fullVersion)
    }
  }

  def deleteExternalTmpPath(hadoopConf: Configuration): Unit = {
    // Attempt to delete the staging directory and the files inside it. If the delete fails, the
    // files are expected to be dropped at normal VM termination, since deleteOnExit is used.
    try {
      createdTempDir.foreach { path =>
        val fs = path.getFileSystem(hadoopConf)
        if (fs.delete(path, true)) {
          // If we successfully delete the staging directory, remove it from FileSystem's cache.
          fs.cancelDeleteOnExit(path)
        }
      }
    } catch {
      case NonFatal(e) =>
        val stagingDir = hadoopConf.get("hive.exec.stagingdir", ".hive-staging")
        logWarning(s"Unable to delete staging directory: $stagingDir.\n" + e)
    }
  }

  // Mostly copied from Context.java#getExternalTmpPath of Hive 0.13
  private def oldVersionExternalTempPath(
      path: Path,
      hadoopConf: Configuration,
      scratchDir: String): Path = {
    val extURI: URI = path.toUri
    val scratchPath = new Path(scratchDir, executionId)
    var dirPath = new Path(
      extURI.getScheme,
      extURI.getAuthority,
      scratchPath.toUri.getPath + "-" + TaskRunner.getTaskRunnerID())

    try {
      val fs: FileSystem = dirPath.getFileSystem(hadoopConf)
      dirPath = new Path(fs.makeQualified(dirPath).toString())

      if (!FileUtils.mkdir(fs, dirPath, true, hadoopConf)) {
        throw new IllegalStateException("Cannot create staging directory: " + dirPath.toString)
      }
      createdTempDir = Some(dirPath)
      fs.deleteOnExit(dirPath)
    } catch {
      case e: IOException =>
        throw new RuntimeException("Cannot create staging directory: " + dirPath.toString, e)
    }
    dirPath
  }

  // Mostly copied from Context.java#getExternalTmpPath of Hive 1.2
  private def newVersionExternalTempPath(
      path: Path,
      hadoopConf: Configuration,
      stagingDir: String): Path = {
    val extURI: URI = path.toUri
    if (extURI.getScheme == "viewfs") {
      getExtTmpPathRelTo(path.getParent, hadoopConf, stagingDir)
    } else {
      new Path(getExternalScratchDir(extURI, hadoopConf, stagingDir), "-ext-10000")
    }
  }

  private def getExtTmpPathRelTo(
      path: Path,
      hadoopConf: Configuration,
      stagingDir: String): Path = {
    new Path(getStagingDir(path, hadoopConf, stagingDir), "-ext-10000") // Hive uses 10000
  }

  private def getExternalScratchDir(
      extURI: URI,
      hadoopConf: Configuration,
      stagingDir: String): Path = {
    getStagingDir(
      new Path(extURI.getScheme, extURI.getAuthority, extURI.getPath),
      hadoopConf,
      stagingDir)
  }

  private def getStagingDir(
      inputPath: Path,
      hadoopConf: Configuration,
      stagingDir: String): Path = {
    val inputPathUri: URI = inputPath.toUri
    val inputPathName: String = inputPathUri.getPath
    val fs: FileSystem = inputPath.getFileSystem(hadoopConf)
    var stagingPathName: String =
      if (inputPathName.indexOf(stagingDir) == -1) {
        new Path(inputPathName, stagingDir).toString
      } else {
        inputPathName.substring(0, inputPathName.indexOf(stagingDir) + stagingDir.length)
      }

    // SPARK-20594: This is a workaround for a Hive bug. The staging directory must be a child
    // directory whose name starts with '.' so that it is not deleted when users set
    // hive.exec.stagingdir under the table directory.
    if (FileUtils.isSubDir(new Path(stagingPathName), inputPath, fs) &&
      !stagingPathName.stripPrefix(inputPathName).stripPrefix(File.separator).startsWith(".")) {
      logDebug(s"The staging dir '$stagingPathName' should be a child directory that starts " +
        "with '.' to avoid being deleted if we set hive.exec.stagingdir under the table " +
        "directory.")
      stagingPathName = new Path(inputPathName, ".hive-staging").toString
    }

    val dir: Path =
      fs.makeQualified(
        new Path(stagingPathName + "_" + executionId + "-" + TaskRunner.getTaskRunnerID))
    logDebug("Created staging dir = " + dir + " for path = " + inputPath)
    try {
      if (!FileUtils.mkdir(fs, dir, true, hadoopConf)) {
        throw new IllegalStateException("Cannot create staging directory '" + dir.toString + "'")
      }
      createdTempDir = Some(dir)
      fs.deleteOnExit(dir)
    } catch {
      case e: IOException =>
        throw new RuntimeException(
          "Cannot create staging directory '" + dir.toString + "': " + e.getMessage, e)
    }
    dir
  }

  private def executionId: String = {
    val rand: Random = new Random
    val format = new SimpleDateFormat("yyyy-MM-dd_HH-mm-ss_SSS", Locale.US)
    "hive_" + format.format(new Date) + "_" + Math.abs(rand.nextLong)
  }
}

@@ -55,7 +55,7 @@ case class InsertIntoHiveDirCommand(
    isLocal: Boolean,
    storage: CatalogStorageFormat,
    query: LogicalPlan,
    overwrite: Boolean) extends SaveAsHiveFile with HiveTmpPath {
    overwrite: Boolean) extends SaveAsHiveFile {

  override def children: Seq[LogicalPlan] = query :: Nil

@@ -70,7 +70,7 @@ case class InsertIntoHiveTable(
    partition: Map[String, Option[String]],
    query: LogicalPlan,
    overwrite: Boolean,
    ifPartitionNotExists: Boolean) extends SaveAsHiveFile with HiveTmpPath {
    ifPartitionNotExists: Boolean) extends SaveAsHiveFile {

  override def children: Seq[LogicalPlan] = query :: Nil

@@ -17,7 +17,17 @@
package org.apache.spark.sql.hive.execution

import java.io.{File, IOException}
import java.net.URI
import java.text.SimpleDateFormat
import java.util.{Date, Locale, Random}

import scala.util.control.NonFatal

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.hive.common.FileUtils
import org.apache.hadoop.hive.ql.exec.TaskRunner

import org.apache.spark.internal.io.FileCommitProtocol
import org.apache.spark.sql.SparkSession
@@ -25,11 +35,15 @@ import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.command.DataWritingCommand
import org.apache.spark.sql.execution.datasources.FileFormatWriter
import org.apache.spark.sql.hive.HiveExternalCatalog
import org.apache.spark.sql.hive.HiveShim.{ShimFileSinkDesc => FileSinkDesc}
import org.apache.spark.sql.hive.client.HiveVersion

// Base trait from which all Hive insert statement physical executions extend.
private[hive] trait SaveAsHiveFile extends DataWritingCommand {

  var createdTempDir: Option[Path] = None

  protected def saveAsHiveFile(
      sparkSession: SparkSession,
      plan: SparkPlan,
@@ -69,5 +83,166 @@ private[hive] trait SaveAsHiveFile extends DataWritingCommand {
      statsTrackers = Seq(basicWriteJobStatsTracker(hadoopConf)),
      options = Map.empty)
  }

  protected def getExternalTmpPath(
      sparkSession: SparkSession,
      hadoopConf: Configuration,
      path: Path): Path = {
    import org.apache.spark.sql.hive.client.hive._

    // Before Hive 1.1, when inserting into a table, Hive will create the staging directory under
    // a common scratch directory. After the writing is finished, Hive will simply empty the table
    // directory and move the staging directory to it.
    // After Hive 1.1, Hive will create the staging directory under the table directory, and when
    // moving the staging directory to the table directory, Hive will still empty the table
    // directory, but will exclude the staging directory there.
    // We have to follow the Hive behavior here, to avoid trouble. For example, if we created the
    // staging directory under the table directory for Hive prior to 1.1, the staging directory
    // would be removed by Hive when Hive tries to empty the table directory.
    val hiveVersionsUsingOldExternalTempPath: Set[HiveVersion] = Set(v12, v13, v14, v1_0)
    val hiveVersionsUsingNewExternalTempPath: Set[HiveVersion] = Set(v1_1, v1_2, v2_0, v2_1)

    // Ensure all the supported versions are considered here.
    assert(hiveVersionsUsingNewExternalTempPath ++ hiveVersionsUsingOldExternalTempPath ==
      allSupportedHiveVersions)

    val externalCatalog = sparkSession.sharedState.externalCatalog
    val hiveVersion = externalCatalog.asInstanceOf[HiveExternalCatalog].client.version
    val stagingDir = hadoopConf.get("hive.exec.stagingdir", ".hive-staging")
    val scratchDir = hadoopConf.get("hive.exec.scratchdir", "/tmp/hive")

    if (hiveVersionsUsingOldExternalTempPath.contains(hiveVersion)) {
      oldVersionExternalTempPath(path, hadoopConf, scratchDir)
    } else if (hiveVersionsUsingNewExternalTempPath.contains(hiveVersion)) {
      newVersionExternalTempPath(path, hadoopConf, stagingDir)
    } else {
      throw new IllegalStateException("Unsupported hive version: " + hiveVersion.fullVersion)
    }
  }

  protected def deleteExternalTmpPath(hadoopConf: Configuration): Unit = {
    // Attempt to delete the staging directory and the files inside it. If the delete fails, the
    // files are expected to be dropped at normal VM termination, since deleteOnExit is used.
    try {
      createdTempDir.foreach { path =>
        val fs = path.getFileSystem(hadoopConf)
        if (fs.delete(path, true)) {
          // If we successfully delete the staging directory, remove it from FileSystem's cache.
          fs.cancelDeleteOnExit(path)
        }
      }
    } catch {
      case NonFatal(e) =>
        val stagingDir = hadoopConf.get("hive.exec.stagingdir", ".hive-staging")
        logWarning(s"Unable to delete staging directory: $stagingDir.\n" + e)
    }
  }

  // Mostly copied from Context.java#getExternalTmpPath of Hive 0.13
  private def oldVersionExternalTempPath(
      path: Path,
      hadoopConf: Configuration,
      scratchDir: String): Path = {
    val extURI: URI = path.toUri
    val scratchPath = new Path(scratchDir, executionId)
    var dirPath = new Path(
      extURI.getScheme,
      extURI.getAuthority,
      scratchPath.toUri.getPath + "-" + TaskRunner.getTaskRunnerID())

    try {
      val fs: FileSystem = dirPath.getFileSystem(hadoopConf)
      dirPath = new Path(fs.makeQualified(dirPath).toString())

      if (!FileUtils.mkdir(fs, dirPath, true, hadoopConf)) {
        throw new IllegalStateException("Cannot create staging directory: " + dirPath.toString)
      }
      createdTempDir = Some(dirPath)
      fs.deleteOnExit(dirPath)
    } catch {
      case e: IOException =>
        throw new RuntimeException("Cannot create staging directory: " + dirPath.toString, e)
    }
    dirPath
  }

  // Mostly copied from Context.java#getExternalTmpPath of Hive 1.2
  private def newVersionExternalTempPath(
      path: Path,
      hadoopConf: Configuration,
      stagingDir: String): Path = {
    val extURI: URI = path.toUri
    if (extURI.getScheme == "viewfs") {
      getExtTmpPathRelTo(path.getParent, hadoopConf, stagingDir)
    } else {
      new Path(getExternalScratchDir(extURI, hadoopConf, stagingDir), "-ext-10000")
    }
  }

  private def getExtTmpPathRelTo(
      path: Path,
      hadoopConf: Configuration,
      stagingDir: String): Path = {
    new Path(getStagingDir(path, hadoopConf, stagingDir), "-ext-10000") // Hive uses 10000
  }

  private def getExternalScratchDir(
      extURI: URI,
      hadoopConf: Configuration,
      stagingDir: String): Path = {
    getStagingDir(
      new Path(extURI.getScheme, extURI.getAuthority, extURI.getPath),
      hadoopConf,
      stagingDir)
  }

  private def getStagingDir(
      inputPath: Path,
      hadoopConf: Configuration,
      stagingDir: String): Path = {
    val inputPathUri: URI = inputPath.toUri
    val inputPathName: String = inputPathUri.getPath
    val fs: FileSystem = inputPath.getFileSystem(hadoopConf)
    var stagingPathName: String =
      if (inputPathName.indexOf(stagingDir) == -1) {
        new Path(inputPathName, stagingDir).toString
      } else {
        inputPathName.substring(0, inputPathName.indexOf(stagingDir) + stagingDir.length)
      }

    // SPARK-20594: This is a workaround for a Hive bug. The staging directory must be a child
    // directory whose name starts with '.' so that it is not deleted when users set
    // hive.exec.stagingdir under the table directory.
    if (FileUtils.isSubDir(new Path(stagingPathName), inputPath, fs) &&
      !stagingPathName.stripPrefix(inputPathName).stripPrefix(File.separator).startsWith(".")) {
      logDebug(s"The staging dir '$stagingPathName' should be a child directory that starts " +
        "with '.' to avoid being deleted if we set hive.exec.stagingdir under the table " +
        "directory.")
      stagingPathName = new Path(inputPathName, ".hive-staging").toString
    }

    val dir: Path =
      fs.makeQualified(
        new Path(stagingPathName + "_" + executionId + "-" + TaskRunner.getTaskRunnerID))
    logDebug("Created staging dir = " + dir + " for path = " + inputPath)
    try {
      if (!FileUtils.mkdir(fs, dir, true, hadoopConf)) {
        throw new IllegalStateException("Cannot create staging directory '" + dir.toString + "'")
      }
      createdTempDir = Some(dir)
      fs.deleteOnExit(dir)
    } catch {
      case e: IOException =>
        throw new RuntimeException(
          "Cannot create staging directory '" + dir.toString + "': " + e.getMessage, e)
    }
    dir
  }

  private def executionId: String = {
    val rand: Random = new Random
    val format = new SimpleDateFormat("yyyy-MM-dd_HH-mm-ss_SSS", Locale.US)
    "hive_" + format.format(new Date) + "_" + Math.abs(rand.nextLong)
  }
}