Skip to content
Snippets Groups Projects
Commit 2861b07b authored by Michael Armbrust's avatar Michael Armbrust Committed by Reynold Xin
Browse files

[SQL] SPARK-1354 Fix self-joins of parquet relations

@AndreSchumacher, please take a look.

https://spark-project.atlassian.net/browse/SPARK-1354

Author: Michael Armbrust <michael@databricks.com>

Closes #269 from marmbrus/parquetJoin and squashes the following commits:

4081e77 [Michael Armbrust] Create new instances of Parquet relation when multiple copies are in a single plan.
parent 92b83959
No related branches found
No related tags found
No related merge requests found
......@@ -36,7 +36,7 @@ import parquet.schema.{MessageType, MessageTypeParser}
import parquet.schema.{PrimitiveType => ParquetPrimitiveType}
import parquet.schema.{Type => ParquetType}
import org.apache.spark.sql.catalyst.analysis.UnresolvedException
import org.apache.spark.sql.catalyst.analysis.{MultiInstanceRelation, UnresolvedException}
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Row}
import org.apache.spark.sql.catalyst.plans.logical.{BaseRelation, LogicalPlan}
import org.apache.spark.sql.catalyst.types._
......@@ -54,7 +54,8 @@ import org.apache.spark.sql.catalyst.types._
* @param tableName The name of the relation that can be used in queries.
* @param path The path to the Parquet file.
*/
case class ParquetRelation(tableName: String, path: String) extends BaseRelation {
case class ParquetRelation(tableName: String, path: String)
extends BaseRelation with MultiInstanceRelation {
/** Schema derived from ParquetFile **/
def parquetSchema: MessageType =
......@@ -74,6 +75,16 @@ case class ParquetRelation(tableName: String, path: String) extends BaseRelation
// Parquet files have no concepts of keys, therefore no Partitioner
// Note: we could allow Block level access; needs to be thought through
// Always false: a Parquet relation is read as an unpartitioned source here.
override def isPartitioned = false
// Returns a fresh copy of this relation so that each occurrence in a single query plan
// (e.g. both sides of a self-join) produces its own output attributes with distinct
// expression ids, as required by MultiInstanceRelation.
// NOTE(review): the asInstanceOf[this.type] cast is safe only as long as ParquetRelation
// has no subclasses that reach this method — confirm if the hierarchy grows.
override def newInstance = ParquetRelation(tableName, path).asInstanceOf[this.type]
// Equals must also take into account the output attributes so that we can distinguish between
// different instances of the same relation.
// NOTE(review): hashCode is not overridden; the case-class hashCode (tableName, path) is a
// coarser partition than this stricter equals, which keeps the equals/hashCode contract
// intact (equal objects still hash equal) at the cost of possible extra collisions.
override def equals(other: Any) = other match {
  case p: ParquetRelation =>
    p.tableName == tableName && p.path == path && p.output == output
  case _ => false
}
}
object ParquetRelation {
......
......@@ -30,6 +30,9 @@ import org.apache.spark.sql.catalyst.expressions.Row
import org.apache.spark.sql.catalyst.util.getTempFilePath
import org.apache.spark.sql.test.TestSQLContext
// Implicits
import org.apache.spark.sql.test.TestSQLContext._
class ParquetQuerySuite extends FunSuite with BeforeAndAfterAll {
override def beforeAll() {
ParquetTestData.writeFile()
......@@ -39,6 +42,22 @@ class ParquetQuerySuite extends FunSuite with BeforeAndAfterAll {
ParquetTestData.testFile.delete()
}
test("self-join parquet files") {
  // Regression test for SPARK-1354: a self-join of the same Parquet relation must not
  // produce duplicate expression ids on the two sides of the join.
  val x = ParquetTestData.testData.subquery('x)
  val y = ParquetTestData.testData.subquery('y)
  val query = x.join(y).where("x.myint".attr === "y.myint".attr)

  // Check to make sure that the attributes from either side of the join have unique
  // expression ids.
  query.queryExecution.analyzed.output.filter(_.name == "myint") match {
    case Seq(i1, i2) if i1.exprId == i2.exprId =>
      fail(s"Duplicate expression IDs found in query plan: $query")
    case Seq(_, _) => // All good: two attributes with distinct ids.
    // Previously this match was non-exhaustive: any count other than two 'myint'
    // attributes crashed with a MatchError instead of a diagnostic failure.
    case attrs =>
      fail(s"Expected exactly two 'myint' attributes, found ${attrs.size} in: $query")
  }

  // TODO: We can't run this query as it NPEs
}
test("Import of simple Parquet file") {
val result = getRDD(ParquetTestData.testData).collect()
assert(result.size === 15)
......
Loading…
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment