Commit 0ea5d5b2 authored by Reynold Xin

[SQL] minor - internal doc improvement for InsertIntoTable.

## What changes were proposed in this pull request?
I was reading this part of the code and was really confused by the "partition" parameter. This patch adds some documentation for it to reduce confusion in the future.

I also looked around other logical plans but most of them are either already documented, or pretty self-evident to people that know Spark SQL.

## How was this patch tested?
N/A - doc change only.

Author: Reynold Xin <rxin@databricks.com>

Closes #15749 from rxin/doc-improvement.
parent 937af592
@@ -360,6 +360,22 @@ case class OverwriteOptions(
}
}
/**
* Insert some data into a table.
*
* @param table the logical plan representing the table. In the future this should be a
* [[org.apache.spark.sql.catalyst.catalog.CatalogTable]] once we converge Hive tables
* and data source tables.
* @param partition a map from the partition key to the partition value (optional). If a partition
* value is not specified (i.e. None), dynamic partition insert will be performed.
* As an example, `INSERT INTO tbl PARTITION (a=1, b=2) AS ...` would have
* Map('a' -> Some('1'), 'b' -> Some('2')),
* and `INSERT INTO tbl PARTITION (a=1, b) AS ...`
* would have Map('a' -> Some('1'), 'b' -> None).
* @param child the logical plan representing the data to write.
* @param overwrite overwrite existing table or partitions.
* @param ifNotExists If true, only write if the table or partition does not exist.
*/
case class InsertIntoTable(
table: LogicalPlan,
partition: Map[String, Option[String]],
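For illustration, here is a minimal sketch of the `partition` maps that the two `INSERT` statements in the new Scaladoc above would carry (the `staticSpec`/`dynamicSpec` names are purely illustrative; `tbl`, `a`, and `b` come from the doc's examples):

```scala
// Static partition insert: every partition column has an explicit value.
//   INSERT INTO tbl PARTITION (a=1, b=2) AS ...
val staticSpec: Map[String, Option[String]] =
  Map("a" -> Some("1"), "b" -> Some("2"))

// Dynamic partition insert: `b` has no value (None), so its partitions
// are determined at runtime from the data being written.
//   INSERT INTO tbl PARTITION (a=1, b) AS ...
val dynamicSpec: Map[String, Option[String]] =
  Map("a" -> Some("1"), "b" -> None)
```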
@@ -35,13 +35,35 @@ import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.physical.Partitioning
import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
import org.apache.spark.sql.execution.command.{AlterTableAddPartitionCommand, AlterTableDropPartitionCommand}
import org.apache.spark.sql.hive._
import org.apache.spark.sql.hive.HiveShim.{ShimFileSinkDesc => FileSinkDesc}
import org.apache.spark.SparkException
import org.apache.spark.util.SerializableJobConf
/**
* Command for writing data out to a Hive table.
*
* This class is mostly a mess, for legacy reasons (since it evolved in organic ways and had to
* follow Hive's internal implementations closely, which itself was a mess too). Please don't
* blame Reynold for this! He was just moving code around!
*
* In the future we should converge the write path for Hive with the normal data source write path,
* as defined in [[org.apache.spark.sql.execution.datasources.FileFormatWriter]].
*
* @param table the logical plan representing the table. In the future this should be a
* [[org.apache.spark.sql.catalyst.catalog.CatalogTable]] once we converge Hive tables
* and data source tables.
* @param partition a map from the partition key to the partition value (optional). If a partition
* value is not specified (i.e. None), dynamic partition insert will be performed.
* As an example, `INSERT INTO tbl PARTITION (a=1, b=2) AS ...` would have
* Map('a' -> Some('1'), 'b' -> Some('2')),
* and `INSERT INTO tbl PARTITION (a=1, b) AS ...`
* would have Map('a' -> Some('1'), 'b' -> None).
* @param child the logical plan representing the data to write.
* @param overwrite overwrite existing table or partitions.
* @param ifNotExists If true, only write if the table or partition does not exist.
*/
case class InsertIntoHiveTable(
table: MetastoreRelation,
partition: Map[String, Option[String]],
@@ -81,8 +103,7 @@ case class InsertIntoHiveTable(
throw new IllegalStateException("Cannot create staging directory '" + dir.toString + "'")
}
fs.deleteOnExit(dir)
- }
- catch {
+ } catch {
case e: IOException =>
throw new RuntimeException(
"Cannot create staging directory '" + dir.toString + "': " + e.getMessage, e)
@@ -123,7 +144,7 @@ case class InsertIntoHiveTable(
FileOutputFormat.setOutputPath(
conf.value,
- SparkHiveWriterContainer.createPathFromString(fileSinkConf.getDirName, conf.value))
+ SparkHiveWriterContainer.createPathFromString(fileSinkConf.getDirName(), conf.value))
log.debug("Saving as hadoop file of type " + valueClass.getSimpleName)
writerContainer.driverSideSetup()
sqlContext.sparkContext.runJob(rdd, writerContainer.writeToFile _)
@@ -263,7 +284,7 @@ case class InsertIntoHiveTable(
// version and we may not want to catch up new Hive version every time. We delete the
// Hive partition first and then load data file into the Hive partition.
if (oldPart.nonEmpty && overwrite) {
- oldPart.get.storage.locationUri.map { uri =>
+ oldPart.get.storage.locationUri.foreach { uri =>
val partitionPath = new Path(uri)
val fs = partitionPath.getFileSystem(hadoopConf)
if (fs.exists(partitionPath)) {
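As a side note on the `map` -> `foreach` change above: when an `Option` is traversed only for its side effect, `foreach` states that intent directly, whereas `map` builds an `Option[Unit]` that is immediately discarded. A minimal sketch (the `deletePath` helper and the path value are hypothetical stand-ins, not the real file-system call):

```scala
// Illustration of Option.map vs Option.foreach for side-effecting code.
val locationUri: Option[String] = Some("/warehouse/tbl/a=1")

def deletePath(uri: String): Unit =
  println(s"would delete $uri") // stand-in for the real FileSystem delete

// `map` wraps the Unit result in an Option that is thrown away.
val discarded: Option[Unit] = locationUri.map(deletePath)

// `foreach` runs the side effect and returns Unit, matching the intent.
locationUri.foreach(deletePath)
```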