Commit 14057025 authored by Wenchen Fan, committed by Shixiong Zhu

[SPARK-18044][STREAMING] FileStreamSource should not infer partitions in every batch

## What changes were proposed in this pull request?

In `FileStreamSource.getBatch`, we create a `DataSource` with the user-specified schema so that the schema is not inferred again and again. However, we do not pass the partition columns, so the partitions are re-inferred for every batch.

This PR fixes it by keeping the partition columns in `FileStreamSource`, just like the schema.
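
For context, the code path touched here is the file-based streaming source reading a partitioned directory; `FileStreamSource.getBatch` builds a `DataSource` for every micro-batch. Below is a sketch of a query that exercises this path using the public `DataStreamReader` API (directory layout, schema, and paths are hypothetical):

```scala
// Hypothetical streaming read over a partitioned directory such as
//   /data/events/date=2016-10-20/part-00000.parquet
// Every micro-batch goes through FileStreamSource.getBatch; before this patch the
// partition columns were re-inferred on each batch, after it they are reused.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{StringType, StructType}

val spark = SparkSession.builder().appName("file-stream-example").getOrCreate()

val schema = new StructType()
  .add("value", StringType)
  .add("date", StringType)   // "date" is the partition column in this hypothetical layout

val events = spark.readStream
  .schema(schema)            // user-specified schema, so only the partitions needed inferring
  .format("parquet")
  .load("/data/events")

events.writeStream
  .format("console")
  .option("checkpointLocation", "/tmp/events-checkpoint")
  .start()
```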

## How was this patch tested?

N/A

Author: Wenchen Fan <wenchen@databricks.com>

Closes #15581 from cloud-fan/stream.
parent c1f344f1
DataSource.scala
@@ -75,7 +75,7 @@ case class DataSource(
     bucketSpec: Option[BucketSpec] = None,
     options: Map[String, String] = Map.empty) extends Logging {

-  case class SourceInfo(name: String, schema: StructType)
+  case class SourceInfo(name: String, schema: StructType, partitionColumns: Seq[String])

   lazy val providingClass: Class[_] = lookupDataSource(className)
   lazy val sourceInfo = sourceSchema()
@@ -186,8 +186,11 @@ case class DataSource(
     }
   }

-  private def inferFileFormatSchema(format: FileFormat): StructType = {
-    userSpecifiedSchema.orElse {
+  /**
+   * Infer the schema of the given FileFormat, returns a pair of schema and partition column names.
+   */
+  private def inferFileFormatSchema(format: FileFormat): (StructType, Seq[String]) = {
+    userSpecifiedSchema.map(_ -> partitionColumns).orElse {
       val caseInsensitiveOptions = new CaseInsensitiveMap(options)
       val allPaths = caseInsensitiveOptions.get("path")
       val globbedPaths = allPaths.toSeq.flatMap { path =>
@@ -197,14 +200,14 @@
         SparkHadoopUtil.get.globPathIfNecessary(qualified)
       }.toArray
       val fileCatalog = new ListingFileCatalog(sparkSession, globbedPaths, options, None)
-      val partitionCols = fileCatalog.partitionSpec().partitionColumns.fields
+      val partitionSchema = fileCatalog.partitionSpec().partitionColumns
       val inferred = format.inferSchema(
         sparkSession,
         caseInsensitiveOptions,
         fileCatalog.allFiles())

       inferred.map { inferredSchema =>
-        StructType(inferredSchema ++ partitionCols)
+        StructType(inferredSchema ++ partitionSchema) -> partitionSchema.map(_.name)
       }
     }.getOrElse {
       throw new AnalysisException("Unable to infer schema. It must be specified manually.")
@@ -217,7 +220,7 @@
       case s: StreamSourceProvider =>
         val (name, schema) = s.sourceSchema(
           sparkSession.sqlContext, userSpecifiedSchema, className, options)
-        SourceInfo(name, schema)
+        SourceInfo(name, schema, Nil)

       case format: FileFormat =>
         val caseInsensitiveOptions = new CaseInsensitiveMap(options)
@@ -246,7 +249,8 @@
             "you may be able to create a static DataFrame on that directory with " +
             "'spark.read.load(directory)' and infer schema from it.")
         }
-        SourceInfo(s"FileSource[$path]", inferFileFormatSchema(format))
+        val (schema, partCols) = inferFileFormatSchema(format)
+        SourceInfo(s"FileSource[$path]", schema, partCols)

       case _ =>
         throw new UnsupportedOperationException(
@@ -266,7 +270,13 @@
           throw new IllegalArgumentException("'path' is not specified")
         })
         new FileStreamSource(
-          sparkSession, path, className, sourceInfo.schema, metadataPath, options)
+          sparkSession = sparkSession,
+          path = path,
+          fileFormatClassName = className,
+          schema = sourceInfo.schema,
+          partitionColumns = sourceInfo.partitionColumns,
+          metadataPath = metadataPath,
+          options = options)
       case _ =>
         throw new UnsupportedOperationException(
           s"Data source $className does not support streamed reading")
FileStreamSource.scala
@@ -35,6 +35,7 @@ class FileStreamSource(
     path: String,
     fileFormatClassName: String,
     override val schema: StructType,
+    partitionColumns: Seq[String],
     metadataPath: String,
     options: Map[String, String]) extends Source with Logging {
@@ -142,6 +143,7 @@ class FileStreamSource(
         sparkSession,
         paths = files.map(_.path),
         userSpecifiedSchema = Some(schema),
+        partitionColumns = partitionColumns,
         className = fileFormatClassName,
         options = optionsWithPartitionBasePath)
     Dataset.ofRows(sparkSession, LogicalRelation(newDataSource.resolveRelation(
FileStreamSourceSuite.scala
@@ -94,7 +94,7 @@ class FileStreamSourceSuite extends SparkFunSuite with SharedSQLContext {
         new FileStreamSourceLog(FileStreamSourceLog.VERSION, spark, dir.getAbsolutePath)
       assert(metadataLog.add(0, Array(FileEntry(s"$scheme:///file1", 100L, 0))))

-      val newSource = new FileStreamSource(spark, s"$scheme:///", "parquet", StructType(Nil),
+      val newSource = new FileStreamSource(spark, s"$scheme:///", "parquet", StructType(Nil), Nil,
         dir.getAbsolutePath, Map.empty)
       // this method should throw an exception if `fs.exists` is called during resolveRelation
       newSource.getBatch(None, LongOffset(1))
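
Stripped of Spark internals, the essence of the change is "do the expensive partition discovery once when the source is created and reuse the result on every batch", exactly as is already done for the schema. The sketch below is a toy model of that pattern with made-up names (`inferPartitions`, `StreamingSourceSketch`); it is an illustration, not Spark's implementation:

```scala
// Toy model of the fix: infer partition columns once, reuse them per batch.
// All names here are illustrative; none of this is Spark API.
object PartitionCachingSketch {

  // Stand-in for the expensive directory listing + partition discovery.
  def inferPartitions(path: String): Seq[String] = {
    println(s"(expensive) inferring partitions under $path")
    Seq("date", "hour")
  }

  final class StreamingSourceSketch(path: String) {
    // Before the patch, the equivalent work ran inside every getBatch call;
    // after it, the result is computed once when the source is created, like the schema.
    private val partitionColumns: Seq[String] = inferPartitions(path)

    def getBatch(batchId: Long): String =
      s"batch $batchId reads $path with partition columns ${partitionColumns.mkString(", ")}"
  }

  def main(args: Array[String]): Unit = {
    val source = new StreamingSourceSketch("/data/events")
    (0L to 2L).foreach(id => println(source.getBatch(id)))  // inference printed only once
  }
}
```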