Commit 57e97fcb authored by Wenchen Fan

[SPARK-18029][SQL] PruneFileSourcePartitions should not change the output of LogicalRelation

## What changes were proposed in this pull request?

In `PruneFileSourcePartitions`, we replace the `LogicalRelation` with a pruned one. However, if the `LogicalRelation` doesn't have `expectedOutputAttributes` set, the copy re-derives its output attributes from the new relation's schema with fresh expression IDs, so the replacement changes the output of the `LogicalRelation`. This PR fixes that.
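
Attribute identity in Catalyst is the expression ID, not the column name: re-deriving a relation's output mints fresh IDs, so parent operators that captured the old attributes no longer resolve against it. A minimal sketch of that identity rule (assuming Catalyst on the classpath):

```scala
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.types.IntegerType

// Two attributes with the same name and type are still distinct references,
// because each AttributeReference is minted with a fresh, globally unique exprId.
val a1 = AttributeReference("i", IntegerType)()
val a2 = AttributeReference("i", IntegerType)()
assert(a1.exprId != a2.exprId)   // same name, different identity
assert(!a1.semanticEquals(a2))   // resolution goes by exprId, not by name
```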

## How was this patch tested?

The new `PruneFileSourcePartitionsSuite`.

Author: Wenchen Fan <wenchen@databricks.com>

Closes #15569 from cloud-fan/partition-bug.
parent 3180272d
@@ -102,8 +102,8 @@ case class CatalogTablePartition(
    * Given the partition schema, returns a row with that schema holding the partition values.
    */
   def toRow(partitionSchema: StructType): InternalRow = {
-    InternalRow.fromSeq(partitionSchema.map { case StructField(name, dataType, _, _) =>
-      Cast(Literal(spec(name)), dataType).eval()
+    InternalRow.fromSeq(partitionSchema.map { field =>
+      Cast(Literal(spec(field.name)), field.dataType).eval()
     })
   }
 }
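The refactor above does not change behavior: partition values come out of the catalog as strings, and `toRow` casts each one to its declared partition column type. A hedged usage sketch (`CatalogStorageFormat.empty` is used here only as a placeholder storage descriptor):

```scala
import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTablePartition}
import org.apache.spark.sql.types._

// Partition values are stored as strings in the catalog; toRow casts each one
// to the corresponding column type from the partition schema.
val part = CatalogTablePartition(
  spec = Map("ds" -> "2016-10-21", "hour" -> "3"),
  storage = CatalogStorageFormat.empty)
val partitionSchema = StructType(Seq(
  StructField("ds", StringType),
  StructField("hour", IntegerType)))
val row = part.toRow(partitionSchema)
assert(row.getInt(1) == 3)  // "3" was cast from its string form to an int
```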
@@ -59,7 +59,9 @@ private[sql] object PruneFileSourcePartitions extends Rule[LogicalPlan] {
         val prunedFileCatalog = tableFileCatalog.filterPartitions(partitionKeyFilters.toSeq)
         val prunedFsRelation =
           fsRelation.copy(location = prunedFileCatalog)(sparkSession)
-        val prunedLogicalRelation = logicalRelation.copy(relation = prunedFsRelation)
+        val prunedLogicalRelation = logicalRelation.copy(
+          relation = prunedFsRelation,
+          expectedOutputAttributes = Some(logicalRelation.output))

         // Keep partition-pruning predicates so that they are visible in physical planning
         val filterExpression = filters.reduceLeft(And)
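This is the core fix: without `expectedOutputAttributes`, the copied `LogicalRelation` would rebuild its output from the pruned relation's schema with new expression IDs, orphaning the `Filter` and `Project` above it. A self-contained illustration of that failure mode, using `LocalRelation` as a stand-in for `LogicalRelation`:

```scala
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._
import org.apache.spark.sql.catalyst.plans.logical.LocalRelation

// Build and resolve a small plan, then swap its leaf for a freshly built one.
// The new leaf's attributes carry new exprIds, just as a LogicalRelation copied
// without expectedOutputAttributes did before this patch.
val query = LocalRelation('i.int, 'p.int).where('p === 1).select('i, 'p).analyze
val swapped = query.transform {
  case _: LocalRelation => LocalRelation('i.int, 'p.int)
}
assert(swapped.missingInput.nonEmpty)  // parents still reference the old exprIds
```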
@@ -20,9 +20,10 @@ package org.apache.spark.sql.hive
 import java.io.File

 import org.apache.spark.metrics.source.HiveCatalogMetrics
+import org.apache.spark.sql.QueryTest
 import org.apache.spark.sql.hive.test.TestHiveSingleton
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SQLTestUtils
-import org.apache.spark.sql.QueryTest

 class HiveDataFrameSuite extends QueryTest with TestHiveSingleton with SQLTestUtils {
   test("table name with schema") {
@@ -78,7 +79,7 @@ class HiveDataFrameSuite extends QueryTest with TestHiveSingleton with SQLTestUtils {
   }

   test("lazy partition pruning reads only necessary partition data") {
-    withSQLConf("spark.sql.hive.filesourcePartitionPruning" -> "true") {
+    withSQLConf(SQLConf.HIVE_FILESOURCE_PARTITION_PRUNING.key -> "true") {
       withTable("test") {
         withTempDir { dir =>
           setupPartitionedTable("test", dir)
@@ -114,7 +115,7 @@ class HiveDataFrameSuite extends QueryTest with TestHiveSingleton with SQLTestUtils {
   }

   test("all partitions read and cached when filesource partition pruning is off") {
-    withSQLConf("spark.sql.hive.filesourcePartitionPruning" -> "false") {
+    withSQLConf(SQLConf.HIVE_FILESOURCE_PARTITION_PRUNING.key -> "false") {
       withTable("test") {
         withTempDir { dir =>
           setupPartitionedTable("test", dir)
@@ -22,6 +22,7 @@ import org.apache.hadoop.fs.Path
 import org.apache.spark.SparkException
 import org.apache.spark.sql.QueryTest
 import org.apache.spark.sql.hive.test.TestHiveSingleton
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SQLTestUtils

 /**
@@ -62,7 +63,7 @@ class HiveMetadataCacheSuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
   def testCaching(pruningEnabled: Boolean): Unit = {
     test(s"partitioned table is cached when partition pruning is $pruningEnabled") {
-      withSQLConf("spark.sql.hive.filesourcePartitionPruning" -> pruningEnabled.toString) {
+      withSQLConf(SQLConf.HIVE_FILESOURCE_PARTITION_PRUNING.key -> pruningEnabled.toString) {
         withTable("test") {
           withTempDir { dir =>
             spark.range(5).selectExpr("id", "id as f1", "id as f2").write
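The three test changes above are mechanical: they replace the raw `"spark.sql.hive.filesourcePartitionPruning"` string with the `SQLConf` constant, so a mistyped or renamed key fails at compile time instead of silently setting an unknown conf. For example (assuming an active `spark` session):

```scala
import org.apache.spark.sql.internal.SQLConf

// The ConfigEntry carries the key string; referencing it by name is
// compiler-checked, unlike a hand-typed raw string.
spark.conf.set(SQLConf.HIVE_FILESOURCE_PARTITION_PRUNING.key, "true")
```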
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.hive.execution

import org.apache.spark.sql.QueryTest
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._
import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project}
import org.apache.spark.sql.catalyst.rules.RuleExecutor
import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation, PruneFileSourcePartitions, TableFileCatalog}
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import org.apache.spark.sql.hive.test.TestHiveSingleton
import org.apache.spark.sql.test.SQLTestUtils
import org.apache.spark.sql.types.StructType

class PruneFileSourcePartitionsSuite extends QueryTest with SQLTestUtils with TestHiveSingleton {

  object Optimize extends RuleExecutor[LogicalPlan] {
    val batches = Batch("PruneFileSourcePartitions", Once, PruneFileSourcePartitions) :: Nil
  }

  test("PruneFileSourcePartitions should not change the output of LogicalRelation") {
    withTable("test") {
      withTempDir { dir =>
        sql(
          s"""
            |CREATE EXTERNAL TABLE test(i int)
            |PARTITIONED BY (p int)
            |STORED AS parquet
            |LOCATION '${dir.getAbsolutePath}'""".stripMargin)

        val tableMeta = spark.sharedState.externalCatalog.getTable("default", "test")
        val tableFileCatalog = new TableFileCatalog(
          spark,
          tableMeta.database,
          tableMeta.identifier.table,
          Some(tableMeta.partitionSchema),
          0)

        val dataSchema = StructType(tableMeta.schema.filterNot { f =>
          tableMeta.partitionColumnNames.contains(f.name)
        })
        val relation = HadoopFsRelation(
          location = tableFileCatalog,
          partitionSchema = tableMeta.partitionSchema,
          dataSchema = dataSchema,
          bucketSpec = None,
          fileFormat = new ParquetFileFormat(),
          options = Map.empty)(sparkSession = spark)

        val logicalRelation = LogicalRelation(relation, catalogTable = Some(tableMeta))
        val query = Project(Seq('i, 'p), Filter('p === 1, logicalRelation)).analyze

        val optimized = Optimize.execute(query)
        assert(optimized.missingInput.isEmpty)
      }
    }
  }
}
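
For context on the final assertion: `missingInput` on a `QueryPlan` is, paraphrasing the Catalyst source, the attributes a node references that neither its children nor the node itself produce.

```scala
// Paraphrased from Catalyst's QueryPlan:
//   def missingInput: AttributeSet = references -- inputSet -- producedAttributes
// Before this fix, the rule replaced the LogicalRelation with one whose output
// carried fresh exprIds, so Filter('p === 1, ...) in the optimized plan still
// referenced the old 'p and missingInput was non-empty; now the assertion holds.
```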