diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTmpPath.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTmpPath.scala
deleted file mode 100644
index 15ca1dfc76d19ef759a01bb9f16b6298e5993ee9..0000000000000000000000000000000000000000
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTmpPath.scala
+++ /dev/null
@@ -1,203 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.hive.execution
-
-import java.io.{File, IOException}
-import java.net.URI
-import java.text.SimpleDateFormat
-import java.util.{Date, Locale, Random}
-
-import scala.util.control.NonFatal
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileSystem, Path}
-import org.apache.hadoop.hive.common.FileUtils
-import org.apache.hadoop.hive.ql.exec.TaskRunner
-
-import org.apache.spark.internal.Logging
-import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.hive.HiveExternalCatalog
-import org.apache.spark.sql.hive.client.HiveVersion
-
-// Base trait for getting a temporary location for writing data
-private[hive] trait HiveTmpPath extends Logging {
-
-  var createdTempDir: Option[Path] = None
-
-  def getExternalTmpPath(
-      sparkSession: SparkSession,
-      hadoopConf: Configuration,
-      path: Path): Path = {
-    import org.apache.spark.sql.hive.client.hive._
-
-    // Before Hive 1.1, when inserting into a table, Hive will create the staging directory under
-    // a common scratch directory. After the writing is finished, Hive will simply empty the table
-    // directory and move the staging directory to it.
-    // After Hive 1.1, Hive will create the staging directory under the table directory, and when
-    // moving staging directory to table directory, Hive will still empty the table directory, but
-    // will exclude the staging directory there.
-    // We have to follow the Hive behavior here, to avoid troubles. For example, if we create
-    // staging directory under the table director for Hive prior to 1.1, the staging directory will
-    // be removed by Hive when Hive is trying to empty the table directory.
-    val hiveVersionsUsingOldExternalTempPath: Set[HiveVersion] = Set(v12, v13, v14, v1_0)
-    val hiveVersionsUsingNewExternalTempPath: Set[HiveVersion] = Set(v1_1, v1_2, v2_0, v2_1)
-
-    // Ensure all the supported versions are considered here.
-    assert(hiveVersionsUsingNewExternalTempPath ++ hiveVersionsUsingOldExternalTempPath ==
-      allSupportedHiveVersions)
-
-    val externalCatalog = sparkSession.sharedState.externalCatalog
-    val hiveVersion = externalCatalog.asInstanceOf[HiveExternalCatalog].client.version
-    val stagingDir = hadoopConf.get("hive.exec.stagingdir", ".hive-staging")
-    val scratchDir = hadoopConf.get("hive.exec.scratchdir", "/tmp/hive")
-
-    if (hiveVersionsUsingOldExternalTempPath.contains(hiveVersion)) {
-      oldVersionExternalTempPath(path, hadoopConf, scratchDir)
-    } else if (hiveVersionsUsingNewExternalTempPath.contains(hiveVersion)) {
-      newVersionExternalTempPath(path, hadoopConf, stagingDir)
-    } else {
-      throw new IllegalStateException("Unsupported hive version: " + hiveVersion.fullVersion)
-    }
-  }
-
-  def deleteExternalTmpPath(hadoopConf: Configuration) : Unit = {
-    // Attempt to delete the staging directory and the inclusive files. If failed, the files are
-    // expected to be dropped at the normal termination of VM since deleteOnExit is used.
-    try {
-      createdTempDir.foreach { path =>
-        val fs = path.getFileSystem(hadoopConf)
-        if (fs.delete(path, true)) {
-          // If we successfully delete the staging directory, remove it from FileSystem's cache.
-          fs.cancelDeleteOnExit(path)
-        }
-      }
-    } catch {
-      case NonFatal(e) =>
-        val stagingDir = hadoopConf.get("hive.exec.stagingdir", ".hive-staging")
-        logWarning(s"Unable to delete staging directory: $stagingDir.\n" + e)
-    }
-  }
-
-  // Mostly copied from Context.java#getExternalTmpPath of Hive 0.13
-  private def oldVersionExternalTempPath(
-      path: Path,
-      hadoopConf: Configuration,
-      scratchDir: String): Path = {
-    val extURI: URI = path.toUri
-    val scratchPath = new Path(scratchDir, executionId)
-    var dirPath = new Path(
-      extURI.getScheme,
-      extURI.getAuthority,
-      scratchPath.toUri.getPath + "-" + TaskRunner.getTaskRunnerID())
-
-    try {
-      val fs: FileSystem = dirPath.getFileSystem(hadoopConf)
-      dirPath = new Path(fs.makeQualified(dirPath).toString())
-
-      if (!FileUtils.mkdir(fs, dirPath, true, hadoopConf)) {
-        throw new IllegalStateException("Cannot create staging directory: " + dirPath.toString)
-      }
-      createdTempDir = Some(dirPath)
-      fs.deleteOnExit(dirPath)
-    } catch {
-      case e: IOException =>
-        throw new RuntimeException("Cannot create staging directory: " + dirPath.toString, e)
-    }
-    dirPath
-  }
-
-  // Mostly copied from Context.java#getExternalTmpPath of Hive 1.2
-  private def newVersionExternalTempPath(
-      path: Path,
-      hadoopConf: Configuration,
-      stagingDir: String): Path = {
-    val extURI: URI = path.toUri
-    if (extURI.getScheme == "viewfs") {
-      getExtTmpPathRelTo(path.getParent, hadoopConf, stagingDir)
-    } else {
-      new Path(getExternalScratchDir(extURI, hadoopConf, stagingDir), "-ext-10000")
-    }
-  }
-
-  private def getExtTmpPathRelTo(
-      path: Path,
-      hadoopConf: Configuration,
-      stagingDir: String): Path = {
-    new Path(getStagingDir(path, hadoopConf, stagingDir), "-ext-10000") // Hive uses 10000
-  }
-
-  private def getExternalScratchDir(
-      extURI: URI,
-      hadoopConf: Configuration,
-      stagingDir: String): Path = {
-    getStagingDir(
-      new Path(extURI.getScheme, extURI.getAuthority, extURI.getPath),
-      hadoopConf,
-      stagingDir)
-  }
-
-  private def getStagingDir(
-      inputPath: Path,
-      hadoopConf: Configuration,
-      stagingDir: String): Path = {
-    val inputPathUri: URI = inputPath.toUri
-    val inputPathName: String = inputPathUri.getPath
-    val fs: FileSystem = inputPath.getFileSystem(hadoopConf)
-    var stagingPathName: String =
-      if (inputPathName.indexOf(stagingDir) == -1) {
-        new Path(inputPathName, stagingDir).toString
-      } else {
-        inputPathName.substring(0, inputPathName.indexOf(stagingDir) + stagingDir.length)
-      }
-
-    // SPARK-20594: This is a walk-around fix to resolve a Hive bug. Hive requires that the
-    // staging directory needs to avoid being deleted when users set hive.exec.stagingdir
-    // under the table directory.
-    if (FileUtils.isSubDir(new Path(stagingPathName), inputPath, fs) &&
-      !stagingPathName.stripPrefix(inputPathName).stripPrefix(File.separator).startsWith(".")) {
-      logDebug(s"The staging dir '$stagingPathName' should be a child directory starts " +
-        "with '.' to avoid being deleted if we set hive.exec.stagingdir under the table " +
-        "directory.")
-      stagingPathName = new Path(inputPathName, ".hive-staging").toString
-    }
-
-    val dir: Path =
-      fs.makeQualified(
-        new Path(stagingPathName + "_" + executionId + "-" + TaskRunner.getTaskRunnerID))
-    logDebug("Created staging dir = " + dir + " for path = " + inputPath)
-    try {
-      if (!FileUtils.mkdir(fs, dir, true, hadoopConf)) {
-        throw new IllegalStateException("Cannot create staging directory '" + dir.toString + "'")
-      }
-      createdTempDir = Some(dir)
-      fs.deleteOnExit(dir)
-    } catch {
-      case e: IOException =>
-        throw new RuntimeException(
-          "Cannot create staging directory '" + dir.toString + "': " + e.getMessage, e)
-    }
-    dir
-  }
-
-  private def executionId: String = {
-    val rand: Random = new Random
-    val format = new SimpleDateFormat("yyyy-MM-dd_HH-mm-ss_SSS", Locale.US)
-    "hive_" + format.format(new Date) + "_" + Math.abs(rand.nextLong)
-  }
-}
-
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveDirCommand.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveDirCommand.scala
index 2110038db36ac4711caa28104126466b03610a2d..918c8be00d69d333b21a01bbf8071e4489f7069e 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveDirCommand.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveDirCommand.scala
@@ -55,7 +55,7 @@ case class InsertIntoHiveDirCommand(
     isLocal: Boolean,
     storage: CatalogStorageFormat,
     query: LogicalPlan,
-    overwrite: Boolean) extends SaveAsHiveFile with HiveTmpPath {
+    overwrite: Boolean) extends SaveAsHiveFile {
 
   override def children: Seq[LogicalPlan] = query :: Nil
 
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
index 5bdc97a2982df9dc668698a0957eb7cfa074ea6a..e5b59ed7a1a6bd3416696a2be1d6d7f0790672d1 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
@@ -70,7 +70,7 @@ case class InsertIntoHiveTable(
     partition: Map[String, Option[String]],
     query: LogicalPlan,
     overwrite: Boolean,
-    ifPartitionNotExists: Boolean) extends SaveAsHiveFile with HiveTmpPath {
+    ifPartitionNotExists: Boolean) extends SaveAsHiveFile {
 
   override def children: Seq[LogicalPlan] = query :: Nil
 
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala
index 7de9b421245f04e80bf785f32fc192d9f535c0c2..ad8699489d4bfc757867aaa0c8613d6e76aa7a49 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala
@@ -17,7 +17,17 @@
 
 package org.apache.spark.sql.hive.execution
 
+import java.io.{File, IOException}
+import java.net.URI
+import java.text.SimpleDateFormat
+import java.util.{Date, Locale, Random}
+
+import scala.util.control.NonFatal
+
 import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hadoop.hive.common.FileUtils
+import org.apache.hadoop.hive.ql.exec.TaskRunner
 
 import org.apache.spark.internal.io.FileCommitProtocol
 import org.apache.spark.sql.SparkSession
@@ -25,11 +35,15 @@ import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.command.DataWritingCommand
 import org.apache.spark.sql.execution.datasources.FileFormatWriter
+import org.apache.spark.sql.hive.HiveExternalCatalog
 import org.apache.spark.sql.hive.HiveShim.{ShimFileSinkDesc => FileSinkDesc}
+import org.apache.spark.sql.hive.client.HiveVersion
 
 // Base trait from which all hive insert statement physical execution extends.
 private[hive] trait SaveAsHiveFile extends DataWritingCommand {
 
+  var createdTempDir: Option[Path] = None
+
   protected def saveAsHiveFile(
       sparkSession: SparkSession,
       plan: SparkPlan,
@@ -69,5 +83,166 @@ private[hive] trait SaveAsHiveFile extends DataWritingCommand {
       statsTrackers = Seq(basicWriteJobStatsTracker(hadoopConf)),
       options = Map.empty)
   }
+
+  protected def getExternalTmpPath(
+      sparkSession: SparkSession,
+      hadoopConf: Configuration,
+      path: Path): Path = {
+    import org.apache.spark.sql.hive.client.hive._
+
+    // Before Hive 1.1, when inserting into a table, Hive will create the staging directory under
+    // a common scratch directory. After the writing is finished, Hive will simply empty the table
+    // directory and move the staging directory to it.
+    // After Hive 1.1, Hive will create the staging directory under the table directory, and when
+    // moving the staging directory to the table directory, Hive will still empty the table
+    // directory, but will exclude the staging directory there.
+    // We have to follow the Hive behavior here, to avoid trouble. For example, if we create
+    // the staging directory under the table directory for Hive prior to 1.1, the staging
+    // directory will be removed by Hive when Hive is trying to empty the table directory.
+    val hiveVersionsUsingOldExternalTempPath: Set[HiveVersion] = Set(v12, v13, v14, v1_0)
+    val hiveVersionsUsingNewExternalTempPath: Set[HiveVersion] = Set(v1_1, v1_2, v2_0, v2_1)
+
+    // Ensure all the supported versions are considered here.
+    assert(hiveVersionsUsingNewExternalTempPath ++ hiveVersionsUsingOldExternalTempPath ==
+      allSupportedHiveVersions)
+
+    val externalCatalog = sparkSession.sharedState.externalCatalog
+    val hiveVersion = externalCatalog.asInstanceOf[HiveExternalCatalog].client.version
+    val stagingDir = hadoopConf.get("hive.exec.stagingdir", ".hive-staging")
+    val scratchDir = hadoopConf.get("hive.exec.scratchdir", "/tmp/hive")
+
+    if (hiveVersionsUsingOldExternalTempPath.contains(hiveVersion)) {
+      oldVersionExternalTempPath(path, hadoopConf, scratchDir)
+    } else if (hiveVersionsUsingNewExternalTempPath.contains(hiveVersion)) {
+      newVersionExternalTempPath(path, hadoopConf, stagingDir)
+    } else {
+      throw new IllegalStateException("Unsupported hive version: " + hiveVersion.fullVersion)
+    }
+  }
+
+  protected def deleteExternalTmpPath(hadoopConf: Configuration): Unit = {
+    // Attempt to delete the staging directory and the files inside it. If the deletion fails,
+    // the files are expected to be dropped at normal JVM termination, since deleteOnExit is used.
+    try {
+      createdTempDir.foreach { path =>
+        val fs = path.getFileSystem(hadoopConf)
+        if (fs.delete(path, true)) {
+          // If we successfully delete the staging directory, remove it from FileSystem's cache.
+          fs.cancelDeleteOnExit(path)
+        }
+      }
+    } catch {
+      case NonFatal(e) =>
+        val stagingDir = hadoopConf.get("hive.exec.stagingdir", ".hive-staging")
+        logWarning(s"Unable to delete staging directory: $stagingDir.\n" + e)
+    }
+  }
+
+  // Mostly copied from Context.java#getExternalTmpPath of Hive 0.13
+  private def oldVersionExternalTempPath(
+      path: Path,
+      hadoopConf: Configuration,
+      scratchDir: String): Path = {
+    val extURI: URI = path.toUri
+    val scratchPath = new Path(scratchDir, executionId)
+    var dirPath = new Path(
+      extURI.getScheme,
+      extURI.getAuthority,
+      scratchPath.toUri.getPath + "-" + TaskRunner.getTaskRunnerID())
+
+    try {
+      val fs: FileSystem = dirPath.getFileSystem(hadoopConf)
+      dirPath = new Path(fs.makeQualified(dirPath).toString())
+
+      if (!FileUtils.mkdir(fs, dirPath, true, hadoopConf)) {
+        throw new IllegalStateException("Cannot create staging directory: " + dirPath.toString)
+      }
+      createdTempDir = Some(dirPath)
+      fs.deleteOnExit(dirPath)
+    } catch {
+      case e: IOException =>
+        throw new RuntimeException("Cannot create staging directory: " + dirPath.toString, e)
+    }
+    dirPath
+  }
+
+  // Mostly copied from Context.java#getExternalTmpPath of Hive 1.2
+  private def newVersionExternalTempPath(
+      path: Path,
+      hadoopConf: Configuration,
+      stagingDir: String): Path = {
+    val extURI: URI = path.toUri
+    if (extURI.getScheme == "viewfs") {
+      getExtTmpPathRelTo(path.getParent, hadoopConf, stagingDir)
+    } else {
+      new Path(getExternalScratchDir(extURI, hadoopConf, stagingDir), "-ext-10000")
+    }
+  }
+
+  private def getExtTmpPathRelTo(
+      path: Path,
+      hadoopConf: Configuration,
+      stagingDir: String): Path = {
+    new Path(getStagingDir(path, hadoopConf, stagingDir), "-ext-10000") // Hive uses 10000
+  }
+
+  private def getExternalScratchDir(
+      extURI: URI,
+      hadoopConf: Configuration,
+      stagingDir: String): Path = {
+    getStagingDir(
+      new Path(extURI.getScheme, extURI.getAuthority, extURI.getPath),
+      hadoopConf,
+      stagingDir)
+  }
+
+  private def getStagingDir(
+      inputPath: Path,
+      hadoopConf: Configuration,
+      stagingDir: String): Path = {
+    val inputPathUri: URI = inputPath.toUri
+    val inputPathName: String = inputPathUri.getPath
+    val fs: FileSystem = inputPath.getFileSystem(hadoopConf)
+    var stagingPathName: String =
+      if (inputPathName.indexOf(stagingDir) == -1) {
+        new Path(inputPathName, stagingDir).toString
+      } else {
+        inputPathName.substring(0, inputPathName.indexOf(stagingDir) + stagingDir.length)
+      }
+
+    // SPARK-20594: This is a workaround for a Hive bug. The staging directory must not be
+    // deleted when users set hive.exec.stagingdir under the table directory, so it has to
+    // be a child directory whose name starts with '.'.
+    if (FileUtils.isSubDir(new Path(stagingPathName), inputPath, fs) &&
+      !stagingPathName.stripPrefix(inputPathName).stripPrefix(File.separator).startsWith(".")) {
+      logDebug(s"The staging dir '$stagingPathName' should be a child directory starting " +
+        "with '.' to avoid being deleted if we set hive.exec.stagingdir under the table " +
+        "directory.")
+      stagingPathName = new Path(inputPathName, ".hive-staging").toString
+    }
+
+    val dir: Path =
+      fs.makeQualified(
+        new Path(stagingPathName + "_" + executionId + "-" + TaskRunner.getTaskRunnerID))
+    logDebug("Created staging dir = " + dir + " for path = " + inputPath)
+    try {
+      if (!FileUtils.mkdir(fs, dir, true, hadoopConf)) {
+        throw new IllegalStateException("Cannot create staging directory '" + dir.toString + "'")
+      }
+      createdTempDir = Some(dir)
+      fs.deleteOnExit(dir)
+    } catch {
+      case e: IOException =>
+        throw new RuntimeException(
+          "Cannot create staging directory '" + dir.toString + "': " + e.getMessage, e)
+    }
+    dir
+  }
+
+  private def executionId: String = {
+    val rand: Random = new Random
+    val format = new SimpleDateFormat("yyyy-MM-dd_HH-mm-ss_SSS", Locale.US)
+    "hive_" + format.format(new Date) + "_" + Math.abs(rand.nextLong)
+  }
 
 }
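
For quick reference, below is a minimal standalone sketch (not part of the patch) of the
staging path that the relocated code computes on the post-Hive-1.1 branch, i.e.
newVersionExternalTempPath/getStagingDir for a non-viewfs location. The values of tableDir
and taskRunnerId are hypothetical stand-ins for the real table location and
TaskRunner.getTaskRunnerID.

import java.text.SimpleDateFormat
import java.util.{Date, Locale, Random}

object StagingPathSketch {
  // Mirrors SaveAsHiveFile.executionId: "hive_<timestamp>_<random>".
  private def executionId: String = {
    val format = new SimpleDateFormat("yyyy-MM-dd_HH-mm-ss_SSS", Locale.US)
    "hive_" + format.format(new Date) + "_" + Math.abs(new Random().nextLong)
  }

  def main(args: Array[String]): Unit = {
    val tableDir = "/user/hive/warehouse/mydb.db/mytable" // hypothetical table location
    val stagingDir = ".hive-staging"                      // default of hive.exec.stagingdir
    val taskRunnerId = 1                                  // stand-in for TaskRunner.getTaskRunnerID
    // getStagingDir appends "_<executionId>-<taskRunnerId>" to the staging prefix under the
    // table directory; newVersionExternalTempPath then adds the "-ext-10000" leaf Hive expects.
    println(s"$tableDir/${stagingDir}_${executionId}-$taskRunnerId/-ext-10000")
  }
}

For the old versions (v12, v13, v14, v1_0), oldVersionExternalTempPath instead places an
analogous "<executionId>-<taskRunnerId>" directory under hive.exec.scratchdir, which is why
getExternalTmpPath asserts that the two version sets together cover allSupportedHiveVersions
before choosing a branch.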