Commit 223fa218 authored by Shixiong Zhu, committed by Tathagata Das

[SPARK-18510][SQL] Follow up to address comments in #15951

## What changes were proposed in this pull request?

This PR addresses the remaining review comments in #15951.

## How was this patch tested?

Jenkins

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #15997 from zsxwing/SPARK-18510-follow-up.
parent 0d1bf2b6
@@ -118,8 +118,10 @@ case class DataSource(
   private def getOrInferFileFormatSchema(
       format: FileFormat,
       justPartitioning: Boolean = false): (StructType, StructType) = {
-    // the operations below are expensive therefore try not to do them if we don't need to
-    lazy val tempFileCatalog = {
+    // the operations below are expensive therefore try not to do them if we don't need to, e.g.,
+    // in streaming mode, we have already inferred and registered partition columns, we will
+    // never have to materialize the lazy val below
+    lazy val tempFileIndex = {
       val allPaths = caseInsensitiveOptions.get("path") ++ paths
       val hadoopConf = sparkSession.sessionState.newHadoopConf()
       val globbedPaths = allPaths.toSeq.flatMap { path =>
@@ -133,7 +135,7 @@
     val partitionSchema = if (partitionColumns.isEmpty && catalogTable.isEmpty) {
       // Try to infer partitioning, because no DataSource in the read path provides the partitioning
       // columns properly unless it is a Hive DataSource
-      val resolved = tempFileCatalog.partitionSchema.map { partitionField =>
+      val resolved = tempFileIndex.partitionSchema.map { partitionField =>
         val equality = sparkSession.sessionState.conf.resolver
         // SPARK-18510: try to get schema from userSpecifiedSchema, otherwise fallback to inferred
         userSpecifiedSchema.flatMap(_.find(f => equality(f.name, partitionField.name))).getOrElse(
@@ -141,17 +143,17 @@
       }
       StructType(resolved)
     } else {
-      // in streaming mode, we have already inferred and registered partition columns, we will
-      // never have to materialize the lazy val below
-      lazy val inferredPartitions = tempFileCatalog.partitionSchema
       // maintain old behavior before SPARK-18510. If userSpecifiedSchema is empty used inferred
       // partitioning
       if (userSpecifiedSchema.isEmpty) {
+        val inferredPartitions = tempFileIndex.partitionSchema
         inferredPartitions
       } else {
         val partitionFields = partitionColumns.map { partitionColumn =>
-          userSpecifiedSchema.flatMap(_.find(_.name == partitionColumn)).orElse {
-            val inferredOpt = inferredPartitions.find(_.name == partitionColumn)
+          val equality = sparkSession.sessionState.conf.resolver
+          userSpecifiedSchema.flatMap(_.find(c => equality(c.name, partitionColumn))).orElse {
+            val inferredPartitions = tempFileIndex.partitionSchema
+            val inferredOpt = inferredPartitions.find(p => equality(p.name, partitionColumn))
             if (inferredOpt.isDefined) {
               logDebug(
                 s"""Type of partition column: $partitionColumn not found in specified schema
@@ -163,7 +165,7 @@
                    |Falling back to inferred dataType if it exists.
                  """.stripMargin)
             }
-            inferredPartitions.find(_.name == partitionColumn)
+            inferredOpt
           }.getOrElse {
             throw new AnalysisException(s"Failed to resolve the schema for $format for " +
               s"the partition column: $partitionColumn. It must be specified manually.")
@@ -182,7 +184,7 @@
       format.inferSchema(
         sparkSession,
         caseInsensitiveOptions,
-        tempFileCatalog.allFiles())
+        tempFileIndex.allFiles())
     }.getOrElse {
       throw new AnalysisException(
         s"Unable to infer schema for $format. It must be specified manually.")
@@ -224,8 +226,11 @@
             "you may be able to create a static DataFrame on that directory with " +
             "'spark.read.load(directory)' and infer schema from it.")
         }
-        val (schema, partCols) = getOrInferFileFormatSchema(format)
-        SourceInfo(s"FileSource[$path]", StructType(schema ++ partCols), partCols.fieldNames)
+        val (dataSchema, partitionSchema) = getOrInferFileFormatSchema(format)
+        SourceInfo(
+          s"FileSource[$path]",
+          StructType(dataSchema ++ partitionSchema),
+          partitionSchema.fieldNames)
 
       case _ =>
         throw new UnsupportedOperationException(
@@ -379,7 +384,7 @@
           globPath
         }.toArray
 
-        val (dataSchema, inferredPartitionSchema) = getOrInferFileFormatSchema(format)
+        val (dataSchema, partitionSchema) = getOrInferFileFormatSchema(format)
 
         val fileCatalog = if (sparkSession.sqlContext.conf.manageFilesourcePartitions &&
             catalogTable.isDefined && catalogTable.get.tracksPartitionsInCatalog) {
@@ -388,12 +393,12 @@
             catalogTable.get,
             catalogTable.get.stats.map(_.sizeInBytes.toLong).getOrElse(0L))
         } else {
-          new InMemoryFileIndex(sparkSession, globbedPaths, options, Some(inferredPartitionSchema))
+          new InMemoryFileIndex(sparkSession, globbedPaths, options, Some(partitionSchema))
         }
 
         HadoopFsRelation(
           fileCatalog,
-          partitionSchema = inferredPartitionSchema,
+          partitionSchema = partitionSchema,
           dataSchema = dataSchema.asNullable,
           bucketSpec = bucketSpec,
           format,
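The first hunk keeps the renamed tempFileIndex behind a lazy val so that the expensive file listing runs only if some code path actually dereferences it; as the new comment notes, the streaming path has already registered its partition columns and never does. A minimal standalone sketch of that deferral pattern, with hypothetical names and a println standing in for the real listing:

// Sketch only: LazyIndexSketch and expensiveListing are hypothetical
// stand-ins, not code from this commit.
object LazyIndexSketch {
  def expensiveListing(): Seq[String] = {
    println("listing files...") // marks when the expensive work actually runs
    Seq("year=2016/part-00000.parquet")
  }

  def main(args: Array[String]): Unit = {
    lazy val tempFileIndex = expensiveListing()
    println("before first use") // "listing files..." has not run yet
    println(tempFileIndex.head) // first dereference triggers the listing
    println(tempFileIndex.head) // cached: the listing does not run again
  }
}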
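The other substantive change replaces the exact _.name == partitionColumn comparison with sparkSession.sessionState.conf.resolver, Spark's configurable name-equality function, which is case-insensitive unless spark.sql.caseSensitive is enabled. A minimal sketch of the resulting lookup order, assuming a simplified Field type and an equalsIgnoreCase resolver in place of Spark's:

// Sketch only: ResolverSketch, Field, and resolvePartitionColumn are
// hypothetical scaffolding around the lookup order used in the patch.
object ResolverSketch {
  type Resolver = (String, String) => Boolean

  // Stand-in for Spark's default (case-insensitive) resolver.
  val caseInsensitiveResolver: Resolver = (a, b) => a.equalsIgnoreCase(b)

  // Simplified stand-in for StructField.
  case class Field(name: String, dataType: String)

  // Same order as the patched code: prefer the user-specified field,
  // fall back to the inferred one, matching names via the resolver.
  def resolvePartitionColumn(
      partitionColumn: String,
      userSpecifiedSchema: Option[Seq[Field]],
      inferredPartitions: Seq[Field],
      equality: Resolver): Option[Field] = {
    userSpecifiedSchema.flatMap(_.find(f => equality(f.name, partitionColumn)))
      .orElse(inferredPartitions.find(p => equality(p.name, partitionColumn)))
  }

  def main(args: Array[String]): Unit = {
    val inferred = Seq(Field("year", "IntegerType"))
    // Resolves despite the case mismatch; a plain == comparison would miss it.
    println(resolvePartitionColumn("YEAR", None, inferred, caseInsensitiveResolver))
    // prints: Some(Field(year,IntegerType))
  }
}

With the old == comparison the "YEAR" lookup above would fall through to the AnalysisException; the resolver-based match finds the inferred column under the default case-insensitive configuration.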