From 0ea5d5b24c1f7b29efeac0e72d271aba279523f7 Mon Sep 17 00:00:00 2001 From: Reynold Xin <rxin@databricks.com> Date: Thu, 3 Nov 2016 02:45:54 -0700 Subject: [PATCH] [SQL] minor - internal doc improvement for InsertIntoTable. ## What changes were proposed in this pull request? I was reading this part of the code and was really confused by the "partition" parameter. This patch adds some documentation for it to reduce confusion in the future. I also looked around other logical plans but most of them are either already documented, or pretty self-evident to people that know Spark SQL. ## How was this patch tested? N/A - doc change only. Author: Reynold Xin <rxin@databricks.com> Closes #15749 from rxin/doc-improvement. --- .../plans/logical/basicLogicalOperators.scala | 16 ++++++++++ .../hive/execution/InsertIntoHiveTable.scala | 31 ++++++++++++++++--- 2 files changed, 42 insertions(+), 5 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala index 7a15c2285d..65ceab2ce2 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala @@ -360,6 +360,22 @@ case class OverwriteOptions( } } +/** + * Insert some data into a table. + * + * @param table the logical plan representing the table. In the future this should be a + * [[org.apache.spark.sql.catalyst.catalog.CatalogTable]] once we converge Hive tables + * and data source tables. + * @param partition a map from the partition key to the partition value (optional). If the partition + * value is optional, dynamic partition insert will be performed. + * As an example, `INSERT INTO tbl PARTITION (a=1, b=2) AS ...` would have + * Map('a' -> Some('1'), 'b' -> Some('2')), + * and `INSERT INTO tbl PARTITION (a=1, b) AS ...` + * would have Map('a' -> Some('1'), 'b' -> None). + * @param child the logical plan representing data to write to. + * @param overwrite overwrite existing table or partitions. + * @param ifNotExists If true, only write if the table or partition does not exist. + */ case class InsertIntoTable( table: LogicalPlan, partition: Map[String, Option[String]], diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala index 05164d774c..15be12cfc0 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala @@ -35,13 +35,35 @@ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.catalyst.plans.physical.Partitioning import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode} -import org.apache.spark.sql.execution.command.{AlterTableAddPartitionCommand, AlterTableDropPartitionCommand} import org.apache.spark.sql.hive._ import org.apache.spark.sql.hive.HiveShim.{ShimFileSinkDesc => FileSinkDesc} import org.apache.spark.SparkException import org.apache.spark.util.SerializableJobConf +/** + * Command for writing data out to a Hive table. + * + * This class is mostly a mess, for legacy reasons (since it evolved in organic ways and had to + * follow Hive's internal implementations closely, which itself was a mess too). Please don't + * blame Reynold for this! He was just moving code around! + * + * In the future we should converge the write path for Hive with the normal data source write path, + * as defined in [[org.apache.spark.sql.execution.datasources.FileFormatWriter]]. + * + * @param table the logical plan representing the table. In the future this should be a + * [[org.apache.spark.sql.catalyst.catalog.CatalogTable]] once we converge Hive tables + * and data source tables. + * @param partition a map from the partition key to the partition value (optional). If the partition + * value is optional, dynamic partition insert will be performed. + * As an example, `INSERT INTO tbl PARTITION (a=1, b=2) AS ...` would have + * Map('a' -> Some('1'), 'b' -> Some('2')), + * and `INSERT INTO tbl PARTITION (a=1, b) AS ...` + * would have Map('a' -> Some('1'), 'b' -> None). + * @param child the logical plan representing data to write to. + * @param overwrite overwrite existing table or partitions. + * @param ifNotExists If true, only write if the table or partition does not exist. + */ case class InsertIntoHiveTable( table: MetastoreRelation, partition: Map[String, Option[String]], @@ -81,8 +103,7 @@ case class InsertIntoHiveTable( throw new IllegalStateException("Cannot create staging directory '" + dir.toString + "'") } fs.deleteOnExit(dir) - } - catch { + } catch { case e: IOException => throw new RuntimeException( "Cannot create staging directory '" + dir.toString + "': " + e.getMessage, e) @@ -123,7 +144,7 @@ case class InsertIntoHiveTable( FileOutputFormat.setOutputPath( conf.value, - SparkHiveWriterContainer.createPathFromString(fileSinkConf.getDirName, conf.value)) + SparkHiveWriterContainer.createPathFromString(fileSinkConf.getDirName(), conf.value)) log.debug("Saving as hadoop file of type " + valueClass.getSimpleName) writerContainer.driverSideSetup() sqlContext.sparkContext.runJob(rdd, writerContainer.writeToFile _) @@ -263,7 +284,7 @@ case class InsertIntoHiveTable( // version and we may not want to catch up new Hive version every time. We delete the // Hive partition first and then load data file into the Hive partition. if (oldPart.nonEmpty && overwrite) { - oldPart.get.storage.locationUri.map { uri => + oldPart.get.storage.locationUri.foreach { uri => val partitionPath = new Path(uri) val fs = partitionPath.getFileSystem(hadoopConf) if (fs.exists(partitionPath)) { -- GitLab