diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala index cce1489abd301fafac9af06c4554294fe758b414..424a962b5eb1cd1158291e6928ac8ec18cacd26e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala @@ -37,7 +37,7 @@ import org.apache.spark.sql.types._ */ trait RunnableCommand extends LogicalPlan with logical.Command { override def output: Seq[Attribute] = Seq.empty - override def children: Seq[LogicalPlan] = Seq.empty + final override def children: Seq[LogicalPlan] = Seq.empty def run(sparkSession: SparkSession): Seq[Row] } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala index da3f6c600adef6b4ab06032cefd5217288f389fe..c7e327906174ce705b62b6028f4a8794165d8196 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala @@ -113,7 +113,7 @@ case class CreateDataSourceTableAsSelectCommand( query: LogicalPlan) extends RunnableCommand { - override protected def innerChildren: Seq[QueryPlan[_]] = Seq(query) + override protected def innerChildren: Seq[LogicalPlan] = Seq(query) override def run(sparkSession: SparkSession): Seq[Row] = { assert(table.tableType != CatalogTableType.VIEW) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala index 348530888de3acb849310dda1f7fb60db8f23ed7..5968db84cd608ccdd80f2bb63a4d034f8fd4f322 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala @@ -31,6 +31,7 @@ import org.apache.spark.internal.Logging import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute import org.apache.spark.sql.catalyst.catalog.BucketSpec +import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.execution.datasources.csv.CSVFileFormat import org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider import org.apache.spark.sql.execution.datasources.json.JsonFileFormat @@ -479,13 +480,23 @@ case class DataSource( } } + // SPARK-17230: Resolve the partition columns so InsertIntoHadoopFsRelationCommand does + // not need to have the query as child, to avoid to analyze an optimized query, + // because InsertIntoHadoopFsRelationCommand will be optimized first. + val columns = partitionColumns.map { name => + val plan = data.logicalPlan + plan.resolve(name :: Nil, data.sparkSession.sessionState.analyzer.resolver).getOrElse { + throw new AnalysisException( + s"Unable to resolve ${name} given [${plan.output.map(_.name).mkString(", ")}]") + }.asInstanceOf[Attribute] + } // For partitioned relation r, r.schema's column ordering can be different from the column // ordering of data.logicalPlan (partition columns are all moved after data column). This // will be adjusted within InsertIntoHadoopFsRelation. val plan = InsertIntoHadoopFsRelationCommand( outputPath, - partitionColumns.map(UnresolvedAttribute.quoted), + columns, bucketSpec, format, () => Unit, // No existing table needs to be refreshed. diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala index a6621054fc74b2ed101b28988403f2cb53fa317c..8286467e96a155e81cbb219f776c0e2753ef1acb 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala @@ -181,7 +181,7 @@ case class DataSourceAnalysis(conf: CatalystConf) extends Rule[LogicalPlan] { InsertIntoHadoopFsRelationCommand( outputPath, - t.partitionSchema.fields.map(_.name).map(UnresolvedAttribute(_)), + query.resolve(t.partitionSchema, t.sparkSession.sessionState.analyzer.resolver), t.bucketSpec, t.fileFormat, () => t.refresh(), diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala index de822180ab5fac78e1a7b0234fdc73e8319f2eba..02ce7fab64729242c58505f4ae67cbafcc1c4bb6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala @@ -66,7 +66,7 @@ case class InsertIntoHadoopFsRelationCommand( mode: SaveMode) extends RunnableCommand { - override def children: Seq[LogicalPlan] = query :: Nil + override protected def innerChildren: Seq[LogicalPlan] = query :: Nil override def run(sparkSession: SparkSession): Seq[Row] = { // Most formats don't do well with duplicate columns, so lets not allow that diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala index 05935cec4b67cab7df65d8e6ada80bfab54587e5..63b0e4588e4a6f8ffe53fd8daa7a8eb708700aed 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala @@ -449,6 +449,14 @@ class DataFrameReaderWriterSuite extends QueryTest with SharedSQLContext with Be } } + test("SPARK-17230: write out results of decimal calculation") { + val df = spark.range(99, 101) + .selectExpr("id", "cast(id as long) * cast('1.0' as decimal(38, 18)) as num") + df.write.mode(SaveMode.Overwrite).parquet(dir) + val df2 = spark.read.parquet(dir) + checkAnswer(df2, df) + } + private def testRead( df: => DataFrame, expectedResult: Seq[String], diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala index 6e6b1c2a2bcf9e84ae153e11b6336d37f4791776..ef5a5a001fb6f7b9efc04e2ada3324b4225a5833 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala @@ -42,7 +42,7 @@ case class CreateHiveTableAsSelectCommand( private val tableIdentifier = tableDesc.identifier - override def children: Seq[LogicalPlan] = Seq(query) + override def innerChildren: Seq[LogicalPlan] = Seq(query) override def run(sparkSession: SparkSession): Seq[Row] = { lazy val metastoreRelation: MetastoreRelation = {