diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala index 16dc23661c0706637a3b5e1dbfb5646e6b4a68d0..86bc3a1b6dab24e7153d71dd3c52e503f4fc971c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala @@ -81,6 +81,8 @@ private[sql] object PartitioningUtils { parsePartition(path, defaultPartitionName, typeInference) }.unzip + // We create pairs of (path -> path's partition value) here + // If the corresponding partition value is None, the pair will be skiped val pathsWithPartitionValues = paths.zip(partitionValues).flatMap(x => x._2.map(x._1 -> _)) if (pathsWithPartitionValues.isEmpty) { @@ -89,11 +91,21 @@ private[sql] object PartitioningUtils { } else { // This dataset is partitioned. We need to check whether all partitions have the same // partition columns and resolve potential type conflicts. + + // Check if there is conflicting directory structure. + // For the paths such as: + // var paths = Seq( + // "hdfs://host:9000/invalidPath", + // "hdfs://host:9000/path/a=10/b=20", + // "hdfs://host:9000/path/a=10.5/b=hello") + // It will be recognised as conflicting directory structure: + // "hdfs://host:9000/invalidPath" + // "hdfs://host:9000/path" val basePaths = optBasePaths.flatMap(x => x) assert( basePaths.distinct.size == 1, "Conflicting directory structures detected. Suspicious paths:\b" + - basePaths.mkString("\n\t", "\n\t", "\n\n")) + basePaths.distinct.mkString("\n\t", "\n\t", "\n\n")) val resolvedPartitionValues = resolvePartitions(pathsWithPartitionValues) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala index 67b6a37fa502ec6f81d9fab1a1bb71c8173ff3ac..61cc0da50865c82bca55b7ec96a18b06a06ed2c8 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala @@ -88,6 +88,22 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha parsePartitions(paths.map(new Path(_)), defaultPartitionName, true) } assert(exception.getMessage().contains("Conflicting directory structures detected")) + + // Invalid + // Conflicting directory structure: + // "hdfs://host:9000/tmp/tables/partitionedTable" + // "hdfs://host:9000/tmp/tables/nonPartitionedTable1" + // "hdfs://host:9000/tmp/tables/nonPartitionedTable2" + paths = Seq( + "hdfs://host:9000/tmp/tables/partitionedTable", + "hdfs://host:9000/tmp/tables/partitionedTable/p=1/", + "hdfs://host:9000/tmp/tables/nonPartitionedTable1", + "hdfs://host:9000/tmp/tables/nonPartitionedTable2") + + exception = intercept[AssertionError] { + parsePartitions(paths.map(new Path(_)), defaultPartitionName, true) + } + assert(exception.getMessage().contains("Conflicting directory structures detected")) } test("parse partition") {