Commit 85d609cf authored by Burak Yavuz, committed by Josh Rosen

[SPARK-17613] S3A base paths with no '/' at the end return empty DataFrames

## What changes were proposed in this pull request?

Suppose you have a bucket `s3a://some-bucket`, and under it you have the files:
```
s3a://some-bucket/file1.parquet
s3a://some-bucket/file2.parquet
```
Getting the parent path of `s3a://some-bucket/file1.parquet` yields `s3a://some-bucket/`, and the `ListingFileCatalog` uses this as the key in its hash map.

When `catalog.allFiles` is called, the lookup instead uses `s3a://some-bucket` (no slash at the end) to get the list of files, and we're left with an empty list!
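
For illustration only (not part of this patch), here is a minimal sketch of the mismatch, assuming Hadoop's standard `Path`/URI equality semantics:

```scala
import org.apache.hadoop.fs.Path

// The parent of a file sitting directly under the bucket root keeps a trailing '/'
// (its underlying URI has path "/").
val leafParent = new Path("s3a://some-bucket/file1.parquet").getParent

// A base path written without the trailing slash wraps a URI whose path component
// is empty rather than "/", so it is a different hash-map key.
val basePath = new Path("s3a://some-bucket")

println(leafParent)              // expected: s3a://some-bucket/
println(basePath)                // expected: s3a://some-bucket
println(leafParent == basePath)  // expected: false -> the lookup misses and allFiles is empty
```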

This PR fixes the issue by appending a `/` to the end of the `URI` iff the given `Path` has no parent, i.e. it is the root. This is a no-op if the path already ends with a `/`; that case is handled by Hadoop's `Path` merging semantics.
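
A rough sketch of the `Path` merging behavior this relies on, reusing the example bucket above (illustrative, not taken from the patch):

```scala
import org.apache.hadoop.fs.Path

// Appending Path.SEPARATOR ("/") to the root yields the root with a trailing slash...
println(new Path(new Path("s3a://some-bucket"), Path.SEPARATOR))   // expected: s3a://some-bucket/
// ...and is a no-op when the trailing slash is already present.
println(new Path(new Path("s3a://some-bucket/"), Path.SEPARATOR))  // expected: s3a://some-bucket/
```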

## How was this patch tested?

Unit test in `FileCatalogSuite`.

Author: Burak Yavuz <brkyvz@gmail.com>

Closes #15169 from brkyvz/SPARK-17613.
Parent commit: 9f24a17c
`PartitioningAwareFileCatalog.scala`:

```
@@ -76,7 +76,15 @@ abstract class PartitioningAwareFileCatalog(
       paths.flatMap { path =>
         // Make the path qualified (consistent with listLeafFiles and listLeafFilesInParallel).
         val fs = path.getFileSystem(hadoopConf)
-        val qualifiedPath = fs.makeQualified(path)
+        val qualifiedPathPre = fs.makeQualified(path)
+        val qualifiedPath: Path = if (qualifiedPathPre.isRoot && !qualifiedPathPre.isAbsolute) {
+          // SPARK-17613: Always append `Path.SEPARATOR` to the end of parent directories,
+          // because the `leafFile.getParent` would have returned an absolute path with the
+          // separator at the end.
+          new Path(qualifiedPathPre, Path.SEPARATOR)
+        } else {
+          qualifiedPathPre
+        }
         // There are three cases possible with each path
         // 1. The path is a directory and has children files in it. Then it must be present in
...
```
`FileCatalogSuite.scala`:

```
@@ -18,10 +18,12 @@
 package org.apache.spark.sql.execution.datasources

 import java.io.File
+import java.net.URI

+import scala.collection.mutable
 import scala.language.reflectiveCalls

-import org.apache.hadoop.fs.Path
+import org.apache.hadoop.fs.{FileStatus, Path, RawLocalFileSystem}

 import org.apache.spark.sql.catalyst.util._
 import org.apache.spark.sql.test.SharedSQLContext
@@ -78,4 +80,45 @@ class FileCatalogSuite extends SharedSQLContext {
       assert(catalog1.listLeafFiles(catalog1.paths).isEmpty)
     }
   }
+
+  test("SPARK-17613 - PartitioningAwareFileCatalog: base path w/o '/' at end") {
+    class MockCatalog(
+        override val paths: Seq[Path]) extends PartitioningAwareFileCatalog(spark, Map.empty, None) {
+
+      override def refresh(): Unit = {}
+
+      override def leafFiles: mutable.LinkedHashMap[Path, FileStatus] = mutable.LinkedHashMap(
+        new Path("mockFs://some-bucket/file1.json") -> new FileStatus()
+      )
+
+      override def leafDirToChildrenFiles: Map[Path, Array[FileStatus]] = Map(
+        new Path("mockFs://some-bucket/") -> Array(new FileStatus())
+      )
+
+      override def partitionSpec(): PartitionSpec = {
+        PartitionSpec.emptySpec
+      }
+    }
+
+    withSQLConf(
+        "fs.mockFs.impl" -> classOf[FakeParentPathFileSystem].getName,
+        "fs.mockFs.impl.disable.cache" -> "true") {
+      val pathWithSlash = new Path("mockFs://some-bucket/")
+      assert(pathWithSlash.getParent === null)
+      val pathWithoutSlash = new Path("mockFs://some-bucket")
+      assert(pathWithoutSlash.getParent === null)
+      val catalog1 = new MockCatalog(Seq(pathWithSlash))
+      val catalog2 = new MockCatalog(Seq(pathWithoutSlash))
+      assert(catalog1.allFiles().nonEmpty)
+      assert(catalog2.allFiles().nonEmpty)
+    }
+  }
 }
+
+class FakeParentPathFileSystem extends RawLocalFileSystem {
+  override def getScheme: String = "mockFs"
+
+  override def getUri: URI = {
+    URI.create("mockFs://some-bucket")
+  }
+}
```