Commit 54db7970 authored by Dilip Biswal, committed by Yin Huai

[SPARK-11544][SQL] sqlContext doesn't use PathFilter

Apply the user-supplied PathFilter while listing files from the file system.

Author: Dilip Biswal <dbiswal@us.ibm.com>

Closes #9652 from dilipbiswal/spark-11544.
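For reference, a minimal sketch of how a caller could opt into this behaviour once the patch is applied, mirroring the test added below. The `SkipP2Filter` class, the input path, and the pre-existing `sqlContext` are illustrative assumptions, not part of the commit; the configuration key is the one the commit's test uses and the one `FileInputFormat.getInputPathFilter` consults.

import org.apache.hadoop.fs.{Path, PathFilter}

// Illustrative filter: skip every file that lives under a directory named "p=2".
class SkipP2Filter extends PathFilter {
  override def accept(path: Path): Boolean = path.getParent.getName != "p=2"
}

// Register the filter on the Hadoop configuration used by the SQLContext.
// With this patch, HadoopFsRelation-based sources (e.g. JSON) apply it while
// listing input files, so files under .../p=2 are excluded from the scan.
sqlContext.sparkContext.hadoopConfiguration.setClass(
  "mapreduce.input.pathFilter.class",
  classOf[SkipP2Filter],
  classOf[PathFilter])

val filtered = sqlContext.read.json("/path/to/json/data")  // hypothetical input path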
parent 603a721c
@@ -21,7 +21,8 @@ import scala.collection.mutable
 import scala.util.Try
 
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
+import org.apache.hadoop.fs.{PathFilter, FileStatus, FileSystem, Path}
+import org.apache.hadoop.mapred.{JobConf, FileInputFormat}
 import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
 
 import org.apache.spark.{Logging, SparkContext}
@@ -447,9 +448,15 @@ abstract class HadoopFsRelation private[sql](
         val hdfsPath = new Path(path)
         val fs = hdfsPath.getFileSystem(hadoopConf)
         val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
         logInfo(s"Listing $qualified on driver")
-        Try(fs.listStatus(qualified)).getOrElse(Array.empty)
+        // Dummy jobconf to get to the pathFilter defined in configuration
+        val jobConf = new JobConf(hadoopConf, this.getClass())
+        val pathFilter = FileInputFormat.getInputPathFilter(jobConf)
+        if (pathFilter != null) {
+          Try(fs.listStatus(qualified, pathFilter)).getOrElse(Array.empty)
+        } else {
+          Try(fs.listStatus(qualified)).getOrElse(Array.empty)
+        }
       }.filterNot { status =>
         val name = status.getPath.getName
         name.toLowerCase == "_temporary" || name.startsWith(".")
@@ -847,8 +854,16 @@ private[sql] object HadoopFsRelation extends Logging {
       if (name == "_temporary" || name.startsWith(".")) {
        Array.empty
       } else {
-        val (dirs, files) = fs.listStatus(status.getPath).partition(_.isDir)
-        files ++ dirs.flatMap(dir => listLeafFiles(fs, dir))
+        // Dummy jobconf to get to the pathFilter defined in configuration
+        val jobConf = new JobConf(fs.getConf, this.getClass())
+        val pathFilter = FileInputFormat.getInputPathFilter(jobConf)
+        if (pathFilter != null) {
+          val (dirs, files) = fs.listStatus(status.getPath, pathFilter).partition(_.isDir)
+          files ++ dirs.flatMap(dir => listLeafFiles(fs, dir))
+        } else {
+          val (dirs, files) = fs.listStatus(status.getPath).partition(_.isDir)
+          files ++ dirs.flatMap(dir => listLeafFiles(fs, dir))
+        }
       }
     }
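For readers unfamiliar with the lookup the new branches perform, here is a standalone sketch of the same "dummy JobConf" trick outside Spark: a plain Configuration is wrapped in a JobConf solely so that the static org.apache.hadoop.mapred.FileInputFormat.getInputPathFilter helper can resolve and instantiate the configured PathFilter. `HiddenFileFilter` is a hypothetical filter, and the configuration key assumes Hadoop 2's FileInputFormat, as in the test below.

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{Path, PathFilter}
import org.apache.hadoop.mapred.{FileInputFormat, JobConf}

// Hypothetical filter used only to exercise the lookup.
class HiddenFileFilter extends PathFilter {
  override def accept(path: Path): Boolean = !path.getName.startsWith("_")
}

object PathFilterResolution {
  def main(args: Array[String]): Unit = {
    val conf = new Configuration()
    // Same key the commit's test sets; Hadoop 2's FileInputFormat reads it back below.
    conf.setClass("mapreduce.input.pathFilter.class",
      classOf[HiddenFileFilter], classOf[PathFilter])

    // The "dummy" JobConf wraps the plain Configuration so the static mapred
    // helper can resolve and instantiate the filter, as the patched code does.
    val jobConf = new JobConf(conf, getClass)
    val pathFilter = FileInputFormat.getInputPathFilter(jobConf)

    // Prints Some(HiddenFileFilter); getInputPathFilter returns null if no class is set.
    println(Option(pathFilter).map(_.getClass.getSimpleName))
  }
}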
@@ -19,19 +19,27 @@ package org.apache.spark.sql.execution.datasources.json
 import java.io.{File, StringWriter}
 import java.sql.{Date, Timestamp}
 
+import scala.collection.JavaConverters._
+
 import com.fasterxml.jackson.core.JsonFactory
-import org.apache.spark.rdd.RDD
+import org.apache.commons.io.FileUtils
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{Path, PathFilter}
 import org.scalactic.Tolerance._
 
+import org.apache.spark.rdd.RDD
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
-import org.apache.spark.sql.execution.datasources.{ResolvedDataSource, LogicalRelation}
+import org.apache.spark.sql.execution.datasources.{LogicalRelation, ResolvedDataSource}
 import org.apache.spark.sql.execution.datasources.json.InferSchema.compatibleType
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils
 
+class TestFileFilter extends PathFilter {
+  override def accept(path: Path): Boolean = path.getParent.getName != "p=2"
+}
+
 class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
   import testImplicits._
@@ -1390,4 +1398,28 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
       )
     }
   }
+
+  test("SPARK-11544 test pathfilter") {
+    withTempPath { dir =>
+      val path = dir.getCanonicalPath
+
+      val df = sqlContext.range(2)
+      df.write.json(path + "/p=1")
+      df.write.json(path + "/p=2")
+      assert(sqlContext.read.json(path).count() === 4)
+
+      val clonedConf = new Configuration(hadoopConfiguration)
+      try {
+        hadoopConfiguration.setClass(
+          "mapreduce.input.pathFilter.class",
+          classOf[TestFileFilter],
+          classOf[PathFilter])
+        assert(sqlContext.read.json(path).count() === 2)
+      } finally {
+        // Hadoop 1 doesn't have `Configuration.unset`
+        hadoopConfiguration.clear()
+        clonedConf.asScala.foreach(entry => hadoopConfiguration.set(entry.getKey, entry.getValue))
+      }
+    }
+  }
 }