diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
index 0a2007e15843c8994d8281d6700e10e05094905e..628c5e18936c5f4be64f7d9cd41734b27b448c98 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
@@ -77,9 +77,11 @@ private[sql] object PartitioningUtils {
       defaultPartitionName: String,
       typeInference: Boolean): PartitionSpec = {
     // First, we need to parse every partition's path and see if we can find partition values.
-    val pathsWithPartitionValues = paths.flatMap { path =>
-      parsePartition(path, defaultPartitionName, typeInference).map(path -> _)
-    }
+    val (partitionValues, optBasePaths) = paths.map { path =>
+      parsePartition(path, defaultPartitionName, typeInference)
+    }.unzip
+
+    val pathsWithPartitionValues = paths.zip(partitionValues).flatMap(x => x._2.map(x._1 -> _))
 
     if (pathsWithPartitionValues.isEmpty) {
       // This dataset is not partitioned.
@@ -87,6 +89,12 @@ private[sql] object PartitioningUtils {
     } else {
       // This dataset is partitioned. We need to check whether all partitions have the same
       // partition columns and resolve potential type conflicts.
+      val basePaths = optBasePaths.flatMap(x => x)
+      assert(
+        basePaths.distinct.size == 1,
+        "Conflicting directory structures detected. Suspicious paths:\n" +
+          basePaths.mkString("\n\t", "\n\t", "\n\n"))
+
       val resolvedPartitionValues = resolvePartitions(pathsWithPartitionValues)
 
       // Creates the StructType which represents the partition columns.
@@ -110,12 +118,12 @@ private[sql] object PartitioningUtils {
   }
 
   /**
-   * Parses a single partition, returns column names and values of each partition column. For
-   * example, given:
+   * Parses a single partition, returns column names and values of each partition column, also
+   * the base path. For example, given:
    * {{{
    *   path = hdfs://<host>:<port>/path/to/partition/a=42/b=hello/c=3.14
    * }}}
-   * it returns:
+   * it returns the partition:
    * {{{
    *   PartitionValues(
    *     Seq("a", "b", "c"),
@@ -124,34 +132,40 @@ private[sql] object PartitioningUtils {
    *       Literal.create("hello", StringType),
    *       Literal.create(3.14, FloatType)))
    * }}}
+   * and the base path:
+   * {{{
+   *   /path/to/partition
+   * }}}
    */
   private[sql] def parsePartition(
       path: Path,
       defaultPartitionName: String,
-      typeInference: Boolean): Option[PartitionValues] = {
+      typeInference: Boolean): (Option[PartitionValues], Option[Path]) = {
     val columns = ArrayBuffer.empty[(String, Literal)]
     // Old Hadoop versions don't have `Path.isRoot`
     var finished = path.getParent == null
     var chopped = path
+    var basePath = path
 
     while (!finished) {
       // Sometimes (e.g., when speculative task is enabled), temporary directories may be left
       // uncleaned. Here we simply ignore them.
       if (chopped.getName.toLowerCase == "_temporary") {
-        return None
+        return (None, None)
       }
 
       val maybeColumn = parsePartitionColumn(chopped.getName, defaultPartitionName, typeInference)
       maybeColumn.foreach(columns += _)
+      basePath = chopped
       chopped = chopped.getParent
-      finished = maybeColumn.isEmpty || chopped.getParent == null
+      finished = (maybeColumn.isEmpty && !columns.isEmpty) || chopped.getParent == null
     }
 
     if (columns.isEmpty) {
-      None
+      (None, Some(path))
     } else {
       val (columnNames, values) = columns.reverse.unzip
-      Some(PartitionValues(columnNames, values))
+      (Some(PartitionValues(columnNames, values)), Some(basePath))
     }
   }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
index 3a23b8ed668082cd05f67ae925a62e9f89b58056..67b6a37fa502ec6f81d9fab1a1bb71c8173ff3ac 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
@@ -58,14 +58,46 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha
     check(defaultPartitionName, Literal.create(null, NullType))
   }
 
+  test("parse invalid partitioned directories") {
+    // Invalid
+    var paths = Seq(
+      "hdfs://host:9000/invalidPath",
+      "hdfs://host:9000/path/a=10/b=20",
+      "hdfs://host:9000/path/a=10.5/b=hello")
+
+    var exception = intercept[AssertionError] {
+      parsePartitions(paths.map(new Path(_)), defaultPartitionName, true)
+    }
+    assert(exception.getMessage().contains("Conflicting directory structures detected"))
+
+    // Valid
+    paths = Seq(
+      "hdfs://host:9000/path/_temporary",
+      "hdfs://host:9000/path/a=10/b=20",
+      "hdfs://host:9000/path/_temporary/path")
+
+    parsePartitions(paths.map(new Path(_)), defaultPartitionName, true)
+
+    // Invalid
+    paths = Seq(
+      "hdfs://host:9000/path/_temporary",
+      "hdfs://host:9000/path/a=10/b=20",
+      "hdfs://host:9000/path/path1")
+
+    exception = intercept[AssertionError] {
+      parsePartitions(paths.map(new Path(_)), defaultPartitionName, true)
+    }
+    assert(exception.getMessage().contains("Conflicting directory structures detected"))
+  }
+
   test("parse partition") {
     def check(path: String, expected: Option[PartitionValues]): Unit = {
-      assert(expected === parsePartition(new Path(path), defaultPartitionName, true))
+      assert(expected === parsePartition(new Path(path), defaultPartitionName, true)._1)
     }
 
     def checkThrows[T <: Throwable: Manifest](path: String, expected: String): Unit = {
       val message = intercept[T] {
-        parsePartition(new Path(path), defaultPartitionName, true).get
+        parsePartition(new Path(path), defaultPartitionName, true)
       }.getMessage
 
       assert(message.contains(expected))
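
Note for reviewers: the sketch below (not Spark code) illustrates the idea behind the new base-path check. Each leaf directory is walked upward past its "k=v" partition segments, the first non-partition ancestor is taken as the base path, and discovery is only considered valid when every leaf agrees on that base path. The object name BasePathCheckSketch, the plain-string paths, and the simplified parsing (no type inference, no _temporary handling) are illustrative assumptions, not part of this patch.

// Illustrative sketch only -- not the Spark implementation. It approximates the
// base-path extraction that the patched parsePartition performs, using plain
// strings instead of Hadoop Path.
object BasePathCheckSketch {
  // Splits a leaf directory into its partition columns ("k=v" segments, walked
  // from the leaf upward) and the base path (the first non-partition ancestor).
  def parse(path: String): (Seq[(String, String)], String) = {
    val segments = path.stripPrefix("/").split("/").toSeq
    val (partitionSegs, baseSegs) = segments.reverse.span(_.contains("="))
    val columns = partitionSegs.reverse.map { seg =>
      val kv = seg.split("=", 2)
      kv(0) -> kv(1)
    }
    // Non-partitioned leaves report themselves as the base path, mirroring the
    // patched parsePartition returning (None, Some(path)).
    if (columns.isEmpty) (Nil, path)
    else (columns, "/" + baseSegs.reverse.mkString("/"))
  }

  def main(args: Array[String]): Unit = {
    val paths = Seq(
      "/path/a=10/b=20",
      "/path/a=10.5/b=hello",
      "/invalidPath")

    val basePaths = paths.map(parse(_)._2).distinct
    // With the patch, parsePartitions asserts at this point: two distinct base
    // paths ("/path" and "/invalidPath") mean the directory layout is inconsistent.
    if (basePaths.size != 1) {
      println("Conflicting directory structures detected. Suspicious paths:" +
        basePaths.mkString("\n\t", "\n\t", "\n"))
    }
  }
}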