diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
index db2239d26aaa6a8be53628ea9bc53dab4dfc6788..82c7b1a3c6b81ea5aa3861cb146090ec2cb9063a 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
@@ -22,7 +22,6 @@ import java.net.URI
 import java.text.SimpleDateFormat
 import java.util.{Date, Locale, Random}
 
-import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.hadoop.hive.common.FileUtils
 import org.apache.hadoop.hive.ql.exec.TaskRunner
@@ -86,6 +85,7 @@ case class InsertIntoHiveTable(
 
   val hadoopConf = sessionState.newHadoopConf()
   val stagingDir = hadoopConf.get("hive.exec.stagingdir", ".hive-staging")
+  val scratchDir = hadoopConf.get("hive.exec.scratchdir", "/tmp/hive")
 
   private def executionId: String = {
     val rand: Random = new Random
@@ -93,7 +93,7 @@ case class InsertIntoHiveTable(
     "hive_" + format.format(new Date) + "_" + Math.abs(rand.nextLong)
   }
 
-  private def getStagingDir(inputPath: Path, hadoopConf: Configuration): Path = {
+  private def getStagingDir(inputPath: Path): Path = {
     val inputPathUri: URI = inputPath.toUri
     val inputPathName: String = inputPathUri.getPath
     val fs: FileSystem = inputPath.getFileSystem(hadoopConf)
@@ -121,21 +121,69 @@ case class InsertIntoHiveTable(
     return dir
   }
 
-  private def getExternalScratchDir(extURI: URI, hadoopConf: Configuration): Path = {
-    getStagingDir(new Path(extURI.getScheme, extURI.getAuthority, extURI.getPath), hadoopConf)
+  private def getExternalScratchDir(extURI: URI): Path = {
+    getStagingDir(new Path(extURI.getScheme, extURI.getAuthority, extURI.getPath))
   }
 
-  def getExternalTmpPath(path: Path, hadoopConf: Configuration): Path = {
+  def getExternalTmpPath(path: Path): Path = {
+    import org.apache.spark.sql.hive.client.hive._
+
+    val hiveVersion = externalCatalog.asInstanceOf[HiveExternalCatalog].client.version
+    // Before Hive 1.1, when inserting into a table, Hive creates the staging directory under
+    // a common scratch directory. After the write is finished, Hive simply empties the table
+    // directory and moves the staging directory to it.
+    // Since Hive 1.1, Hive creates the staging directory under the table directory, and when
+    // moving the staging directory to the table directory, it still empties the table directory,
+    // but excludes the staging directory.
+    // We have to follow the Hive behavior here to avoid trouble. For example, if we created the
+    // staging directory under the table directory for Hive versions prior to 1.1, the staging
+    // directory would be removed when Hive empties the table directory.
+    if (hiveVersion == v12 || hiveVersion == v13 || hiveVersion == v14 || hiveVersion == v1_0) {
+      oldVersionExternalTempPath(path)
+    } else if (hiveVersion == v1_1 || hiveVersion == v1_2) {
+      newVersionExternalTempPath(path)
+    } else {
+      throw new IllegalStateException("Unsupported hive version: " + hiveVersion.fullVersion)
+    }
+  }
+
+  // Mostly copied from Context.java#getExternalTmpPath of Hive 0.13
+  def oldVersionExternalTempPath(path: Path): Path = {
+    val extURI: URI = path.toUri
+    val scratchPath = new Path(scratchDir, executionId)
+    var dirPath = new Path(
+      extURI.getScheme,
+      extURI.getAuthority,
+      scratchPath.toUri.getPath + "-" + TaskRunner.getTaskRunnerID())
+
+    try {
+      val fs: FileSystem = dirPath.getFileSystem(hadoopConf)
+      dirPath = new Path(fs.makeQualified(dirPath).toString())
+
+      if (!FileUtils.mkdir(fs, dirPath, true, hadoopConf)) {
+        throw new IllegalStateException("Cannot create staging directory: " + dirPath.toString)
+      }
+      fs.deleteOnExit(dirPath)
+    } catch {
+      case e: IOException =>
+        throw new RuntimeException("Cannot create staging directory: " + dirPath.toString, e)
+
+    }
+    dirPath
+  }
+
+  // Mostly copied from Context.java#getExternalTmpPath of Hive 1.2
+  def newVersionExternalTempPath(path: Path): Path = {
     val extURI: URI = path.toUri
     if (extURI.getScheme == "viewfs") {
-      getExtTmpPathRelTo(path.getParent, hadoopConf)
+      getExtTmpPathRelTo(path.getParent)
     } else {
-      new Path(getExternalScratchDir(extURI, hadoopConf), "-ext-10000")
+      new Path(getExternalScratchDir(extURI), "-ext-10000")
     }
   }
 
-  def getExtTmpPathRelTo(path: Path, hadoopConf: Configuration): Path = {
-    new Path(getStagingDir(path, hadoopConf), "-ext-10000") // Hive uses 10000
+  def getExtTmpPathRelTo(path: Path): Path = {
+    new Path(getStagingDir(path), "-ext-10000") // Hive uses 10000
   }
 
   private def saveAsHiveFile(
@@ -172,7 +220,7 @@ case class InsertIntoHiveTable(
     // instances within the closure, since Serializer is not serializable while TableDesc is.
     val tableDesc = table.tableDesc
     val tableLocation = table.hiveQlTable.getDataLocation
-    val tmpLocation = getExternalTmpPath(tableLocation, hadoopConf)
+    val tmpLocation = getExternalTmpPath(tableLocation)
     val fileSinkConf = new FileSinkDesc(tmpLocation.toString, tableDesc, false)
     val isCompressed = hadoopConf.get("hive.exec.compress.output", "false").toBoolean
 
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
index a001048a9ea5d3c31a0d96962bb6b6fb4aaf2c6c..9b26383a162dd61afaf1dadd7d4394d4ec03e221 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
@@ -26,13 +26,15 @@ import org.apache.hadoop.mapred.TextInputFormat
 
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.{AnalysisException, Row}
 import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
 import org.apache.spark.sql.catalyst.analysis.NoSuchPermanentFunctionException
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, Literal}
 import org.apache.spark.sql.catalyst.util.quietly
 import org.apache.spark.sql.hive.HiveUtils
+import org.apache.spark.sql.hive.test.TestHiveSingleton
+import org.apache.spark.sql.test.SQLTestUtils
 import org.apache.spark.sql.types.IntegerType
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.tags.ExtendedHiveTest
@@ -45,7 +47,7 @@ import org.apache.spark.util.{MutableURLClassLoader, Utils}
  * is not fully tested.
  */
 @ExtendedHiveTest
-class VersionsSuite extends SparkFunSuite with Logging {
+class VersionsSuite extends SparkFunSuite with SQLTestUtils with TestHiveSingleton with Logging {
 
   private val clientBuilder = new HiveClientBuilder
   import clientBuilder.buildClient
@@ -532,5 +534,18 @@ class VersionsSuite extends SparkFunSuite with Logging {
       client.reset()
       assert(client.listTables("default").isEmpty)
     }
+
+    ///////////////////////////////////////////////////////////////////////////
+    // End-To-End tests
+    ///////////////////////////////////////////////////////////////////////////
+
+    test(s"$version: CREATE TABLE AS SELECT") {
+      withTable("tbl") {
+        spark.sql("CREATE TABLE tbl AS SELECT 1 AS a")
+        assert(spark.table("tbl").collect().toSeq == Seq(Row(1)))
+      }
+    }
+
+    // TODO: add more tests.
   }
 }
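
The core of the change above is a version-gated choice of temporary output location: Hive 0.12 through 1.0 clients expect the staging directory under a shared scratch directory, while 1.1 and 1.2 clients expect it under the table directory. The standalone Scala sketch below illustrates that dispatch in isolation; it is not code from the patch, and the object name, tempPathFor, and the literal path shapes are illustrative assumptions only.

// Minimal, self-contained sketch of the version-gated temp-path choice.
// Nothing here comes from Spark or Hive; all names and path layouts are illustrative.
object TempPathSketch {
  sealed trait HiveVersion { def fullVersion: String }
  case object V0_13 extends HiveVersion { val fullVersion = "0.13.1" }
  case object V1_0  extends HiveVersion { val fullVersion = "1.0.0" }
  case object V1_1  extends HiveVersion { val fullVersion = "1.1.0" }
  case object V1_2  extends HiveVersion { val fullVersion = "1.2.1" }

  // Pre-1.1 clients: stage under a shared scratch dir, which Hive does not touch when it
  // empties the table directory. 1.1+ clients: stage under the table dir, which Hive now
  // excludes while emptying it.
  def tempPathFor(version: HiveVersion, tableDir: String, scratchDir: String): String =
    version match {
      case V0_13 | V1_0 => s"$scratchDir/hive_exec_${System.nanoTime()}"
      case V1_1 | V1_2  => s"$tableDir/.hive-staging/-ext-10000"
    }

  def main(args: Array[String]): Unit = {
    println(tempPathFor(V0_13, "/warehouse/tbl", "/tmp/hive")) // scratch-dir style
    println(tempPathFor(V1_2, "/warehouse/tbl", "/tmp/hive"))  // staging-under-table style
  }
}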
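
The end-to-end CTAS test added to VersionsSuite runs once per supported metastore client version. A similar experiment can be run by hand by pointing a Hive-enabled Spark build at an older metastore client, which is what makes the oldVersionExternalTempPath branch take effect. This is a hedged sketch, not part of the patch: it assumes a Spark build with Hive support, a local metastore, and network access for the Maven-resolved client jars; the table name and chosen version are arbitrary.

import org.apache.spark.sql.SparkSession

object OldMetastoreCtasExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .enableHiveSupport()
      // Use a pre-1.1 Hive metastore client so the scratch-dir code path is exercised.
      .config("spark.sql.hive.metastore.version", "0.13.1")
      // Resolve the matching Hive client jars from Maven instead of using the built-in ones.
      .config("spark.sql.hive.metastore.jars", "maven")
      .getOrCreate()

    spark.sql("CREATE TABLE tbl AS SELECT 1 AS a")
    spark.table("tbl").show()
    spark.stop()
  }
}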