Commit 4424c901 authored by Herman van Hovell, committed by Reynold Xin

[SPARK-18370][SQL] Add table information to InsertIntoHadoopFsRelationCommand


## What changes were proposed in this pull request?
`InsertIntoHadoopFsRelationCommand` does not keep track of whether it inserts into a table and, if so, which table it inserts into. This can make debugging these statements problematic. This PR adds table information to `InsertIntoHadoopFsRelationCommand`. Explaining the SQL command `insert into prq select * from range(0, 100000)` now yields the following physical plan:
```
== Physical Plan ==
ExecutedCommand
   +- InsertIntoHadoopFsRelationCommand file:/dev/assembly/spark-warehouse/prq, ParquetFormat, <function1>, Map(serialization.format -> 1, path -> file:/dev/assembly/spark-warehouse/prq), Append, CatalogTable(
	Table: `default`.`prq`
	Owner: hvanhovell
	Created: Wed Nov 09 17:42:30 CET 2016
	Last Access: Thu Jan 01 01:00:00 CET 1970
	Type: MANAGED
	Schema: [StructField(id,LongType,true)]
	Provider: parquet
	Properties: [transient_lastDdlTime=1478709750]
	Storage(Location: file:/dev/assembly/spark-warehouse/prq, InputFormat: org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat, OutputFormat: org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat, Serde: org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe, Properties: [serialization.format=1]))
         +- Project [id#7L]
            +- Range (0, 100000, step=1, splits=None)
```
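
For reference, a minimal sketch (not part of this patch) of how the plan above can be reproduced in spark-shell; it assumes a running `SparkSession` named `spark`, and `prq` is the example table name used in this description:

```scala
// Minimal sketch, assuming a running SparkSession named `spark` (e.g. in spark-shell);
// `prq` is the example table name from this commit message.
spark.range(0, 100000).write.format("parquet").saveAsTable("prq")

// The explained insert now shows the CatalogTable details carried by
// InsertIntoHadoopFsRelationCommand, as in the plan above.
spark.sql("explain insert into prq select * from range(0, 100000)").show(truncate = false)
```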

## How was this patch tested?
Added extra checks to the `ParquetMetastoreSuite`
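
The added checks pattern-match on the executed command and verify the target table. A minimal spark-shell sketch of the same idea (not the suite code itself), assuming the `spark` session and the `prq` table from the sketch above:

```scala
import org.apache.spark.sql.execution.command.ExecutedCommandExec
import org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand

// Hypothetical standalone check mirroring the ParquetMetastoreSuite changes below;
// assumes the SparkSession `spark` and the Parquet table `prq` created above.
val df = spark.sql("insert into prq select * from range(0, 100)")
df.queryExecution.sparkPlan match {
  case ExecutedCommandExec(cmd: InsertIntoHadoopFsRelationCommand) =>
    // The command now carries the CatalogTable of the target table.
    assert(cmd.catalogTable.map(_.identifier.table) == Some("prq"))
  case other =>
    sys.error(s"expected InsertIntoHadoopFsRelationCommand, found $other")
}
```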

Author: Herman van Hovell <hvanhovell@databricks.com>

Closes #15832 from hvanhovell/SPARK-18370.

(cherry picked from commit d8b81f77)
Signed-off-by: Reynold Xin <rxin@databricks.com>
parent 80f58510
@@ -424,7 +424,8 @@ case class DataSource(
             _ => Unit, // No existing table needs to be refreshed.
             options,
             data.logicalPlan,
-            mode)
+            mode,
+            catalogTable)
         sparkSession.sessionState.executePlan(plan).toRdd
         // Replace the schema with that of the DataFrame we just wrote out to avoid re-inferring it.
         copy(userSpecifiedSchema = Some(data.schema.asNullable)).resolveRelation()
@@ -162,7 +162,7 @@ case class DataSourceAnalysis(conf: CatalystConf) extends Rule[LogicalPlan] {
     case i @ logical.InsertIntoTable(
-        l @ LogicalRelation(t: HadoopFsRelation, _, _), part, query, overwrite, false)
+        l @ LogicalRelation(t: HadoopFsRelation, _, table), part, query, overwrite, false)
         if query.resolved && t.schema.asNullable == query.schema.asNullable =>
       // Sanity checks
@@ -222,7 +222,8 @@ case class DataSourceAnalysis(conf: CatalystConf) extends Rule[LogicalPlan] {
         refreshPartitionsCallback,
         t.options,
         query,
-        mode)
+        mode,
+        table)
       insertCmd
     }
@@ -23,7 +23,7 @@ import org.apache.hadoop.fs.Path
 import org.apache.spark.internal.io.FileCommitProtocol
 import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.catalog.BucketSpec
+import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogTable}
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
@@ -41,7 +41,8 @@ case class InsertIntoHadoopFsRelationCommand(
     refreshFunction: (Seq[TablePartitionSpec]) => Unit,
     options: Map[String, String],
     @transient query: LogicalPlan,
-    mode: SaveMode)
+    mode: SaveMode,
+    catalogTable: Option[CatalogTable])
   extends RunnableCommand {

   override protected def innerChildren: Seq[LogicalPlan] = query :: Nil
@@ -307,7 +307,8 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
       val df = sql("INSERT INTO TABLE test_insert_parquet SELECT a FROM jt")
       df.queryExecution.sparkPlan match {
-        case ExecutedCommandExec(_: InsertIntoHadoopFsRelationCommand) => // OK
+        case ExecutedCommandExec(cmd: InsertIntoHadoopFsRelationCommand) =>
+          assert(cmd.catalogTable.map(_.identifier.table) === Some("test_insert_parquet"))
         case o => fail("test_insert_parquet should be converted to a " +
           s"${classOf[HadoopFsRelation ].getCanonicalName} and " +
           s"${classOf[InsertIntoDataSourceCommand].getCanonicalName} should have been SparkPlan. " +
@@ -337,7 +338,8 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
       val df = sql("INSERT INTO TABLE test_insert_parquet SELECT a FROM jt_array")
       df.queryExecution.sparkPlan match {
-        case ExecutedCommandExec(_: InsertIntoHadoopFsRelationCommand) => // OK
+        case ExecutedCommandExec(cmd: InsertIntoHadoopFsRelationCommand) =>
+          assert(cmd.catalogTable.map(_.identifier.table) === Some("test_insert_parquet"))
         case o => fail("test_insert_parquet should be converted to a " +
           s"${classOf[HadoopFsRelation ].getCanonicalName} and " +
           s"${classOf[InsertIntoDataSourceCommand].getCanonicalName} should have been SparkPlan." +