From 81c68eceba3a857ba7349c6892dc336c3ebd11dc Mon Sep 17 00:00:00 2001
From: Tathagata Das <tathagata.das1565@gmail.com>
Date: Wed, 11 May 2016 12:35:41 -0700
Subject: [PATCH] [SPARK-15248][SQL] Make MetastoreFileCatalog consider
 directories from partition specs of a partitioned metastore table

Table partitions can be added with locations different from default warehouse location of a hive table.
`CREATE TABLE parquetTable (a int) PARTITIONED BY (b int) STORED AS parquet `
`ALTER TABLE parquetTable ADD PARTITION (b=1) LOCATION '/partition'`
Querying such a table throws error as the MetastoreFileCatalog does not list the added partition directory, it only lists the default base location.

```
[info] - SPARK-15248: explicitly added partitions should be readable *** FAILED *** (1 second, 8 milliseconds)
[info]   java.util.NoSuchElementException: key not found: file:/Users/tdas/Projects/Spark/spark2/target/tmp/spark-b39ad224-c5d1-4966-8981-fb45a2066d61/partition
[info]   at scala.collection.MapLike$class.default(MapLike.scala:228)
[info]   at scala.collection.AbstractMap.default(Map.scala:59)
[info]   at scala.collection.MapLike$class.apply(MapLike.scala:141)
[info]   at scala.collection.AbstractMap.apply(Map.scala:59)
[info]   at org.apache.spark.sql.execution.datasources.PartitioningAwareFileCatalog$$anonfun$listFiles$1.apply(PartitioningAwareFileCatalog.scala:59)
[info]   at org.apache.spark.sql.execution.datasources.PartitioningAwareFileCatalog$$anonfun$listFiles$1.apply(PartitioningAwareFileCatalog.scala:55)
[info]   at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
[info]   at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
[info]   at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
[info]   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
[info]   at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
[info]   at scala.collection.AbstractTraversable.map(Traversable.scala:104)
[info]   at org.apache.spark.sql.execution.datasources.PartitioningAwareFileCatalog.listFiles(PartitioningAwareFileCatalog.scala:55)
[info]   at org.apache.spark.sql.execution.datasources.FileSourceStrategy$.apply(FileSourceStrategy.scala:93)
[info]   at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:59)
[info]   at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:59)
[info]   at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
[info]   at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
[info]   at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:60)
[info]   at org.apache.spark.sql.catalyst.planning.QueryPlanner.planLater(QueryPlanner.scala:55)
[info]   at org.apache.spark.sql.execution.SparkStrategies$SpecialLimits$.apply(SparkStrategies.scala:55)
[info]   at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:59)
[info]   at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:59)
[info]   at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
[info]   at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
[info]   at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:60)
[info]   at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:77)
[info]   at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:75)
[info]   at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:82)
[info]   at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:82)
[info]   at org.apache.spark.sql.QueryTest.assertEmptyMissingInput(QueryTest.scala:330)
[info]   at org.apache.spark.sql.QueryTest.checkAnswer(QueryTest.scala:146)
[info]   at org.apache.spark.sql.QueryTest.checkAnswer(QueryTest.scala:159)
[info]   at org.apache.spark.sql.hive.ParquetMetastoreSuite$$anonfun$12$$anonfun$apply$mcV$sp$7$$anonfun$apply$mcV$sp$25.apply(parquetSuites.scala:554)
[info]   at org.apache.spark.sql.hive.ParquetMetastoreSuite$$anonfun$12$$anonfun$apply$mcV$sp$7$$anonfun$apply$mcV$sp$25.apply(parquetSuites.scala:535)
[info]   at org.apache.spark.sql.test.SQLTestUtils$class.withTempDir(SQLTestUtils.scala:125)
[info]   at org.apache.spark.sql.hive.ParquetPartitioningTest.withTempDir(parquetSuites.scala:726)
[info]   at org.apache.spark.sql.hive.ParquetMetastoreSuite$$anonfun$12$$anonfun$apply$mcV$sp$7.apply$mcV$sp(parquetSuites.scala:535)
[info]   at org.apache.spark.sql.test.SQLTestUtils$class.withTable(SQLTestUtils.scala:166)
[info]   at org.apache.spark.sql.hive.ParquetPartitioningTest.withTable(parquetSuites.scala:726)
[info]   at org.apache.spark.sql.hive.ParquetMetastoreSuite$$anonfun$12.apply$mcV$sp(parquetSuites.scala:534)
[info]   at org.apache.spark.sql.hive.ParquetMetastoreSuite$$anonfun$12.apply(parquetSuites.scala:534)
[info]   at org.apache.spark.sql.hive.ParquetMetastoreSuite$$anonfun$12.apply(parquetSuites.scala:534)
```

The solution in this PR to get the paths to list from the partition spec and not rely on the default table path alone.

unit tests.

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #13022 from tdas/SPARK-15248.
---
 .../PartitioningAwareFileCatalog.scala        | 13 +++++--
 .../spark/sql/hive/HiveMetastoreCatalog.scala | 31 ++++++++++++-----
 .../apache/spark/sql/hive/parquetSuites.scala | 34 +++++++++++++++++++
 3 files changed, 67 insertions(+), 11 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala
index 27f23c855d..e0e4ddc30b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala
@@ -54,9 +54,16 @@ abstract class PartitioningAwareFileCatalog(
     } else {
       prunePartitions(filters, partitionSpec()).map {
         case PartitionDirectory(values, path) =>
-          Partition(
-            values,
-            leafDirToChildrenFiles(path).filterNot(_.getPath.getName startsWith "_"))
+          val files: Seq[FileStatus] = leafDirToChildrenFiles.get(path) match {
+            case Some(existingDir) =>
+              // Directory has children files in it, return them
+              existingDir.filterNot(_.getPath.getName.startsWith("_"))
+
+            case None =>
+              // Directory does not exist, or has no children files
+              Nil
+          }
+          Partition(values, files)
       }
     }
     logTrace("Selected files after partition pruning:\n\t" + selectedPartitions.mkString("\n\t"))
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 607f0a10ec..b0a3a803d2 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -271,8 +271,10 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
         Some(partitionSpec))
 
       val hadoopFsRelation = cached.getOrElse {
-        val paths = new Path(metastoreRelation.catalogTable.storage.locationUri.get) :: Nil
-        val fileCatalog = new MetaStoreFileCatalog(sparkSession, paths, partitionSpec)
+        val fileCatalog = new MetaStorePartitionedTableFileCatalog(
+          sparkSession,
+          new Path(metastoreRelation.catalogTable.storage.locationUri.get),
+          partitionSpec)
 
         val inferredSchema = if (fileType.equals("parquet")) {
           val inferredSchema =
@@ -537,18 +539,31 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
 /**
  * An override of the standard HDFS listing based catalog, that overrides the partition spec with
  * the information from the metastore.
+ * @param tableBasePath The default base path of the Hive metastore table
+ * @param partitionSpec The partition specifications from Hive metastore
  */
-private[hive] class MetaStoreFileCatalog(
+private[hive] class MetaStorePartitionedTableFileCatalog(
     sparkSession: SparkSession,
-    paths: Seq[Path],
-    partitionSpecFromHive: PartitionSpec)
+    tableBasePath: Path,
+    override val partitionSpec: PartitionSpec)
   extends ListingFileCatalog(
     sparkSession,
-    paths,
+    MetaStorePartitionedTableFileCatalog.getPaths(tableBasePath, partitionSpec),
     Map.empty,
-    Some(partitionSpecFromHive.partitionColumns)) {
+    Some(partitionSpec.partitionColumns)) {
+}
 
-  override def partitionSpec(): PartitionSpec = partitionSpecFromHive
+private[hive] object MetaStorePartitionedTableFileCatalog {
+  /** Get the list of paths to list files in the for a metastore table */
+  def getPaths(tableBasePath: Path, partitionSpec: PartitionSpec): Seq[Path] = {
+    // If there are no partitions currently specified then use base path,
+    // otherwise use the paths corresponding to the partitions.
+    if (partitionSpec.partitions.isEmpty) {
+      Seq(tableBasePath)
+    } else {
+      partitionSpec.partitions.map(_.path)
+    }
+  }
 }
 
 /**
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
index 6e93bbde26..f52c6e48c5 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
@@ -529,6 +529,40 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
 
     dropTables("test_insert_parquet", "test_parquet_partitioned_cache_test")
   }
+
+  test("SPARK-15248: explicitly added partitions should be readable") {
+    withTable("test_added_partitions", "test_temp") {
+      withTempDir { src =>
+        val partitionDir = new File(src, "partition").getCanonicalPath
+        sql(
+          """
+            |CREATE TABLE test_added_partitions (a STRING)
+            |PARTITIONED BY (b INT)
+            |STORED AS PARQUET
+          """.stripMargin)
+
+        // Temp table to insert data into partitioned table
+        Seq("foo", "bar").toDF("a").registerTempTable("test_temp")
+        sql("INSERT INTO test_added_partitions PARTITION(b='0') SELECT a FROM test_temp")
+
+        checkAnswer(
+          sql("SELECT * FROM test_added_partitions"),
+          Seq(("foo", 0), ("bar", 0)).toDF("a", "b"))
+
+        // Create partition without data files and check whether it can be read
+        sql(s"ALTER TABLE test_added_partitions ADD PARTITION (b='1') LOCATION '$partitionDir'")
+        checkAnswer(
+          sql("SELECT * FROM test_added_partitions"),
+          Seq(("foo", 0), ("bar", 0)).toDF("a", "b"))
+
+        // Add data files to partition directory and check whether they can be read
+        Seq("baz").toDF("a").write.mode(SaveMode.Overwrite).parquet(partitionDir)
+        checkAnswer(
+          sql("SELECT * FROM test_added_partitions"),
+          Seq(("foo", 0), ("bar", 0), ("baz", 1)).toDF("a", "b"))
+      }
+    }
+  }
 }
 
 /**
-- 
GitLab