From de289bf279e14e47859b5fbcd70e97b9d0759f14 Mon Sep 17 00:00:00 2001
From: Liang-Chi Hsieh <viirya@appier.com>
Date: Wed, 4 Nov 2015 10:56:32 -0800
Subject: [PATCH] [SPARK-10304][SQL] Following up checking valid dir structure
 for partition discovery

This patch follows up #8840.

Author: Liang-Chi Hsieh <viirya@appier.com>

Closes #9459 from viirya/detect_invalid_part_dir_following.
---
 .../datasources/PartitioningUtils.scala          | 14 +++++++++++++-
 .../parquet/ParquetPartitionDiscoverySuite.scala | 16 ++++++++++++++++
 2 files changed, 29 insertions(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
index 16dc23661c..86bc3a1b6d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
@@ -81,6 +81,8 @@ private[sql] object PartitioningUtils {
       parsePartition(path, defaultPartitionName, typeInference)
     }.unzip
 
+    // We create pairs of (path -> path's partition value) here
+    // If the corresponding partition value is None, the pair will be skiped
     val pathsWithPartitionValues = paths.zip(partitionValues).flatMap(x => x._2.map(x._1 -> _))
 
     if (pathsWithPartitionValues.isEmpty) {
@@ -89,11 +91,21 @@ private[sql] object PartitioningUtils {
     } else {
       // This dataset is partitioned. We need to check whether all partitions have the same
       // partition columns and resolve potential type conflicts.
+
+      // Check if there is conflicting directory structure.
+      // For the paths such as:
+      // var paths = Seq(
+      //   "hdfs://host:9000/invalidPath",
+      //   "hdfs://host:9000/path/a=10/b=20",
+      //   "hdfs://host:9000/path/a=10.5/b=hello")
+      // It will be recognised as conflicting directory structure:
+      //   "hdfs://host:9000/invalidPath"
+      //   "hdfs://host:9000/path"
       val basePaths = optBasePaths.flatMap(x => x)
       assert(
         basePaths.distinct.size == 1,
         "Conflicting directory structures detected. Suspicious paths:\b" +
-          basePaths.mkString("\n\t", "\n\t", "\n\n"))
+          basePaths.distinct.mkString("\n\t", "\n\t", "\n\n"))
 
       val resolvedPartitionValues = resolvePartitions(pathsWithPartitionValues)
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
index 67b6a37fa5..61cc0da508 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
@@ -88,6 +88,22 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha
       parsePartitions(paths.map(new Path(_)), defaultPartitionName, true)
     }
     assert(exception.getMessage().contains("Conflicting directory structures detected"))
+
+    // Invalid
+    // Conflicting directory structure:
+    // "hdfs://host:9000/tmp/tables/partitionedTable"
+    // "hdfs://host:9000/tmp/tables/nonPartitionedTable1"
+    // "hdfs://host:9000/tmp/tables/nonPartitionedTable2"
+    paths = Seq(
+      "hdfs://host:9000/tmp/tables/partitionedTable",
+      "hdfs://host:9000/tmp/tables/partitionedTable/p=1/",
+      "hdfs://host:9000/tmp/tables/nonPartitionedTable1",
+      "hdfs://host:9000/tmp/tables/nonPartitionedTable2")
+
+    exception = intercept[AssertionError] {
+      parsePartitions(paths.map(new Path(_)), defaultPartitionName, true)
+    }
+    assert(exception.getMessage().contains("Conflicting directory structures detected"))
   }
 
   test("parse partition") {
-- 
GitLab