Skip to content
Snippets Groups Projects
Commit 064fadd2 authored by jayadevanmurali's avatar jayadevanmurali Committed by Wenchen Fan
Browse files

[SPARK-19059][SQL] Unable to retrieve data from parquet table whose name startswith underscore

## What changes were proposed in this pull request?
The initial shouldFilterOut() method invocation filter the root path name(table name in the intial call) and remove if it contains _. I moved the check one level below, so it first list files/directories in the given root path and then apply filter.
(Please fill in changes proposed in this fix)

## How was this patch tested?
Added new test case for this scenario
(Please explain how this patch was tested. E.g. unit tests, integration tests, manual tests)
(If this patch involves UI changes, please attach a screenshot; otherwise, remove this)

Please review http://spark.apache.org/contributing.html before opening a pull request.

Author: jayadevanmurali <jayadevan.m@tcs.com>
Author: jayadevan <jayadevan.m@tcs.com>

Closes #16635 from jayadevanmurali/branch-0.1-SPARK-19059.
parent 8ccca917
No related branches found
No related tags found
No related merge requests found
......@@ -385,55 +385,54 @@ object PartitioningAwareFileIndex extends Logging {
logTrace(s"Listing $path")
val fs = path.getFileSystem(hadoopConf)
val name = path.getName.toLowerCase
if (shouldFilterOut(name)) {
Seq.empty[FileStatus]
} else {
// [SPARK-17599] Prevent InMemoryFileIndex from failing if path doesn't exist
// Note that statuses only include FileStatus for the files and dirs directly under path,
// and does not include anything else recursively.
val statuses = try fs.listStatus(path) catch {
case _: FileNotFoundException =>
logWarning(s"The directory $path was not found. Was it deleted very recently?")
Array.empty[FileStatus]
}
val allLeafStatuses = {
val (dirs, topLevelFiles) = statuses.partition(_.isDirectory)
val nestedFiles: Seq[FileStatus] = sessionOpt match {
case Some(session) =>
bulkListLeafFiles(dirs.map(_.getPath), hadoopConf, filter, session).flatMap(_._2)
case _ =>
dirs.flatMap(dir => listLeafFiles(dir.getPath, hadoopConf, filter, sessionOpt))
}
val allFiles = topLevelFiles ++ nestedFiles
if (filter != null) allFiles.filter(f => filter.accept(f.getPath)) else allFiles
}
// [SPARK-17599] Prevent InMemoryFileIndex from failing if path doesn't exist
// Note that statuses only include FileStatus for the files and dirs directly under path,
// and does not include anything else recursively.
val statuses = try fs.listStatus(path) catch {
case _: FileNotFoundException =>
logWarning(s"The directory $path was not found. Was it deleted very recently?")
Array.empty[FileStatus]
}
allLeafStatuses.filterNot(status => shouldFilterOut(status.getPath.getName)).map {
case f: LocatedFileStatus =>
f
// NOTE:
//
// - Although S3/S3A/S3N file system can be quite slow for remote file metadata
// operations, calling `getFileBlockLocations` does no harm here since these file system
// implementations don't actually issue RPC for this method.
//
// - Here we are calling `getFileBlockLocations` in a sequential manner, but it should not
// be a big deal since we always use to `listLeafFilesInParallel` when the number of
// paths exceeds threshold.
case f =>
// The other constructor of LocatedFileStatus will call FileStatus.getPermission(),
// which is very slow on some file system (RawLocalFileSystem, which is launch a
// subprocess and parse the stdout).
val locations = fs.getFileBlockLocations(f, 0, f.getLen)
val lfs = new LocatedFileStatus(f.getLen, f.isDirectory, f.getReplication, f.getBlockSize,
f.getModificationTime, 0, null, null, null, null, f.getPath, locations)
if (f.isSymlink) {
lfs.setSymlink(f.getSymlink)
}
lfs
val filteredStatuses = statuses.filterNot(status => shouldFilterOut(status.getPath.getName))
val allLeafStatuses = {
val (dirs, topLevelFiles) = filteredStatuses.partition(_.isDirectory)
val nestedFiles: Seq[FileStatus] = sessionOpt match {
case Some(session) =>
bulkListLeafFiles(dirs.map(_.getPath), hadoopConf, filter, session).flatMap(_._2)
case _ =>
dirs.flatMap(dir => listLeafFiles(dir.getPath, hadoopConf, filter, sessionOpt))
}
val allFiles = topLevelFiles ++ nestedFiles
if (filter != null) allFiles.filter(f => filter.accept(f.getPath)) else allFiles
}
allLeafStatuses.filterNot(status => shouldFilterOut(status.getPath.getName)).map {
case f: LocatedFileStatus =>
f
// NOTE:
//
// - Although S3/S3A/S3N file system can be quite slow for remote file metadata
// operations, calling `getFileBlockLocations` does no harm here since these file system
// implementations don't actually issue RPC for this method.
//
// - Here we are calling `getFileBlockLocations` in a sequential manner, but it should not
// be a big deal since we always use to `listLeafFilesInParallel` when the number of
// paths exceeds threshold.
case f =>
// The other constructor of LocatedFileStatus will call FileStatus.getPermission(),
// which is very slow on some file system (RawLocalFileSystem, which is launch a
// subprocess and parse the stdout).
val locations = fs.getFileBlockLocations(f, 0, f.getLen)
val lfs = new LocatedFileStatus(f.getLen, f.isDirectory, f.getReplication, f.getBlockSize,
f.getModificationTime, 0, null, null, null, null, f.getPath, locations)
if (f.isSymlink) {
lfs.setSymlink(f.getSymlink)
}
lfs
}
}
......
......@@ -2513,4 +2513,12 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
}
}
}
test("SPARK-19059: read file based table whose name starts with underscore") {
withTable("_tbl") {
sql("CREATE TABLE `_tbl`(i INT) USING parquet")
sql("INSERT INTO `_tbl` VALUES (1), (2), (3)")
checkAnswer( sql("SELECT * FROM `_tbl`"), Row(1) :: Row(2) :: Row(3) :: Nil)
}
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment