diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index fc76e7661767040e09b19b0fcdecaae3e5497215..161d28eba070ee7ef93d2d41dc25f423094ec9db 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -55,7 +55,9 @@ class Analyzer(catalog: Catalog, registry: FunctionRegistry, caseSensitive: Bool StarExpansion :: ResolveFunctions :: GlobalAggregates :: - typeCoercionRules :_*) + typeCoercionRules :_*), + Batch("AnalysisOperators", fixedPoint, + EliminateAnalysisOperators) ) /** @@ -80,6 +82,7 @@ class Analyzer(catalog: Catalog, registry: FunctionRegistry, caseSensitive: Bool case s: Star => s.copy(table = s.table.map(_.toLowerCase)) case UnresolvedAttribute(name) => UnresolvedAttribute(name.toLowerCase) case Alias(c, name) => Alias(c, name.toLowerCase)() + case GetField(c, name) => GetField(c, name.toLowerCase) } } } @@ -184,3 +187,17 @@ class Analyzer(catalog: Catalog, registry: FunctionRegistry, caseSensitive: Bool exprs.collect { case _: Star => true }.nonEmpty } } + +/** + * Removes [[catalyst.plans.logical.Subquery Subquery]] operators from the plan. Subqueries are + * only required to provide scoping information for attributes and can be removed once analysis is + * complete. Similarly, this node also removes + * [[catalyst.plans.logical.LowerCaseSchema LowerCaseSchema]] operators. + */ +object EliminateAnalysisOperators extends Rule[LogicalPlan] { + def apply(plan: LogicalPlan): LogicalPlan = plan transform { + case Subquery(_, child) => child + case LowerCaseSchema(child) => child + } +} + diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index c1201971d9c0aaf1f372bdf06b548b5620dfdafe..07ebbc90fce71c40243fd03c99535512896c2a6e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -27,30 +27,16 @@ import org.apache.spark.sql.catalyst.types._ object Optimizer extends RuleExecutor[LogicalPlan] { val batches = - Batch("Subqueries", Once, - EliminateSubqueries) :: Batch("ConstantFolding", Once, ConstantFolding, BooleanSimplification, SimplifyCasts) :: Batch("Filter Pushdown", Once, - EliminateSubqueries, CombineFilters, PushPredicateThroughProject, PushPredicateThroughInnerJoin) :: Nil } -/** - * Removes [[catalyst.plans.logical.Subquery Subquery]] operators from the plan. Subqueries are - * only required to provide scoping information for attributes and can be removed once analysis is - * complete. - */ -object EliminateSubqueries extends Rule[LogicalPlan] { - def apply(plan: LogicalPlan): LogicalPlan = plan transform { - case Subquery(_, child) => child - } -} - /** * Replaces [[catalyst.expressions.Expression Expressions]] that can be statically evaluated with * equivalent [[catalyst.expressions.Literal Literal]] values. diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala index 6480cca30049e79b29f6acc4daddd138f8ef6593..61481de65e76eca1feef2fa0b38b67e67bfde513 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala @@ -21,6 +21,7 @@ package plans package logical import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.types._ case class Project(projectList: Seq[NamedExpression], child: LogicalPlan) extends UnaryNode { def output = projectList.map(_.toAttribute) @@ -86,7 +87,7 @@ case class Join( } case class InsertIntoTable( - table: BaseRelation, + table: LogicalPlan, partition: Map[String, Option[String]], child: LogicalPlan, overwrite: Boolean) @@ -141,6 +142,33 @@ case class Subquery(alias: String, child: LogicalPlan) extends UnaryNode { def references = Set.empty } +/** + * Converts the schema of `child` to all lowercase, together with LowercaseAttributeReferences + * this allows for optional case insensitive attribute resolution. This node can be elided after + * analysis. + */ +case class LowerCaseSchema(child: LogicalPlan) extends UnaryNode { + protected def lowerCaseSchema(dataType: DataType): DataType = dataType match { + case StructType(fields) => + StructType(fields.map(f => + StructField(f.name.toLowerCase(), lowerCaseSchema(f.dataType), f.nullable))) + case ArrayType(elemType) => ArrayType(lowerCaseSchema(elemType)) + case otherType => otherType + } + + val output = child.output.map { + case a: AttributeReference => + AttributeReference( + a.name.toLowerCase, + lowerCaseSchema(a.dataType), + a.nullable)( + a.exprId, + a.qualifiers) + } + + def references = Set.empty +} + case class Sample(fraction: Double, withReplacement: Boolean, seed: Int, child: LogicalPlan) extends UnaryNode { diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ConstantFoldingSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ConstantFoldingSuite.scala index 2c107b865af193a438936d50a4073a3171b974e8..53f760fb4ceb223b111af24c5a779d1de5a3a3f6 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ConstantFoldingSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ConstantFoldingSuite.scala @@ -19,21 +19,22 @@ package org.apache.spark.sql package catalyst package optimizer -import org.apache.spark.sql.catalyst.dsl.plans._ +import org.apache.spark.sql.catalyst.analysis._ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan} import org.apache.spark.sql.catalyst.rules.RuleExecutor import org.apache.spark.sql.catalyst.types.IntegerType // For implicit conversions +import org.apache.spark.sql.catalyst.dsl.plans._ import org.apache.spark.sql.catalyst.dsl.expressions._ class ConstantFoldingSuite extends OptimizerTest { object Optimize extends RuleExecutor[LogicalPlan] { val batches = - Batch("Subqueries", Once, - EliminateSubqueries) :: + Batch("AnalysisNodes", Once, + EliminateAnalysisOperators) :: Batch("ConstantFolding", Once, ConstantFolding, BooleanSimplification) :: Nil diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala index 0aa7e4d04d7078fa8f9f2e982d66ad2134d5fb60..ae1b2b13dd8f17febc60b68f41d9835a60e208c0 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala @@ -19,6 +19,8 @@ package org.apache.spark.sql package catalyst package optimizer + +import org.apache.spark.sql.catalyst.analysis._ import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.rules._ @@ -31,9 +33,8 @@ class FilterPushdownSuite extends OptimizerTest { object Optimize extends RuleExecutor[LogicalPlan] { val batches = Batch("Subqueries", Once, - EliminateSubqueries) :: + EliminateAnalysisOperators) :: Batch("Filter Pushdown", Once, - EliminateSubqueries, CombineFilters, PushPredicateThroughProject, PushPredicateThroughInnerJoin) :: Nil @@ -172,7 +173,7 @@ class FilterPushdownSuite extends OptimizerTest { } val optimized = Optimize(originalQuery.analyze) - comparePlans(optimizer.EliminateSubqueries(originalQuery.analyze), optimized) + comparePlans(analysis.EliminateAnalysisOperators(originalQuery.analyze), optimized) } test("joins: conjunctive predicates") { @@ -191,7 +192,7 @@ class FilterPushdownSuite extends OptimizerTest { left.join(right, condition = Some("x.b".attr === "y.b".attr)) .analyze - comparePlans(optimized, optimizer.EliminateSubqueries(correctAnswer)) + comparePlans(optimized, analysis.EliminateAnalysisOperators(correctAnswer)) } test("joins: conjunctive predicates #2") { @@ -210,7 +211,7 @@ class FilterPushdownSuite extends OptimizerTest { left.join(right, condition = Some("x.b".attr === "y.b".attr)) .analyze - comparePlans(optimized, optimizer.EliminateSubqueries(correctAnswer)) + comparePlans(optimized, analysis.EliminateAnalysisOperators(correctAnswer)) } test("joins: conjunctive predicates #3") { @@ -233,6 +234,6 @@ class FilterPushdownSuite extends OptimizerTest { condition = Some("z.a".attr === "x.b".attr)) .analyze - comparePlans(optimized, optimizer.EliminateSubqueries(correctAnswer)) + comparePlans(optimized, analysis.EliminateAnalysisOperators(correctAnswer)) } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala index 491b3a62712c6710b02a1228d1881ada3f8518f4..af35c919df308245c0beb6ccd1289d3947600927 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala @@ -32,7 +32,7 @@ import org.apache.spark.SparkContext import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.analysis.{Analyzer, OverrideCatalog} import org.apache.spark.sql.catalyst.expressions.GenericRow -import org.apache.spark.sql.catalyst.plans.logical.{BaseRelation, LogicalPlan} +import org.apache.spark.sql.catalyst.plans.logical.{BaseRelation, LogicalPlan, LowerCaseSchema} import org.apache.spark.sql.catalyst.plans.logical.{NativeCommand, ExplainCommand} import org.apache.spark.sql.catalyst.types._ import org.apache.spark.sql.execution._ @@ -108,18 +108,20 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) { /* A catalyst metadata catalog that points to the Hive Metastore. */ @transient - override lazy val catalog = new HiveMetastoreCatalog(this) with OverrideCatalog + override lazy val catalog = new HiveMetastoreCatalog(this) with OverrideCatalog { + override def lookupRelation( + databaseName: Option[String], + tableName: String, + alias: Option[String] = None): LogicalPlan = { + + LowerCaseSchema(super.lookupRelation(databaseName, tableName, alias)) + } + } /* An analyzer that uses the Hive metastore. */ @transient override lazy val analyzer = new Analyzer(catalog, HiveFunctionRegistry, caseSensitive = false) - def tables: Seq[BaseRelation] = { - // TODO: Move this functionallity to Catalog. Make client protected. - val allTables = catalog.client.getAllTables("default") - allTables.map(catalog.lookupRelation(None, _, None)).collect { case b: BaseRelation => b } - } - /** * Runs the specified SQL query using Hive. */ diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index a5db283765c2714468771cf594e623ecbda3a111..1667a217297b17de172391c59c8eee20ec337932 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -27,7 +27,8 @@ import org.apache.hadoop.hive.ql.plan.TableDesc import org.apache.hadoop.hive.ql.session.SessionState import org.apache.hadoop.hive.serde2.Deserializer -import org.apache.spark.sql.catalyst.analysis.Catalog + +import org.apache.spark.sql.catalyst.analysis.{Catalog, EliminateAnalysisOperators} import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.logical import org.apache.spark.sql.catalyst.plans.logical._ @@ -96,7 +97,8 @@ class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with Logging { createTable(databaseName, tableName, child.output) InsertIntoTable( - lookupRelation(Some(databaseName), tableName, None).asInstanceOf[BaseRelation], + EliminateAnalysisOperators( + lookupRelation(Some(databaseName), tableName, None)), Map.empty, child, overwrite = false) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala index c71141c4194f6c9f10bd4d6bb66e627eb6c6b2e3..3dd0530225be38e17bea0984a97f3119e48079f4 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala @@ -73,11 +73,11 @@ trait HiveStrategies { case p @ FilteredOperation(predicates, relation: MetastoreRelation) if relation.isPartitioned => - val partitionKeyIds = relation.partitionKeys.map(_.id).toSet + val partitionKeyIds = relation.partitionKeys.map(_.exprId).toSet // Filter out all predicates that only deal with partition keys val (pruningPredicates, otherPredicates) = predicates.partition { - _.references.map(_.id).subsetOf(partitionKeyIds) + _.references.map(_.exprId).subsetOf(partitionKeyIds) } val scan = HiveTableScan( diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveResolutionSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveResolutionSuite.scala index 996bd4efecd4c09b00ee60d47a8657ebfe7ade6a..4bdea214677ad41f0ea16b564335ac90399f8c60 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveResolutionSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveResolutionSuite.scala @@ -19,6 +19,11 @@ package org.apache.spark.sql package hive package execution +import TestHive._ + +case class Data(a: Int, B: Int, n: Nested) +case class Nested(a: Int, B: Int) + /** * A set of test cases expressed in Hive QL that are not covered by the tests included in the hive distribution. */ @@ -47,6 +52,14 @@ class HiveResolutionSuite extends HiveComparisonTest { createQueryTest("alias.*", "SELECT a.* FROM src a ORDER BY key LIMIT 1") + test("case insensitivity with scala reflection") { + // Test resolution with Scala Reflection + TestHive.sparkContext.parallelize(Data(1, 2, Nested(1,2)) :: Nil) + .registerAsTable("caseSensitivityTest") + + sql("SELECT a, b, A, B, n.a, n.b, n.A, n.B FROM caseSensitivityTest") + } + /** * Negative examples. Currently only left here for documentation purposes. * TODO(marmbrus): Test that catalyst fails on these queries.