Commit abff92bf authored by Dongjoon Hyun, committed by Cheng Lian

[SPARK-16975][SQL] Column-partition path starting '_' should be handled correctly

## What changes were proposed in this pull request?

Currently, Spark ignores path names starting with `_` or `.`. This causes read failures for column-partitioned file data sources whose partition column names start with `_`, e.g. `_col`.

**Before**
```scala
scala> spark.range(10).withColumn("_locality_code", $"id").write.partitionBy("_locality_code").save("/tmp/parquet")
scala> spark.read.parquet("/tmp/parquet")
org.apache.spark.sql.AnalysisException: Unable to infer schema for ParquetFormat at /tmp/parquet20. It must be specified manually;
```

**After**
```scala
scala> spark.range(10).withColumn("_locality_code", $"id").write.partitionBy("_locality_code").save("/tmp/parquet")
scala> spark.read.parquet("/tmp/parquet")
res2: org.apache.spark.sql.DataFrame = [id: bigint, _locality_code: int]
```
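
The fix (shown in the diff below) relies on the fact that partitioned data is written into directories named `column=value`, so a path component that starts with `_` but also contains `=` is a partition directory, not a metadata file, and must not be filtered out. A minimal standalone sketch of the old and new predicate (free-standing functions for illustration, not the actual `isDataPath` method in `PartitioningAwareFileCatalog`):

```scala
// Sketch of the path filter before and after this patch.
def isDataPathBefore(name: String): Boolean =
  !(name.startsWith("_") || name.startsWith("."))

def isDataPathAfter(name: String): Boolean =
  !((name.startsWith("_") && !name.contains("=")) || name.startsWith("."))

isDataPathBefore("_locality_code=3")  // false: partition directory is wrongly skipped
isDataPathAfter("_locality_code=3")   // true:  partition directory is now kept
isDataPathAfter("_SUCCESS")           // false: marker/metadata files are still skipped
isDataPathAfter(".hidden")            // false: hidden files are still skipped
```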

## How was this patch tested?

Passes Jenkins with a new test case.

Author: Dongjoon Hyun <dongjoon@apache.org>

Closes #14585 from dongjoon-hyun/SPARK-16975-PARQUET.
parent ccc6dc0f
@@ -204,6 +204,6 @@ abstract class PartitioningAwareFileCatalog(
   private def isDataPath(path: Path): Boolean = {
     val name = path.getName
-    !(name.startsWith("_") || name.startsWith("."))
+    !((name.startsWith("_") && !name.contains("=")) || name.startsWith("."))
   }
 }
@@ -364,7 +364,7 @@ object HadoopFsRelation extends Logging {
     // We filter everything that starts with _ and ., except _common_metadata and _metadata
     // because Parquet needs to find those metadata files from leaf files returned by this method.
     // We should refactor this logic to not mix metadata files with data files.
-    (pathName.startsWith("_") || pathName.startsWith(".")) &&
+    ((pathName.startsWith("_") && !pathName.contains("=")) || pathName.startsWith(".")) &&
       !pathName.startsWith("_common_metadata") && !pathName.startsWith("_metadata")
   }
...
@@ -54,7 +54,7 @@ class JsonFileFormat extends TextBasedFileFormat with DataSourceRegister {
       .getOrElse(sparkSession.sessionState.conf.columnNameOfCorruptRecord)
     val jsonFiles = files.filterNot { status =>
       val name = status.getPath.getName
-      name.startsWith("_") || name.startsWith(".")
+      (name.startsWith("_") && !name.contains("=")) || name.startsWith(".")
     }.toArray
     val jsonSchema = InferSchema.infer(
...
@@ -236,7 +236,8 @@ class ParquetFileFormat
     // Lists `FileStatus`es of all leaf nodes (files) under all base directories.
     val leaves = allFiles.filter { f =>
       isSummaryFile(f.getPath) ||
-        !(f.getPath.getName.startsWith("_") || f.getPath.getName.startsWith("."))
+        !((f.getPath.getName.startsWith("_") && !f.getPath.getName.contains("=")) ||
+          f.getPath.getName.startsWith("."))
     }.toArray.sortBy(_.getPath.toString)
     FileTypes(
...
@@ -17,6 +17,7 @@
 package org.apache.spark.sql

+import java.io.File
 import java.math.MathContext
 import java.sql.{Date, Timestamp}

@@ -2637,6 +2638,14 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
     }
   }

+  test("SPARK-16975: Column-partition path starting '_' should be handled correctly") {
+    withTempDir { dir =>
+      val parquetDir = new File(dir, "parquet").getCanonicalPath
+      spark.range(10).withColumn("_col", $"id").write.partitionBy("_col").save(parquetDir)
+      spark.read.parquet(parquetDir)
+    }
+  }
+
   test("SPARK-16644: Aggregate should not put aggregate expressions to constraints") {
     withTable("tbl") {
       sql("CREATE TABLE tbl(a INT, b INT) USING parquet")
...