Skip to content
Snippets Groups Projects
Commit ba19689f authored by Cheng Lian's avatar Cheng Lian Committed by Reynold Xin
Browse files

[SQL] [Minor] Remove deprecated parquet tests

This PR removes the deprecated `ParquetQuerySuite`, renamed `ParquetQuerySuite2` to `ParquetQuerySuite`, and refactored changes introduced in #4115 to `ParquetFilterSuite` . It is a follow-up of #3644.

Notice that test cases in the old `ParquetQuerySuite` have already been well covered by other test suites introduced in #3644.

<!-- Reviewable:start -->
[<img src="https://reviewable.io/review_button.png" height=40 alt="Review on Reviewable"/>](https://reviewable.io/reviews/apache/spark/4116)
<!-- Reviewable:end -->

Author: Cheng Lian <lian@databricks.com>

Closes #4116 from liancheng/remove-deprecated-parquet-tests and squashes the following commits:

f73b8f9 [Cheng Lian] Removes deprecated Parquet test suite
parent b328ac6c
No related branches found
No related tags found
No related merge requests found
......@@ -21,7 +21,7 @@ import parquet.filter2.predicate.Operators._
import parquet.filter2.predicate.{FilterPredicate, Operators}
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.expressions.{Literal, Predicate, Row}
import org.apache.spark.sql.catalyst.expressions.{Attribute, Literal, Predicate, Row}
import org.apache.spark.sql.test.TestSQLContext
import org.apache.spark.sql.{QueryTest, SQLConf, SchemaRDD}
......@@ -40,15 +40,16 @@ import org.apache.spark.sql.{QueryTest, SQLConf, SchemaRDD}
class ParquetFilterSuite extends QueryTest with ParquetTest {
val sqlContext = TestSQLContext
private def checkFilterPushdown(
private def checkFilterPredicate(
rdd: SchemaRDD,
output: Seq[Symbol],
predicate: Predicate,
filterClass: Class[_ <: FilterPredicate],
checker: (SchemaRDD, Any) => Unit,
expectedResult: Any): Unit = {
checker: (SchemaRDD, Seq[Row]) => Unit,
expected: Seq[Row]): Unit = {
val output = predicate.collect { case a: Attribute => a }.distinct
withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED -> "true") {
val query = rdd.select(output.map(_.attr): _*).where(predicate)
val query = rdd.select(output: _*).where(predicate)
val maybeAnalyzedPredicate = query.queryExecution.executedPlan.collect {
case plan: ParquetTableScan => plan.columnPruningPred
......@@ -58,209 +59,180 @@ class ParquetFilterSuite extends QueryTest with ParquetTest {
maybeAnalyzedPredicate.foreach { pred =>
val maybeFilter = ParquetFilters.createFilter(pred)
assert(maybeFilter.isDefined, s"Couldn't generate filter predicate for $pred")
maybeFilter.foreach(f => assert(f.getClass === filterClass))
maybeFilter.foreach { f =>
// Doesn't bother checking type parameters here (e.g. `Eq[Integer]`)
assert(f.getClass === filterClass)
}
}
checker(query, expectedResult)
checker(query, expected)
}
}
private def checkFilterPushdown1
(rdd: SchemaRDD, output: Symbol*)
(predicate: Predicate, filterClass: Class[_ <: FilterPredicate])
(expectedResult: => Seq[Row]): Unit = {
checkFilterPushdown(rdd, output, predicate, filterClass,
(query, expected) => checkAnswer(query, expected.asInstanceOf[Seq[Row]]), expectedResult)
}
private def checkFilterPushdown
(rdd: SchemaRDD, output: Symbol*)
(predicate: Predicate, filterClass: Class[_ <: FilterPredicate])
(expectedResult: Int): Unit = {
checkFilterPushdown(rdd, output, predicate, filterClass,
(query, expected) => checkAnswer(query, expected.asInstanceOf[Seq[Row]]), Seq(Row(expectedResult)))
private def checkFilterPredicate
(predicate: Predicate, filterClass: Class[_ <: FilterPredicate], expected: Seq[Row])
(implicit rdd: SchemaRDD): Unit = {
checkFilterPredicate(rdd, predicate, filterClass, checkAnswer(_, _: Seq[Row]), expected)
}
def checkBinaryFilterPushdown
(rdd: SchemaRDD, output: Symbol*)
(predicate: Predicate, filterClass: Class[_ <: FilterPredicate])
(expectedResult: => Any): Unit = {
def checkBinaryAnswer(rdd: SchemaRDD, result: Any): Unit = {
val actual = rdd.map(_.getAs[Array[Byte]](0).mkString(",")).collect().toSeq
val expected = result match {
case s: Seq[_] => s.map(_.asInstanceOf[Row].getAs[Array[Byte]](0).mkString(","))
case s => Seq(s.asInstanceOf[Array[Byte]].mkString(","))
}
assert(actual.sorted === expected.sorted)
}
checkFilterPushdown(rdd, output, predicate, filterClass, checkBinaryAnswer _, expectedResult)
private def checkFilterPredicate[T]
(predicate: Predicate, filterClass: Class[_ <: FilterPredicate], expected: T)
(implicit rdd: SchemaRDD): Unit = {
checkFilterPredicate(predicate, filterClass, Seq(Row(expected)))(rdd)
}
test("filter pushdown - boolean") {
withParquetRDD((true :: false :: Nil).map(b => Tuple1.apply(Option(b)))) { rdd =>
checkFilterPushdown1(rdd, '_1)('_1.isNull, classOf[Eq[java.lang.Boolean]])(Seq.empty[Row])
checkFilterPushdown1(rdd, '_1)('_1.isNotNull, classOf[NotEq[java.lang.Boolean]]) {
Seq(Row(true), Row(false))
}
withParquetRDD((true :: false :: Nil).map(b => Tuple1.apply(Option(b)))) { implicit rdd =>
checkFilterPredicate('_1.isNull, classOf[Eq [_]], Seq.empty[Row])
checkFilterPredicate('_1.isNotNull, classOf[NotEq[_]], Seq(Row(true), Row(false)))
checkFilterPushdown1(rdd, '_1)('_1 === true, classOf[Eq[java.lang.Boolean]])(Seq(Row(true)))
checkFilterPushdown1(rdd, '_1)('_1 !== true, classOf[Operators.NotEq[java.lang.Boolean]])(Seq(Row(false)))
checkFilterPredicate('_1 === true, classOf[Eq [_]], true)
checkFilterPredicate('_1 !== true, classOf[NotEq[_]], false)
}
}
test("filter pushdown - integer") {
withParquetRDD((1 to 4).map(i => Tuple1(Option(i)))) { rdd =>
checkFilterPushdown1(rdd, '_1)('_1.isNull, classOf[Eq[Integer]])(Seq.empty[Row])
checkFilterPushdown1(rdd, '_1)('_1.isNotNull, classOf[NotEq[Integer]]) {
(1 to 4).map(Row.apply(_))
}
checkFilterPushdown(rdd, '_1)('_1 === 1, classOf[Eq[Integer]])(1)
checkFilterPushdown1(rdd, '_1)('_1 !== 1, classOf[Operators.NotEq[Integer]]) {
(2 to 4).map(Row.apply(_))
}
checkFilterPushdown(rdd, '_1)('_1 < 2, classOf[Lt [Integer]])(1)
checkFilterPushdown(rdd, '_1)('_1 > 3, classOf[Gt [Integer]])(4)
checkFilterPushdown(rdd, '_1)('_1 <= 1, classOf[LtEq[Integer]])(1)
checkFilterPushdown(rdd, '_1)('_1 >= 4, classOf[GtEq[Integer]])(4)
checkFilterPushdown(rdd, '_1)(Literal(1) === '_1, classOf[Eq [Integer]])(1)
checkFilterPushdown(rdd, '_1)(Literal(2) > '_1, classOf[Lt [Integer]])(1)
checkFilterPushdown(rdd, '_1)(Literal(3) < '_1, classOf[Gt [Integer]])(4)
checkFilterPushdown(rdd, '_1)(Literal(1) >= '_1, classOf[LtEq[Integer]])(1)
checkFilterPushdown(rdd, '_1)(Literal(4) <= '_1, classOf[GtEq[Integer]])(4)
checkFilterPushdown(rdd, '_1)(!('_1 < 4), classOf[Operators.GtEq[Integer]])(4)
checkFilterPushdown(rdd, '_1)('_1 > 2 && '_1 < 4, classOf[Operators.And])(3)
checkFilterPushdown1(rdd, '_1)('_1 < 2 || '_1 > 3, classOf[Operators.Or]) {
Seq(Row(1), Row(4))
}
withParquetRDD((1 to 4).map(i => Tuple1(Option(i)))) { implicit rdd =>
checkFilterPredicate('_1.isNull, classOf[Eq [_]], Seq.empty[Row])
checkFilterPredicate('_1.isNotNull, classOf[NotEq[_]], (1 to 4).map(Row.apply(_)))
checkFilterPredicate('_1 === 1, classOf[Eq [_]], 1)
checkFilterPredicate('_1 !== 1, classOf[NotEq[_]], (2 to 4).map(Row.apply(_)))
checkFilterPredicate('_1 < 2, classOf[Lt [_]], 1)
checkFilterPredicate('_1 > 3, classOf[Gt [_]], 4)
checkFilterPredicate('_1 <= 1, classOf[LtEq[_]], 1)
checkFilterPredicate('_1 >= 4, classOf[GtEq[_]], 4)
checkFilterPredicate(Literal(1) === '_1, classOf[Eq [_]], 1)
checkFilterPredicate(Literal(2) > '_1, classOf[Lt [_]], 1)
checkFilterPredicate(Literal(3) < '_1, classOf[Gt [_]], 4)
checkFilterPredicate(Literal(1) >= '_1, classOf[LtEq[_]], 1)
checkFilterPredicate(Literal(4) <= '_1, classOf[GtEq[_]], 4)
checkFilterPredicate(!('_1 < 4), classOf[GtEq[_]], 4)
checkFilterPredicate('_1 > 2 && '_1 < 4, classOf[Operators.And], 3)
checkFilterPredicate('_1 < 2 || '_1 > 3, classOf[Operators.Or], Seq(Row(1), Row(4)))
}
}
test("filter pushdown - long") {
withParquetRDD((1 to 4).map(i => Tuple1(Option(i.toLong)))) { rdd =>
checkFilterPushdown1(rdd, '_1)('_1.isNull, classOf[Eq[java.lang.Long]])(Seq.empty[Row])
checkFilterPushdown1(rdd, '_1)('_1.isNotNull, classOf[NotEq[java.lang.Long]]) {
(1 to 4).map(Row.apply(_))
}
checkFilterPushdown(rdd, '_1)('_1 === 1, classOf[Eq[java.lang.Long]])(1)
checkFilterPushdown1(rdd, '_1)('_1 !== 1, classOf[Operators.NotEq[java.lang.Long]]) {
(2 to 4).map(Row.apply(_))
}
checkFilterPushdown(rdd, '_1)('_1 < 2, classOf[Lt [java.lang.Long]])(1)
checkFilterPushdown(rdd, '_1)('_1 > 3, classOf[Gt [java.lang.Long]])(4)
checkFilterPushdown(rdd, '_1)('_1 <= 1, classOf[LtEq[java.lang.Long]])(1)
checkFilterPushdown(rdd, '_1)('_1 >= 4, classOf[GtEq[java.lang.Long]])(4)
checkFilterPushdown(rdd, '_1)(Literal(1) === '_1, classOf[Eq [Integer]])(1)
checkFilterPushdown(rdd, '_1)(Literal(2) > '_1, classOf[Lt [java.lang.Long]])(1)
checkFilterPushdown(rdd, '_1)(Literal(3) < '_1, classOf[Gt [java.lang.Long]])(4)
checkFilterPushdown(rdd, '_1)(Literal(1) >= '_1, classOf[LtEq[java.lang.Long]])(1)
checkFilterPushdown(rdd, '_1)(Literal(4) <= '_1, classOf[GtEq[java.lang.Long]])(4)
checkFilterPushdown(rdd, '_1)(!('_1 < 4), classOf[Operators.GtEq[java.lang.Long]])(4)
checkFilterPushdown(rdd, '_1)('_1 > 2 && '_1 < 4, classOf[Operators.And])(3)
checkFilterPushdown1(rdd, '_1)('_1 < 2 || '_1 > 3, classOf[Operators.Or]) {
Seq(Row(1), Row(4))
}
withParquetRDD((1 to 4).map(i => Tuple1(Option(i.toLong)))) { implicit rdd =>
checkFilterPredicate('_1.isNull, classOf[Eq [_]], Seq.empty[Row])
checkFilterPredicate('_1.isNotNull, classOf[NotEq[_]], (1 to 4).map(Row.apply(_)))
checkFilterPredicate('_1 === 1, classOf[Eq[_]], 1)
checkFilterPredicate('_1 !== 1, classOf[NotEq[_]], (2 to 4).map(Row.apply(_)))
checkFilterPredicate('_1 < 2, classOf[Lt [_]], 1)
checkFilterPredicate('_1 > 3, classOf[Gt [_]], 4)
checkFilterPredicate('_1 <= 1, classOf[LtEq[_]], 1)
checkFilterPredicate('_1 >= 4, classOf[GtEq[_]], 4)
checkFilterPredicate(Literal(1) === '_1, classOf[Eq [_]], 1)
checkFilterPredicate(Literal(2) > '_1, classOf[Lt [_]], 1)
checkFilterPredicate(Literal(3) < '_1, classOf[Gt [_]], 4)
checkFilterPredicate(Literal(1) >= '_1, classOf[LtEq[_]], 1)
checkFilterPredicate(Literal(4) <= '_1, classOf[GtEq[_]], 4)
checkFilterPredicate(!('_1 < 4), classOf[GtEq[_]], 4)
checkFilterPredicate('_1 > 2 && '_1 < 4, classOf[Operators.And], 3)
checkFilterPredicate('_1 < 2 || '_1 > 3, classOf[Operators.Or], Seq(Row(1), Row(4)))
}
}
test("filter pushdown - float") {
withParquetRDD((1 to 4).map(i => Tuple1(Option(i.toFloat)))) { rdd =>
checkFilterPushdown1(rdd, '_1)('_1.isNull, classOf[Eq[java.lang.Float]])(Seq.empty[Row])
checkFilterPushdown1(rdd, '_1)('_1.isNotNull, classOf[NotEq[java.lang.Float]]) {
(1 to 4).map(Row.apply(_))
}
checkFilterPushdown(rdd, '_1)('_1 === 1, classOf[Eq[java.lang.Float]])(1)
checkFilterPushdown1(rdd, '_1)('_1 !== 1, classOf[Operators.NotEq[java.lang.Float]]) {
(2 to 4).map(Row.apply(_))
}
checkFilterPushdown(rdd, '_1)('_1 < 2, classOf[Lt [java.lang.Float]])(1)
checkFilterPushdown(rdd, '_1)('_1 > 3, classOf[Gt [java.lang.Float]])(4)
checkFilterPushdown(rdd, '_1)('_1 <= 1, classOf[LtEq[java.lang.Float]])(1)
checkFilterPushdown(rdd, '_1)('_1 >= 4, classOf[GtEq[java.lang.Float]])(4)
checkFilterPushdown(rdd, '_1)(Literal(1) === '_1, classOf[Eq [Integer]])(1)
checkFilterPushdown(rdd, '_1)(Literal(2) > '_1, classOf[Lt [java.lang.Float]])(1)
checkFilterPushdown(rdd, '_1)(Literal(3) < '_1, classOf[Gt [java.lang.Float]])(4)
checkFilterPushdown(rdd, '_1)(Literal(1) >= '_1, classOf[LtEq[java.lang.Float]])(1)
checkFilterPushdown(rdd, '_1)(Literal(4) <= '_1, classOf[GtEq[java.lang.Float]])(4)
checkFilterPushdown(rdd, '_1)(!('_1 < 4), classOf[Operators.GtEq[java.lang.Float]])(4)
checkFilterPushdown(rdd, '_1)('_1 > 2 && '_1 < 4, classOf[Operators.And])(3)
checkFilterPushdown1(rdd, '_1)('_1 < 2 || '_1 > 3, classOf[Operators.Or]) {
Seq(Row(1), Row(4))
}
withParquetRDD((1 to 4).map(i => Tuple1(Option(i.toFloat)))) { implicit rdd =>
checkFilterPredicate('_1.isNull, classOf[Eq [_]], Seq.empty[Row])
checkFilterPredicate('_1.isNotNull, classOf[NotEq[_]], (1 to 4).map(Row.apply(_)))
checkFilterPredicate('_1 === 1, classOf[Eq [_]], 1)
checkFilterPredicate('_1 !== 1, classOf[NotEq[_]], (2 to 4).map(Row.apply(_)))
checkFilterPredicate('_1 < 2, classOf[Lt [_]], 1)
checkFilterPredicate('_1 > 3, classOf[Gt [_]], 4)
checkFilterPredicate('_1 <= 1, classOf[LtEq[_]], 1)
checkFilterPredicate('_1 >= 4, classOf[GtEq[_]], 4)
checkFilterPredicate(Literal(1) === '_1, classOf[Eq [_]], 1)
checkFilterPredicate(Literal(2) > '_1, classOf[Lt [_]], 1)
checkFilterPredicate(Literal(3) < '_1, classOf[Gt [_]], 4)
checkFilterPredicate(Literal(1) >= '_1, classOf[LtEq[_]], 1)
checkFilterPredicate(Literal(4) <= '_1, classOf[GtEq[_]], 4)
checkFilterPredicate(!('_1 < 4), classOf[GtEq[_]], 4)
checkFilterPredicate('_1 > 2 && '_1 < 4, classOf[Operators.And], 3)
checkFilterPredicate('_1 < 2 || '_1 > 3, classOf[Operators.Or], Seq(Row(1), Row(4)))
}
}
test("filter pushdown - double") {
withParquetRDD((1 to 4).map(i => Tuple1(Option(i.toDouble)))) { rdd =>
checkFilterPushdown1(rdd, '_1)('_1.isNull, classOf[Eq[java.lang.Double]])(Seq.empty[Row])
checkFilterPushdown1(rdd, '_1)('_1.isNotNull, classOf[NotEq[java.lang.Double]]) {
(1 to 4).map(Row.apply(_))
}
checkFilterPushdown(rdd, '_1)('_1 === 1, classOf[Eq[java.lang.Double]])(1)
checkFilterPushdown1(rdd, '_1)('_1 !== 1, classOf[Operators.NotEq[java.lang.Double]]) {
(2 to 4).map(Row.apply(_))
}
checkFilterPushdown(rdd, '_1)('_1 < 2, classOf[Lt [java.lang.Double]])(1)
checkFilterPushdown(rdd, '_1)('_1 > 3, classOf[Gt [java.lang.Double]])(4)
checkFilterPushdown(rdd, '_1)('_1 <= 1, classOf[LtEq[java.lang.Double]])(1)
checkFilterPushdown(rdd, '_1)('_1 >= 4, classOf[GtEq[java.lang.Double]])(4)
checkFilterPushdown(rdd, '_1)(Literal(1) === '_1, classOf[Eq[Integer]])(1)
checkFilterPushdown(rdd, '_1)(Literal(2) > '_1, classOf[Lt [java.lang.Double]])(1)
checkFilterPushdown(rdd, '_1)(Literal(3) < '_1, classOf[Gt [java.lang.Double]])(4)
checkFilterPushdown(rdd, '_1)(Literal(1) >= '_1, classOf[LtEq[java.lang.Double]])(1)
checkFilterPushdown(rdd, '_1)(Literal(4) <= '_1, classOf[GtEq[java.lang.Double]])(4)
checkFilterPushdown(rdd, '_1)(!('_1 < 4), classOf[Operators.GtEq[java.lang.Double]])(4)
checkFilterPushdown(rdd, '_1)('_1 > 2 && '_1 < 4, classOf[Operators.And])(3)
checkFilterPushdown1(rdd, '_1)('_1 < 2 || '_1 > 3, classOf[Operators.Or]) {
Seq(Row(1), Row(4))
}
withParquetRDD((1 to 4).map(i => Tuple1(Option(i.toDouble)))) { implicit rdd =>
checkFilterPredicate('_1.isNull, classOf[Eq[_]], Seq.empty[Row])
checkFilterPredicate('_1.isNotNull, classOf[NotEq[_]], (1 to 4).map(Row.apply(_)))
checkFilterPredicate('_1 === 1, classOf[Eq [_]], 1)
checkFilterPredicate('_1 !== 1, classOf[NotEq[_]], (2 to 4).map(Row.apply(_)))
checkFilterPredicate('_1 < 2, classOf[Lt [_]], 1)
checkFilterPredicate('_1 > 3, classOf[Gt [_]], 4)
checkFilterPredicate('_1 <= 1, classOf[LtEq[_]], 1)
checkFilterPredicate('_1 >= 4, classOf[GtEq[_]], 4)
checkFilterPredicate(Literal(1) === '_1, classOf[Eq [_]], 1)
checkFilterPredicate(Literal(2) > '_1, classOf[Lt [_]], 1)
checkFilterPredicate(Literal(3) < '_1, classOf[Gt [_]], 4)
checkFilterPredicate(Literal(1) >= '_1, classOf[LtEq[_]], 1)
checkFilterPredicate(Literal(4) <= '_1, classOf[GtEq[_]], 4)
checkFilterPredicate(!('_1 < 4), classOf[GtEq[_]], 4)
checkFilterPredicate('_1 > 2 && '_1 < 4, classOf[Operators.And], 3)
checkFilterPredicate('_1 < 2 || '_1 > 3, classOf[Operators.Or], Seq(Row(1), Row(4)))
}
}
test("filter pushdown - string") {
withParquetRDD((1 to 4).map(i => Tuple1(i.toString))) { rdd =>
checkFilterPushdown1(rdd, '_1)('_1.isNull, classOf[Eq[java.lang.String]])(Seq.empty[Row])
checkFilterPushdown1(rdd, '_1)('_1.isNotNull, classOf[NotEq[java.lang.String]]) {
(1 to 4).map(i => Row.apply(i.toString))
}
checkFilterPushdown1(rdd, '_1)('_1 === "1", classOf[Eq[String]])(Seq(Row("1")))
checkFilterPushdown1(rdd, '_1)('_1 !== "1", classOf[Operators.NotEq[String]]) {
(2 to 4).map(i => Row.apply(i.toString))
}
withParquetRDD((1 to 4).map(i => Tuple1(i.toString))) { implicit rdd =>
checkFilterPredicate('_1.isNull, classOf[Eq[_]], Seq.empty[Row])
checkFilterPredicate(
'_1.isNotNull, classOf[NotEq[_]], (1 to 4).map(i => Row.apply(i.toString)))
checkFilterPredicate('_1 === "1", classOf[Eq [_]], "1")
checkFilterPredicate('_1 !== "1", classOf[NotEq[_]], (2 to 4).map(i => Row.apply(i.toString)))
checkFilterPredicate('_1 < "2", classOf[Lt [_]], "1")
checkFilterPredicate('_1 > "3", classOf[Gt [_]], "4")
checkFilterPredicate('_1 <= "1", classOf[LtEq[_]], "1")
checkFilterPredicate('_1 >= "4", classOf[GtEq[_]], "4")
checkFilterPredicate(Literal("1") === '_1, classOf[Eq [_]], "1")
checkFilterPredicate(Literal("2") > '_1, classOf[Lt [_]], "1")
checkFilterPredicate(Literal("3") < '_1, classOf[Gt [_]], "4")
checkFilterPredicate(Literal("1") >= '_1, classOf[LtEq[_]], "1")
checkFilterPredicate(Literal("4") <= '_1, classOf[GtEq[_]], "4")
checkFilterPredicate(!('_1 < "4"), classOf[GtEq[_]], "4")
checkFilterPredicate('_1 > "2" && '_1 < "4", classOf[Operators.And], "3")
checkFilterPredicate('_1 < "2" || '_1 > "3", classOf[Operators.Or], Seq(Row("1"), Row("4")))
}
}
checkFilterPushdown1(rdd, '_1)('_1 < "2", classOf[Lt [java.lang.String]])(Seq(Row("1")))
checkFilterPushdown1(rdd, '_1)('_1 > "3", classOf[Gt [java.lang.String]])(Seq(Row("4")))
checkFilterPushdown1(rdd, '_1)('_1 <= "1", classOf[LtEq[java.lang.String]])(Seq(Row("1")))
checkFilterPushdown1(rdd, '_1)('_1 >= "4", classOf[GtEq[java.lang.String]])(Seq(Row("4")))
checkFilterPushdown1(rdd, '_1)(Literal("1") === '_1, classOf[Eq [java.lang.String]])(Seq(Row("1")))
checkFilterPushdown1(rdd, '_1)(Literal("2") > '_1, classOf[Lt [java.lang.String]])(Seq(Row("1")))
checkFilterPushdown1(rdd, '_1)(Literal("3") < '_1, classOf[Gt [java.lang.String]])(Seq(Row("4")))
checkFilterPushdown1(rdd, '_1)(Literal("1") >= '_1, classOf[LtEq[java.lang.String]])(Seq(Row("1")))
checkFilterPushdown1(rdd, '_1)(Literal("4") <= '_1, classOf[GtEq[java.lang.String]])(Seq(Row("4")))
checkFilterPushdown1(rdd, '_1)(!('_1 < "4"), classOf[Operators.GtEq[java.lang.String]])(Seq(Row("4")))
checkFilterPushdown1(rdd, '_1)('_1 > "2" && '_1 < "4", classOf[Operators.And])(Seq(Row("3")))
checkFilterPushdown1(rdd, '_1)('_1 < "2" || '_1 > "3", classOf[Operators.Or]) {
Seq(Row("1"), Row("4"))
def checkBinaryFilterPredicate
(predicate: Predicate, filterClass: Class[_ <: FilterPredicate], expected: Seq[Row])
(implicit rdd: SchemaRDD): Unit = {
def checkBinaryAnswer(rdd: SchemaRDD, expected: Seq[Row]) = {
assertResult(expected.map(_.getAs[Array[Byte]](0).mkString(",")).toSeq.sorted) {
rdd.map(_.getAs[Array[Byte]](0).mkString(",")).collect().toSeq.sorted
}
}
checkFilterPredicate(rdd, predicate, filterClass, checkBinaryAnswer _, expected)
}
def checkBinaryFilterPredicate
(predicate: Predicate, filterClass: Class[_ <: FilterPredicate], expected: Array[Byte])
(implicit rdd: SchemaRDD): Unit = {
checkBinaryFilterPredicate(predicate, filterClass, Seq(Row(expected)))(rdd)
}
test("filter pushdown - binary") {
......@@ -268,33 +240,30 @@ class ParquetFilterSuite extends QueryTest with ParquetTest {
def b: Array[Byte] = int.toString.getBytes("UTF-8")
}
withParquetRDD((1 to 4).map(i => Tuple1(i.b))) { rdd =>
checkBinaryFilterPushdown(rdd, '_1)('_1.isNull, classOf[Eq[java.lang.String]])(Seq.empty[Row])
checkBinaryFilterPushdown(rdd, '_1)('_1.isNotNull, classOf[NotEq[java.lang.String]]) {
(1 to 4).map(i => Row.apply(i.b)).toSeq
}
checkBinaryFilterPushdown(rdd, '_1)('_1 === 1.b, classOf[Eq[Array[Byte]]])(1.b)
checkBinaryFilterPushdown(rdd, '_1)('_1 !== 1.b, classOf[Operators.NotEq[Array[Byte]]]) {
(2 to 4).map(i => Row.apply(i.b)).toSeq
}
checkBinaryFilterPushdown(rdd, '_1)('_1 < 2.b, classOf[Lt [Array[Byte]]])(1.b)
checkBinaryFilterPushdown(rdd, '_1)('_1 > 3.b, classOf[Gt [Array[Byte]]])(4.b)
checkBinaryFilterPushdown(rdd, '_1)('_1 <= 1.b, classOf[LtEq[Array[Byte]]])(1.b)
checkBinaryFilterPushdown(rdd, '_1)('_1 >= 4.b, classOf[GtEq[Array[Byte]]])(4.b)
checkBinaryFilterPushdown(rdd, '_1)(Literal(1.b) === '_1, classOf[Eq [Array[Byte]]])(1.b)
checkBinaryFilterPushdown(rdd, '_1)(Literal(2.b) > '_1, classOf[Lt [Array[Byte]]])(1.b)
checkBinaryFilterPushdown(rdd, '_1)(Literal(3.b) < '_1, classOf[Gt [Array[Byte]]])(4.b)
checkBinaryFilterPushdown(rdd, '_1)(Literal(1.b) >= '_1, classOf[LtEq[Array[Byte]]])(1.b)
checkBinaryFilterPushdown(rdd, '_1)(Literal(4.b) <= '_1, classOf[GtEq[Array[Byte]]])(4.b)
checkBinaryFilterPushdown(rdd, '_1)(!('_1 < 4.b), classOf[Operators.GtEq[Array[Byte]]])(4.b)
checkBinaryFilterPushdown(rdd, '_1)('_1 > 2.b && '_1 < 4.b, classOf[Operators.And])(3.b)
checkBinaryFilterPushdown(rdd, '_1)('_1 < 2.b || '_1 > 3.b, classOf[Operators.Or]) {
Seq(Row(1.b), Row(4.b))
}
withParquetRDD((1 to 4).map(i => Tuple1(i.b))) { implicit rdd =>
checkBinaryFilterPredicate('_1.isNull, classOf[Eq[_]], Seq.empty[Row])
checkBinaryFilterPredicate(
'_1.isNotNull, classOf[NotEq[_]], (1 to 4).map(i => Row.apply(i.b)).toSeq)
checkBinaryFilterPredicate('_1 === 1.b, classOf[Eq [_]], 1.b)
checkBinaryFilterPredicate(
'_1 !== 1.b, classOf[NotEq[_]], (2 to 4).map(i => Row.apply(i.b)).toSeq)
checkBinaryFilterPredicate('_1 < 2.b, classOf[Lt [_]], 1.b)
checkBinaryFilterPredicate('_1 > 3.b, classOf[Gt [_]], 4.b)
checkBinaryFilterPredicate('_1 <= 1.b, classOf[LtEq[_]], 1.b)
checkBinaryFilterPredicate('_1 >= 4.b, classOf[GtEq[_]], 4.b)
checkBinaryFilterPredicate(Literal(1.b) === '_1, classOf[Eq [_]], 1.b)
checkBinaryFilterPredicate(Literal(2.b) > '_1, classOf[Lt [_]], 1.b)
checkBinaryFilterPredicate(Literal(3.b) < '_1, classOf[Gt [_]], 4.b)
checkBinaryFilterPredicate(Literal(1.b) >= '_1, classOf[LtEq[_]], 1.b)
checkBinaryFilterPredicate(Literal(4.b) <= '_1, classOf[GtEq[_]], 4.b)
checkBinaryFilterPredicate(!('_1 < 4.b), classOf[GtEq[_]], 4.b)
checkBinaryFilterPredicate('_1 > 2.b && '_1 < 4.b, classOf[Operators.And], 3.b)
checkBinaryFilterPredicate(
'_1 < 2.b || '_1 > 3.b, classOf[Operators.Or], Seq(Row(1.b), Row(4.b)))
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.parquet
import org.apache.spark.sql.QueryTest
import org.apache.spark.sql.catalyst.expressions.Row
import org.apache.spark.sql.test.TestSQLContext
import org.apache.spark.sql.test.TestSQLContext._
/**
* A test suite that tests various Parquet queries.
*/
class ParquetQuerySuite2 extends QueryTest with ParquetTest {
val sqlContext = TestSQLContext
test("simple projection") {
withParquetTable((0 until 10).map(i => (i, i.toString)), "t") {
checkAnswer(sql("SELECT _1 FROM t"), (0 until 10).map(Row.apply(_)))
}
}
test("appending") {
val data = (0 until 10).map(i => (i, i.toString))
withParquetTable(data, "t") {
sql("INSERT INTO t SELECT * FROM t")
checkAnswer(table("t"), (data ++ data).map(Row.fromTuple))
}
}
test("self-join") {
// 4 rows, cells of column 1 of row 2 and row 4 are null
val data = (1 to 4).map { i =>
val maybeInt = if (i % 2 == 0) None else Some(i)
(maybeInt, i.toString)
}
withParquetTable(data, "t") {
val selfJoin = sql("SELECT * FROM t x JOIN t y WHERE x._1 = y._1")
val queryOutput = selfJoin.queryExecution.analyzed.output
assertResult(4, s"Field count mismatches")(queryOutput.size)
assertResult(2, s"Duplicated expression ID in query plan:\n $selfJoin") {
queryOutput.filter(_.name == "_1").map(_.exprId).size
}
checkAnswer(selfJoin, List(Row(1, "1", 1, "1"), Row(3, "3", 3, "3")))
}
}
test("nested data - struct with array field") {
val data = (1 to 10).map(i => Tuple1((i, Seq(s"val_$i"))))
withParquetTable(data, "t") {
checkAnswer(sql("SELECT _1._2[0] FROM t"), data.map {
case Tuple1((_, Seq(string))) => Row(string)
})
}
}
test("nested data - array of struct") {
val data = (1 to 10).map(i => Tuple1(Seq(i -> s"val_$i")))
withParquetTable(data, "t") {
checkAnswer(sql("SELECT _1[0]._2 FROM t"), data.map {
case Tuple1(Seq((_, string))) => Row(string)
})
}
}
test("SPARK-1913 regression: columns only referenced by pushed down filters should remain") {
withParquetTable((1 to 10).map(Tuple1.apply), "t") {
checkAnswer(sql(s"SELECT _1 FROM t WHERE _1 < 10"), (1 to 9).map(Row.apply(_)))
}
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment