From 3abd0c1cda09bb575adc99847a619bc84af37fd0 Mon Sep 17 00:00:00 2001
From: Michael Armbrust <michael@databricks.com>
Date: Mon, 18 Aug 2014 13:17:10 -0700
Subject: [PATCH] [SPARK-2406][SQL] Initial support for using ParquetTableScan
 to read HiveMetaStore tables.

This PR adds an experimental flag `spark.sql.hive.convertMetastoreParquet` that when true causes the planner to detects tables that use Hive's Parquet SerDe and instead plans them using Spark SQL's native `ParquetTableScan`.

Author: Michael Armbrust <michael@databricks.com>
Author: Yin Huai <huai@cse.ohio-state.edu>

Closes #1819 from marmbrus/parquetMetastore and squashes the following commits:

1620079 [Michael Armbrust] Revert "remove hive parquet bundle"
cc30430 [Michael Armbrust] Merge remote-tracking branch 'origin/master' into parquetMetastore
4f3d54f [Michael Armbrust] fix style
41ebc5f [Michael Armbrust] remove hive parquet bundle
a43e0da [Michael Armbrust] Merge remote-tracking branch 'origin/master' into parquetMetastore
4c4dc19 [Michael Armbrust] Fix bug with tree splicing.
ebb267e [Michael Armbrust] include parquet hive to tests pass (Remove this later).
c0d9b72 [Michael Armbrust] Avoid creating a HadoopRDD per partition.  Add dirty hacks to retrieve partition values from the InputSplit.
8cdc93c [Michael Armbrust] Merge pull request #8 from yhuai/parquetMetastore
a0baec7 [Yin Huai] Partitioning columns can be resolved.
1161338 [Michael Armbrust] Add a test to make sure conversion is actually happening
212d5cd [Michael Armbrust] Initial support for using ParquetTableScan to read HiveMetaStore tables.
---
 project/SparkBuild.scala                      |   1 -
 .../spark/sql/execution/basicOperators.scala  |  12 ++
 .../spark/sql/parquet/ParquetRelation.scala   |   8 +-
 .../sql/parquet/ParquetTableOperations.scala  |  74 ++++++--
 .../apache/spark/sql/hive/HiveContext.scala   |   9 +
 .../spark/sql/hive/HiveStrategies.scala       | 119 +++++++++++-
 .../sql/hive/parquet/FakeParquetSerDe.scala   |  56 ++++++
 .../sql/parquet/ParquetMetastoreSuite.scala   | 171 ++++++++++++++++++
 8 files changed, 427 insertions(+), 23 deletions(-)
 create mode 100644 sql/hive/src/main/scala/org/apache/spark/sql/hive/parquet/FakeParquetSerDe.scala
 create mode 100644 sql/hive/src/test/scala/org/apache/spark/sql/parquet/ParquetMetastoreSuite.scala

diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index 63a285b81a..49d52aefca 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -228,7 +228,6 @@ object SQL {
 object Hive {
 
   lazy val settings = Seq(
-
     javaOptions += "-XX:MaxPermSize=1g",
     // Multiple queries rely on the TestHive singleton. See comments there for more details.
     parallelExecution in Test := false,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
index 0027f3cf1f..f9dfa3c92f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
@@ -303,3 +303,15 @@ case class Intersect(left: SparkPlan, right: SparkPlan) extends BinaryNode {
     left.execute().map(_.copy()).intersection(right.execute().map(_.copy()))
   }
 }
+
+/**
+ * :: DeveloperApi ::
+ * A plan node that does nothing but lie about the output of its child.  Used to spice a
+ * (hopefully structurally equivalent) tree from a different optimization sequence into an already
+ * resolved tree.
+ */
+@DeveloperApi
+case class OutputFaker(output: Seq[Attribute], child: SparkPlan) extends SparkPlan {
+  def children = child :: Nil
+  def execute() = child.execute()
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala
index 053b2a1543..1713ae6fb5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala
@@ -47,7 +47,8 @@ import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, LeafNode}
 private[sql] case class ParquetRelation(
     path: String,
     @transient conf: Option[Configuration],
-    @transient sqlContext: SQLContext)
+    @transient sqlContext: SQLContext,
+    partitioningAttributes: Seq[Attribute] = Nil)
   extends LeafNode with MultiInstanceRelation {
 
   self: Product =>
@@ -61,12 +62,13 @@ private[sql] case class ParquetRelation(
 
   /** Attributes */
   override val output =
+    partitioningAttributes ++
     ParquetTypesConverter.readSchemaFromFile(
-      new Path(path),
+      new Path(path.split(",").head),
       conf,
       sqlContext.isParquetBinaryAsString)
 
-  override def newInstance = ParquetRelation(path, conf, sqlContext).asInstanceOf[this.type]
+  override def newInstance() = ParquetRelation(path, conf, sqlContext).asInstanceOf[this.type]
 
   // Equals must also take into account the output attributes so that we can distinguish between
   // different instances of the same relation,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
index c6dca10f6a..f6cfab736d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
@@ -34,6 +34,7 @@ import org.apache.hadoop.mapreduce._
 import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => NewFileInputFormat}
 import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat => NewFileOutputFormat}
 import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
+
 import parquet.hadoop._
 import parquet.hadoop.api.{InitContext, ReadSupport}
 import parquet.hadoop.metadata.GlobalMetaData
@@ -42,6 +43,7 @@ import parquet.io.ParquetDecodingException
 import parquet.schema.MessageType
 
 import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.SQLConf
 import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, Row}
 import org.apache.spark.sql.execution.{LeafNode, SparkPlan, UnaryNode}
@@ -60,11 +62,18 @@ case class ParquetTableScan(
   // The resolution of Parquet attributes is case sensitive, so we resolve the original attributes
   // by exprId. note: output cannot be transient, see
   // https://issues.apache.org/jira/browse/SPARK-1367
-  val output = attributes.map { a =>
-    relation.output
-      .find(o => o.exprId == a.exprId)
-      .getOrElse(sys.error(s"Invalid parquet attribute $a in ${relation.output.mkString(",")}"))
-  }
+  val normalOutput =
+    attributes
+      .filterNot(a => relation.partitioningAttributes.map(_.exprId).contains(a.exprId))
+      .flatMap(a => relation.output.find(o => o.exprId == a.exprId))
+
+  val partOutput =
+    attributes.flatMap(a => relation.partitioningAttributes.find(o => o.exprId == a.exprId))
+
+  def output = partOutput ++ normalOutput
+
+  assert(normalOutput.size + partOutput.size == attributes.size,
+    s"$normalOutput + $partOutput != $attributes, ${relation.output}")
 
   override def execute(): RDD[Row] = {
     val sc = sqlContext.sparkContext
@@ -72,16 +81,19 @@ case class ParquetTableScan(
     ParquetInputFormat.setReadSupportClass(job, classOf[RowReadSupport])
 
     val conf: Configuration = ContextUtil.getConfiguration(job)
-    val qualifiedPath = {
-      val path = new Path(relation.path)
-      path.getFileSystem(conf).makeQualified(path)
+
+    relation.path.split(",").foreach { curPath =>
+      val qualifiedPath = {
+        val path = new Path(curPath)
+        path.getFileSystem(conf).makeQualified(path)
+      }
+      NewFileInputFormat.addInputPath(job, qualifiedPath)
     }
-    NewFileInputFormat.addInputPath(job, qualifiedPath)
 
     // Store both requested and original schema in `Configuration`
     conf.set(
       RowReadSupport.SPARK_ROW_REQUESTED_SCHEMA,
-      ParquetTypesConverter.convertToString(output))
+      ParquetTypesConverter.convertToString(normalOutput))
     conf.set(
       RowWriteSupport.SPARK_ROW_SCHEMA,
       ParquetTypesConverter.convertToString(relation.output))
@@ -102,13 +114,41 @@ case class ParquetTableScan(
       SQLConf.PARQUET_CACHE_METADATA,
       sqlContext.getConf(SQLConf.PARQUET_CACHE_METADATA, "false"))
 
-    sc.newAPIHadoopRDD(
-      conf,
-      classOf[FilteringParquetRowInputFormat],
-      classOf[Void],
-      classOf[Row])
-      .map(_._2)
-      .filter(_ != null) // Parquet's record filters may produce null values
+    val baseRDD =
+      new org.apache.spark.rdd.NewHadoopRDD(
+        sc,
+        classOf[FilteringParquetRowInputFormat],
+        classOf[Void],
+        classOf[Row],
+        conf)
+
+    if (partOutput.nonEmpty) {
+      baseRDD.mapPartitionsWithInputSplit { case (split, iter) =>
+        val partValue = "([^=]+)=([^=]+)".r
+        val partValues =
+          split.asInstanceOf[parquet.hadoop.ParquetInputSplit]
+            .getPath
+            .toString
+            .split("/")
+            .flatMap {
+              case partValue(key, value) => Some(key -> value)
+              case _ => None
+            }.toMap
+
+        val partitionRowValues =
+          partOutput.map(a => Cast(Literal(partValues(a.name)), a.dataType).eval(EmptyRow))
+
+        new Iterator[Row] {
+          private[this] val joinedRow = new JoinedRow(Row(partitionRowValues:_*), null)
+
+          def hasNext = iter.hasNext
+
+          def next() = joinedRow.withRight(iter.next()._2)
+        }
+      }
+    } else {
+      baseRDD.map(_._2)
+    }.filter(_ != null) // Parquet's record filters may produce null values
   }
 
   /**
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
index a8da676ffa..ff32c7c90a 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -79,6 +79,14 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
   // Change the default SQL dialect to HiveQL
   override private[spark] def dialect: String = getConf(SQLConf.DIALECT, "hiveql")
 
+  /**
+   * When true, enables an experimental feature where metastore tables that use the parquet SerDe
+   * are automatically converted to use the Spark SQL parquet table scan, instead of the Hive
+   * SerDe.
+   */
+  private[spark] def convertMetastoreParquet: Boolean =
+    getConf("spark.sql.hive.convertMetastoreParquet", "false") == "true"
+
   override protected[sql] def executePlan(plan: LogicalPlan): this.QueryExecution =
     new this.QueryExecution { val logical = plan }
 
@@ -326,6 +334,7 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
       TakeOrdered,
       ParquetOperations,
       InMemoryScans,
+      ParquetConversion, // Must be before HiveTableScans
       HiveTableScans,
       DataSinks,
       Scripts,
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index 5fcc1bd4b9..389ace726d 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -17,14 +17,20 @@
 
 package org.apache.spark.sql.hive
 
-import org.apache.spark.sql.SQLContext
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
 import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.codegen.GeneratePredicate
 import org.apache.spark.sql.catalyst.planning._
 import org.apache.spark.sql.catalyst.plans._
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, LowerCaseSchema}
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.hive.execution._
 import org.apache.spark.sql.columnar.InMemoryRelation
+import org.apache.spark.sql.parquet.{ParquetRelation, ParquetTableScan}
+
+import scala.collection.JavaConversions._
 
 private[hive] trait HiveStrategies {
   // Possibly being too clever with types here... or not clever enough.
@@ -32,6 +38,115 @@ private[hive] trait HiveStrategies {
 
   val hiveContext: HiveContext
 
+  /**
+   * :: Experimental ::
+   * Finds table scans that would use the Hive SerDe and replaces them with our own native parquet
+   * table scan operator.
+   *
+   * TODO: Much of this logic is duplicated in HiveTableScan.  Ideally we would do some refactoring
+   * but since this is after the code freeze for 1.1 all logic is here to minimize disruption.
+   *
+   * Other issues:
+   *  - Much of this logic assumes case insensitive resolution.
+   */
+  @Experimental
+  object ParquetConversion extends Strategy {
+    implicit class LogicalPlanHacks(s: SchemaRDD) {
+      def lowerCase =
+        new SchemaRDD(s.sqlContext, LowerCaseSchema(s.logicalPlan))
+
+      def addPartitioningAttributes(attrs: Seq[Attribute]) =
+        new SchemaRDD(
+          s.sqlContext,
+          s.logicalPlan transform {
+            case p: ParquetRelation => p.copy(partitioningAttributes = attrs)
+          })
+    }
+
+    implicit class PhysicalPlanHacks(originalPlan: SparkPlan) {
+      def fakeOutput(newOutput: Seq[Attribute]) =
+        OutputFaker(
+          originalPlan.output.map(a =>
+            newOutput.find(a.name.toLowerCase == _.name.toLowerCase)
+              .getOrElse(
+                sys.error(s"Can't find attribute $a to fake in set ${newOutput.mkString(",")}"))),
+          originalPlan)
+    }
+
+    def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
+      case PhysicalOperation(projectList, predicates, relation: MetastoreRelation)
+          if relation.tableDesc.getSerdeClassName.contains("Parquet") &&
+             hiveContext.convertMetastoreParquet =>
+
+        // Filter out all predicates that only deal with partition keys
+        val partitionKeyIds = relation.partitionKeys.map(_.exprId).toSet
+        val (pruningPredicates, otherPredicates) = predicates.partition {
+          _.references.map(_.exprId).subsetOf(partitionKeyIds)
+        }
+
+        // We are going to throw the predicates and projection back at the whole optimization
+        // sequence so lets unresolve all the attributes, allowing them to be rebound to the
+        // matching parquet attributes.
+        val unresolvedOtherPredicates = otherPredicates.map(_ transform {
+          case a: AttributeReference => UnresolvedAttribute(a.name)
+        }).reduceOption(And).getOrElse(Literal(true))
+
+        val unresolvedProjection = projectList.map(_ transform {
+          case a: AttributeReference => UnresolvedAttribute(a.name)
+        })
+
+        if (relation.hiveQlTable.isPartitioned) {
+          val rawPredicate = pruningPredicates.reduceOption(And).getOrElse(Literal(true))
+          // Translate the predicate so that it automatically casts the input values to the correct
+          // data types during evaluation
+          val castedPredicate = rawPredicate transform {
+            case a: AttributeReference =>
+              val idx = relation.partitionKeys.indexWhere(a.exprId == _.exprId)
+              val key = relation.partitionKeys(idx)
+              Cast(BoundReference(idx, StringType, nullable = true), key.dataType)
+          }
+
+          val inputData = new GenericMutableRow(relation.partitionKeys.size)
+          val pruningCondition =
+            if(codegenEnabled) {
+              GeneratePredicate(castedPredicate)
+            } else {
+              InterpretedPredicate(castedPredicate)
+            }
+
+          val partitions = relation.hiveQlPartitions.filter { part =>
+            val partitionValues = part.getValues
+            var i = 0
+            while (i < partitionValues.size()) {
+              inputData(i) = partitionValues(i)
+              i += 1
+            }
+            pruningCondition(inputData)
+          }
+
+          hiveContext
+            .parquetFile(partitions.map(_.getLocation).mkString(","))
+            .addPartitioningAttributes(relation.partitionKeys)
+            .lowerCase
+            .where(unresolvedOtherPredicates)
+            .select(unresolvedProjection:_*)
+            .queryExecution
+            .executedPlan
+            .fakeOutput(projectList.map(_.toAttribute)):: Nil
+        } else {
+          hiveContext
+            .parquetFile(relation.hiveQlTable.getDataLocation.getPath)
+            .lowerCase
+            .where(unresolvedOtherPredicates)
+            .select(unresolvedProjection:_*)
+            .queryExecution
+            .executedPlan
+            .fakeOutput(projectList.map(_.toAttribute)) :: Nil
+        }
+      case _ => Nil
+    }
+  }
+
   object Scripts extends Strategy {
     def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
       case logical.ScriptTransformation(input, script, output, child) =>
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/parquet/FakeParquetSerDe.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/parquet/FakeParquetSerDe.scala
new file mode 100644
index 0000000000..544abfc324
--- /dev/null
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/parquet/FakeParquetSerDe.scala
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.parquet
+
+import java.util.Properties
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category
+import org.apache.hadoop.hive.serde2.{SerDeStats, SerDe}
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector
+import org.apache.hadoop.io.Writable
+
+/**
+ * A placeholder that allows SparkSQL users to create metastore tables that are stored as
+ * parquet files.  It is only intended to pass the checks that the serde is valid and exists
+ * when a CREATE TABLE is run.  The actual work of decoding will be done by ParquetTableScan
+ * when "spark.sql.hive.convertMetastoreParquet" is set to true.
+ */
+@deprecated("No code should depend on FakeParquetHiveSerDe as it is only intended as a " +
+            "placeholder in the Hive MetaStore")
+class FakeParquetSerDe extends SerDe {
+  override def getObjectInspector: ObjectInspector = new ObjectInspector {
+    override def getCategory: Category = Category.PRIMITIVE
+
+    override def getTypeName: String = "string"
+  }
+
+  override def deserialize(p1: Writable): AnyRef = throwError
+
+  override def initialize(p1: Configuration, p2: Properties): Unit = {}
+
+  override def getSerializedClass: Class[_ <: Writable] = throwError
+
+  override def getSerDeStats: SerDeStats = throwError
+
+  override def serialize(p1: scala.Any, p2: ObjectInspector): Writable = throwError
+
+  private def throwError =
+    sys.error(
+      "spark.sql.hive.convertMetastoreParquet must be set to true to use FakeParquetSerDe")
+}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/parquet/ParquetMetastoreSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/parquet/ParquetMetastoreSuite.scala
new file mode 100644
index 0000000000..0723be7298
--- /dev/null
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/parquet/ParquetMetastoreSuite.scala
@@ -0,0 +1,171 @@
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.parquet
+
+import java.io.File
+
+import org.apache.spark.sql.hive.execution.HiveTableScan
+import org.scalatest.BeforeAndAfterAll
+
+import scala.reflect.ClassTag
+
+import org.apache.spark.sql.{SQLConf, QueryTest}
+import org.apache.spark.sql.execution.{BroadcastHashJoin, ShuffledHashJoin}
+import org.apache.spark.sql.hive.test.TestHive
+import org.apache.spark.sql.hive.test.TestHive._
+
+case class ParquetData(intField: Int, stringField: String)
+
+/**
+ * Tests for our SerDe -> Native parquet scan conversion.
+ */
+class ParquetMetastoreSuite extends QueryTest with BeforeAndAfterAll {
+
+  override def beforeAll(): Unit = {
+    setConf("spark.sql.hive.convertMetastoreParquet", "true")
+  }
+
+  override def afterAll(): Unit = {
+    setConf("spark.sql.hive.convertMetastoreParquet", "false")
+  }
+
+  val partitionedTableDir = File.createTempFile("parquettests", "sparksql")
+  partitionedTableDir.delete()
+  partitionedTableDir.mkdir()
+
+  (1 to 10).foreach { p =>
+    val partDir = new File(partitionedTableDir, s"p=$p")
+    sparkContext.makeRDD(1 to 10)
+      .map(i => ParquetData(i, s"part-$p"))
+      .saveAsParquetFile(partDir.getCanonicalPath)
+  }
+
+  sql(s"""
+    create external table partitioned_parquet
+    (
+      intField INT,
+      stringField STRING
+    )
+    PARTITIONED BY (p int)
+    ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
+     STORED AS
+     INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
+     OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
+    location '${partitionedTableDir.getCanonicalPath}'
+    """)
+
+  sql(s"""
+    create external table normal_parquet
+    (
+      intField INT,
+      stringField STRING
+    )
+    ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
+     STORED AS
+     INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
+     OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
+    location '${new File(partitionedTableDir, "p=1").getCanonicalPath}'
+    """)
+
+  (1 to 10).foreach { p =>
+    sql(s"ALTER TABLE partitioned_parquet ADD PARTITION (p=$p)")
+  }
+
+  test("project the partitioning column") {
+    checkAnswer(
+      sql("SELECT p, count(*) FROM partitioned_parquet group by p"),
+      (1, 10) ::
+      (2, 10) ::
+      (3, 10) ::
+      (4, 10) ::
+      (5, 10) ::
+      (6, 10) ::
+      (7, 10) ::
+      (8, 10) ::
+      (9, 10) ::
+      (10, 10) :: Nil
+    )
+  }
+
+  test("project partitioning and non-partitioning columns") {
+    checkAnswer(
+      sql("SELECT stringField, p, count(intField) " +
+        "FROM partitioned_parquet GROUP BY p, stringField"),
+      ("part-1", 1, 10) ::
+      ("part-2", 2, 10) ::
+      ("part-3", 3, 10) ::
+      ("part-4", 4, 10) ::
+      ("part-5", 5, 10) ::
+      ("part-6", 6, 10) ::
+      ("part-7", 7, 10) ::
+      ("part-8", 8, 10) ::
+      ("part-9", 9, 10) ::
+      ("part-10", 10, 10) :: Nil
+    )
+  }
+
+  test("simple count") {
+    checkAnswer(
+      sql("SELECT COUNT(*) FROM partitioned_parquet"),
+      100)
+  }
+
+  test("pruned count") {
+    checkAnswer(
+      sql("SELECT COUNT(*) FROM partitioned_parquet WHERE p = 1"),
+      10)
+  }
+
+  test("multi-partition pruned count") {
+    checkAnswer(
+      sql("SELECT COUNT(*) FROM partitioned_parquet WHERE p IN (1,2,3)"),
+      30)
+  }
+
+  test("non-partition predicates") {
+    checkAnswer(
+      sql("SELECT COUNT(*) FROM partitioned_parquet WHERE intField IN (1,2,3)"),
+      30)
+  }
+
+  test("sum") {
+    checkAnswer(
+      sql("SELECT SUM(intField) FROM partitioned_parquet WHERE intField IN (1,2,3) AND p = 1"),
+      1 + 2 + 3
+    )
+  }
+
+  test("non-part select(*)") {
+    checkAnswer(
+      sql("SELECT COUNT(*) FROM normal_parquet"),
+      10
+    )
+  }
+
+  test("conversion is working") {
+    assert(
+      sql("SELECT * FROM normal_parquet").queryExecution.executedPlan.collect {
+        case _: HiveTableScan => true
+      }.isEmpty)
+    assert(
+      sql("SELECT * FROM normal_parquet").queryExecution.executedPlan.collect {
+        case _: ParquetTableScan => true
+      }.nonEmpty)
+  }
+}
-- 
GitLab