diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala index 27f23c855da6ed82bb1d4d7c13e3d9fbaf59897c..e0e4ddc30b0f6f72e7cdfeb580e5b95dbc315156 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala @@ -54,9 +54,16 @@ abstract class PartitioningAwareFileCatalog( } else { prunePartitions(filters, partitionSpec()).map { case PartitionDirectory(values, path) => - Partition( - values, - leafDirToChildrenFiles(path).filterNot(_.getPath.getName startsWith "_")) + val files: Seq[FileStatus] = leafDirToChildrenFiles.get(path) match { + case Some(existingDir) => + // Directory has children files in it, return them + existingDir.filterNot(_.getPath.getName.startsWith("_")) + + case None => + // Directory does not exist, or has no children files + Nil + } + Partition(values, files) } } logTrace("Selected files after partition pruning:\n\t" + selectedPartitions.mkString("\n\t")) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index 607f0a10ec8f6209f5de6e4cb0da3d4d6bf83529..b0a3a803d299f495349ec1613b88a566d21fc676 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -271,8 +271,10 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log Some(partitionSpec)) val hadoopFsRelation = cached.getOrElse { - val paths = new Path(metastoreRelation.catalogTable.storage.locationUri.get) :: Nil - val fileCatalog = new MetaStoreFileCatalog(sparkSession, paths, partitionSpec) + val fileCatalog = new MetaStorePartitionedTableFileCatalog( + sparkSession, + new Path(metastoreRelation.catalogTable.storage.locationUri.get), + partitionSpec) val inferredSchema = if (fileType.equals("parquet")) { val inferredSchema = @@ -537,18 +539,31 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log /** * An override of the standard HDFS listing based catalog, that overrides the partition spec with * the information from the metastore. + * @param tableBasePath The default base path of the Hive metastore table + * @param partitionSpec The partition specifications from Hive metastore */ -private[hive] class MetaStoreFileCatalog( +private[hive] class MetaStorePartitionedTableFileCatalog( sparkSession: SparkSession, - paths: Seq[Path], - partitionSpecFromHive: PartitionSpec) + tableBasePath: Path, + override val partitionSpec: PartitionSpec) extends ListingFileCatalog( sparkSession, - paths, + MetaStorePartitionedTableFileCatalog.getPaths(tableBasePath, partitionSpec), Map.empty, - Some(partitionSpecFromHive.partitionColumns)) { + Some(partitionSpec.partitionColumns)) { +} - override def partitionSpec(): PartitionSpec = partitionSpecFromHive +private[hive] object MetaStorePartitionedTableFileCatalog { + /** Get the list of paths to list files in the for a metastore table */ + def getPaths(tableBasePath: Path, partitionSpec: PartitionSpec): Seq[Path] = { + // If there are no partitions currently specified then use base path, + // otherwise use the paths corresponding to the partitions. + if (partitionSpec.partitions.isEmpty) { + Seq(tableBasePath) + } else { + partitionSpec.partitions.map(_.path) + } + } } /** diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala index 6e93bbde26583fbb4838c1ef560798efe37cae4d..f52c6e48c5760d000de1b06e6625260ad67708b1 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala @@ -529,6 +529,40 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest { dropTables("test_insert_parquet", "test_parquet_partitioned_cache_test") } + + test("SPARK-15248: explicitly added partitions should be readable") { + withTable("test_added_partitions", "test_temp") { + withTempDir { src => + val partitionDir = new File(src, "partition").getCanonicalPath + sql( + """ + |CREATE TABLE test_added_partitions (a STRING) + |PARTITIONED BY (b INT) + |STORED AS PARQUET + """.stripMargin) + + // Temp table to insert data into partitioned table + Seq("foo", "bar").toDF("a").registerTempTable("test_temp") + sql("INSERT INTO test_added_partitions PARTITION(b='0') SELECT a FROM test_temp") + + checkAnswer( + sql("SELECT * FROM test_added_partitions"), + Seq(("foo", 0), ("bar", 0)).toDF("a", "b")) + + // Create partition without data files and check whether it can be read + sql(s"ALTER TABLE test_added_partitions ADD PARTITION (b='1') LOCATION '$partitionDir'") + checkAnswer( + sql("SELECT * FROM test_added_partitions"), + Seq(("foo", 0), ("bar", 0)).toDF("a", "b")) + + // Add data files to partition directory and check whether they can be read + Seq("baz").toDF("a").write.mode(SaveMode.Overwrite).parquet(partitionDir) + checkAnswer( + sql("SELECT * FROM test_added_partitions"), + Seq(("foo", 0), ("bar", 0), ("baz", 1)).toDF("a", "b")) + } + } + } } /**