From 0368eb9d86634c83b3140ce3190cb9e0d0b7fd86 Mon Sep 17 00:00:00 2001
From: Juliusz Sompolski <julek@databricks.com>
Date: Fri, 21 Apr 2017 09:49:42 +0800
Subject: [PATCH] [SPARK-20367] Properly unescape column names of partitioning
 columns parsed from paths.

## What changes were proposed in this pull request?

When infering partitioning schema from paths, the column in parsePartitionColumn should be unescaped with unescapePathName, just like it is being done in e.g. parsePathFragmentAsSeq.

## How was this patch tested?

Added a test to FileIndexSuite.

Author: Juliusz Sompolski <julek@databricks.com>

Closes #17703 from juliuszsompolski/SPARK-20367.
---
 .../execution/datasources/PartitioningUtils.scala    |  2 +-
 .../sql/execution/datasources/FileIndexSuite.scala   | 12 ++++++++++++
 2 files changed, 13 insertions(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
index c3583209ef..2d70172487 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
@@ -243,7 +243,7 @@ object PartitioningUtils {
     if (equalSignIndex == -1) {
       None
     } else {
-      val columnName = columnSpec.take(equalSignIndex)
+      val columnName = unescapePathName(columnSpec.take(equalSignIndex))
       assert(columnName.nonEmpty, s"Empty partition column name in '$columnSpec'")
 
       val rawColumnValue = columnSpec.drop(equalSignIndex + 1)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
index a9511cbd9e..b4616826e4 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
@@ -27,6 +27,7 @@ import org.apache.hadoop.fs.{FileStatus, Path, RawLocalFileSystem}
 
 import org.apache.spark.metrics.source.HiveCatalogMetrics
 import org.apache.spark.sql.catalyst.util._
+import org.apache.spark.sql.functions.col
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.util.{KnownSizeEstimation, SizeEstimator}
@@ -236,6 +237,17 @@ class FileIndexSuite extends SharedSQLContext {
     val fileStatusCache = FileStatusCache.getOrCreate(spark)
     fileStatusCache.putLeafFiles(new Path("/tmp", "abc"), files.toArray)
   }
+
+  test("SPARK-20367 - properly unescape column names in inferPartitioning") {
+    withTempPath { path =>
+      val colToUnescape = "Column/#%'?"
+      spark
+        .range(1)
+        .select(col("id").as(colToUnescape), col("id"))
+        .write.partitionBy(colToUnescape).parquet(path.getAbsolutePath)
+      assert(spark.read.parquet(path.getAbsolutePath).schema.exists(_.name == colToUnescape))
+    }
+  }
 }
 
 class FakeParentPathFileSystem extends RawLocalFileSystem {
-- 
GitLab