Commit 23d5f886 authored by Michael Armbrust

[SPARK-6851][SQL] Create new instance for each converted parquet relation

Otherwise we end up rewriting predicates to be trivially equal (i.e. `a#1 = a#2` -> `a#3 = a#3`), at which point the query is no longer valid.

Author: Michael Armbrust <michael@databricks.com>

Closes #5458 from marmbrus/selfJoinParquet and squashes the following commits:

22df77c [Michael Armbrust] [SPARK-6851][SQL] Create new instance for each converted parquet relation
parent 68ecdb7f
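
For context, here is a minimal sketch, not part of this patch, of the kind of self-joined query that hit the problem. It assumes a Spark 1.3-era `HiveContext`, an existing Parquet-backed Hive table named `orders`, and the default `spark.sql.hive.convertMetastoreParquet=true` so the metastore relation is converted on read:

```scala
// Hypothetical spark-shell session; `sc` is the shell's SparkContext and the
// Parquet-backed Hive table `orders` is assumed to already exist.
import org.apache.spark.sql.hive.HiveContext

val hiveContext = new HiveContext(sc)

// Before this fix, both sides of the self-join resolved through
// HiveMetastoreCatalog to the same converted ParquetRelation instance, so the
// two sides shared attribute expression IDs and `o1.id = o2.id` was rewritten
// into a comparison of an attribute with itself (the `a#3 = a#3` case above).
hiveContext.sql(
  """SELECT o1.id, o2.id
    |FROM orders o1
    |JOIN orders o2 ON o1.id = o2.id
  """.stripMargin).collect()
```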
@@ -279,7 +279,7 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
       }
     }
-    if (metastoreRelation.hiveQlTable.isPartitioned) {
+    val result = if (metastoreRelation.hiveQlTable.isPartitioned) {
       val partitionSchema = StructType.fromAttributes(metastoreRelation.partitionKeys)
       val partitionColumnDataTypes = partitionSchema.map(_.dataType)
       val partitions = metastoreRelation.hiveQlPartitions.map { p =>
@@ -314,6 +314,8 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
       parquetRelation
     }
+
+    result.newInstance()
   }

   override def getTables(databaseName: Option[String]): Seq[(String, Boolean)] = synchronized {
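
The fix above assigns the converted relation to `result` and returns `result.newInstance()`, so every lookup hands back a copy with fresh attribute expression IDs. Below is a toy model, using made-up classes rather than Spark's internals, of why fresh IDs matter for a self-join:

```scala
// Illustrative only: a stand-in for "relation with uniquely identified output
// attributes", not Spark's Attribute/LogicalPlan classes.
import java.util.concurrent.atomic.AtomicLong

object ExprId {
  private val counter = new AtomicLong(0)
  def next(): Long = counter.getAndIncrement()
}

case class Attribute(name: String, exprId: Long = ExprId.next())

case class Relation(output: Seq[Attribute]) {
  // Same spirit as newInstance(): identical schema, brand new expression IDs.
  def newInstance(): Relation = Relation(output.map(a => Attribute(a.name)))
}

object SelfJoinDemo extends App {
  val cached = Relation(Seq(Attribute("id")))

  // Buggy shape: the catalog returns the cached instance for both sides of a
  // self-join, so the two sides are indistinguishable and a condition like
  // left.id = right.id collapses to comparing one attribute with itself.
  val (buggyLeft, buggyRight) = (cached, cached)
  println(buggyLeft.output.head.exprId == buggyRight.output.head.exprId) // true

  // Fixed shape: each conversion returns a fresh instance, so the join
  // condition still relates two distinct attributes.
  val (left, right) = (cached.newInstance(), cached.newInstance())
  println(left.output.head.exprId == right.output.head.exprId) // false
}
```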
@@ -34,6 +34,17 @@ case class Nested3(f3: Int)
 case class NestedArray2(b: Seq[Int])
 case class NestedArray1(a: NestedArray2)

+case class Order(
+    id: Int,
+    make: String,
+    `type`: String,
+    price: Int,
+    pdate: String,
+    customer: String,
+    city: String,
+    state: String,
+    month: Int)
+
 /**
  * A collection of hive query tests where we generate the answers ourselves instead of depending on
  * Hive to generate them (in contrast to HiveQuerySuite). Often this is because the query is
@@ -41,6 +52,72 @@ case class NestedArray1(a: NestedArray2)
  */
 class SQLQuerySuite extends QueryTest {

+  test("SPARK-6851: Self-joined converted parquet tables") {
+    val orders = Seq(
+      Order(1, "Atlas", "MTB", 234, "2015-01-07", "John D", "Pacifica", "CA", 20151),
+      Order(3, "Swift", "MTB", 285, "2015-01-17", "John S", "Redwood City", "CA", 20151),
+      Order(4, "Atlas", "Hybrid", 303, "2015-01-23", "Jones S", "San Mateo", "CA", 20151),
+      Order(7, "Next", "MTB", 356, "2015-01-04", "Jane D", "Daly City", "CA", 20151),
+      Order(10, "Next", "YFlikr", 187, "2015-01-09", "John D", "Fremont", "CA", 20151),
+      Order(11, "Swift", "YFlikr", 187, "2015-01-23", "John D", "Hayward", "CA", 20151),
+      Order(2, "Next", "Hybrid", 324, "2015-02-03", "Jane D", "Daly City", "CA", 20152),
+      Order(5, "Next", "Street", 187, "2015-02-08", "John D", "Fremont", "CA", 20152),
+      Order(6, "Atlas", "Street", 154, "2015-02-09", "John D", "Pacifica", "CA", 20152),
+      Order(8, "Swift", "Hybrid", 485, "2015-02-19", "John S", "Redwood City", "CA", 20152),
+      Order(9, "Atlas", "Split", 303, "2015-02-28", "Jones S", "San Mateo", "CA", 20152))
+
+    val orderUpdates = Seq(
+      Order(1, "Atlas", "MTB", 434, "2015-01-07", "John D", "Pacifica", "CA", 20151),
+      Order(11, "Swift", "YFlikr", 137, "2015-01-23", "John D", "Hayward", "CA", 20151))
+
+    orders.toDF.registerTempTable("orders1")
+    orderUpdates.toDF.registerTempTable("orderupdates1")
+
+    sql(
+      """CREATE TABLE orders(
+        | id INT,
+        | make String,
+        | type String,
+        | price INT,
+        | pdate String,
+        | customer String,
+        | city String)
+        |PARTITIONED BY (state STRING, month INT)
+        |STORED AS PARQUET
+      """.stripMargin)
+
+    sql(
+      """CREATE TABLE orderupdates(
+        | id INT,
+        | make String,
+        | type String,
+        | price INT,
+        | pdate String,
+        | customer String,
+        | city String)
+        |PARTITIONED BY (state STRING, month INT)
+        |STORED AS PARQUET
+      """.stripMargin)
+
+    sql("set hive.exec.dynamic.partition.mode=nonstrict")
+    sql("INSERT INTO TABLE orders PARTITION(state, month) SELECT * FROM orders1")
+    sql("INSERT INTO TABLE orderupdates PARTITION(state, month) SELECT * FROM orderupdates1")
+
+    checkAnswer(
+      sql(
+        """
+          |select orders.state, orders.month
+          |from orders
+          |join (
+          |  select distinct orders.state,orders.month
+          |  from orders
+          |  join orderupdates
+          |  on orderupdates.id = orders.id) ao
+          |  on ao.state = orders.state and ao.month = orders.month
+        """.stripMargin),
+      (1 to 6).map(_ => Row("CA", 20151)))
+  }
+
   test("SPARK-5371: union with null and sum") {
     val df = Seq((1, 1)).toDF("c1", "c2")
     df.registerTempTable("table1")
@@ -478,5 +555,4 @@ class SQLQuerySuite extends QueryTest {
     sql("select d from dn union all select d * 2 from dn")
       .queryExecution.analyzed
   }
 }