Skip to content
Snippets Groups Projects
Commit de289bf2 authored by Liang-Chi Hsieh's avatar Liang-Chi Hsieh Committed by Yin Huai
Browse files

[SPARK-10304][SQL] Following up checking valid dir structure for partition discovery

This patch follows up #8840.

Author: Liang-Chi Hsieh <viirya@appier.com>

Closes #9459 from viirya/detect_invalid_part_dir_following.
parent 987df4bf
No related branches found
No related tags found
No related merge requests found
......@@ -81,6 +81,8 @@ private[sql] object PartitioningUtils {
parsePartition(path, defaultPartitionName, typeInference)
}.unzip
// We create pairs of (path -> path's partition value) here
// If the corresponding partition value is None, the pair will be skiped
val pathsWithPartitionValues = paths.zip(partitionValues).flatMap(x => x._2.map(x._1 -> _))
if (pathsWithPartitionValues.isEmpty) {
......@@ -89,11 +91,21 @@ private[sql] object PartitioningUtils {
} else {
// This dataset is partitioned. We need to check whether all partitions have the same
// partition columns and resolve potential type conflicts.
// Check if there is conflicting directory structure.
// For the paths such as:
// var paths = Seq(
// "hdfs://host:9000/invalidPath",
// "hdfs://host:9000/path/a=10/b=20",
// "hdfs://host:9000/path/a=10.5/b=hello")
// It will be recognised as conflicting directory structure:
// "hdfs://host:9000/invalidPath"
// "hdfs://host:9000/path"
val basePaths = optBasePaths.flatMap(x => x)
assert(
basePaths.distinct.size == 1,
"Conflicting directory structures detected. Suspicious paths:\b" +
basePaths.mkString("\n\t", "\n\t", "\n\n"))
basePaths.distinct.mkString("\n\t", "\n\t", "\n\n"))
val resolvedPartitionValues = resolvePartitions(pathsWithPartitionValues)
......
......@@ -88,6 +88,22 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha
parsePartitions(paths.map(new Path(_)), defaultPartitionName, true)
}
assert(exception.getMessage().contains("Conflicting directory structures detected"))
// Invalid
// Conflicting directory structure:
// "hdfs://host:9000/tmp/tables/partitionedTable"
// "hdfs://host:9000/tmp/tables/nonPartitionedTable1"
// "hdfs://host:9000/tmp/tables/nonPartitionedTable2"
paths = Seq(
"hdfs://host:9000/tmp/tables/partitionedTable",
"hdfs://host:9000/tmp/tables/partitionedTable/p=1/",
"hdfs://host:9000/tmp/tables/nonPartitionedTable1",
"hdfs://host:9000/tmp/tables/nonPartitionedTable2")
exception = intercept[AssertionError] {
parsePartitions(paths.map(new Path(_)), defaultPartitionName, true)
}
assert(exception.getMessage().contains("Conflicting directory structures detected"))
}
test("parse partition") {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment