Commit 27d81d00 authored by Shixiong Zhu, committed by Tathagata Das

[SPARK-18510][SQL] Follow up to address comments in #15951


## What changes were proposed in this pull request?

This PR addresses the remaining review comments on #15951: it renames tempFileCatalog to tempFileIndex, matches partition columns against the user-specified schema through the session resolver instead of exact string equality, reuses the already-computed inferredOpt rather than searching the inferred partitions a second time, and moves the note about streaming mode up to the lazy val it describes.

## How was this patch tested?

Jenkins

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #15997 from zsxwing/SPARK-18510-follow-up.

(cherry picked from commit 223fa218)
Signed-off-by: Tathagata Das <tathagata.das1565@gmail.com>
parent 15d2cf26
@@ -118,8 +118,10 @@ case class DataSource(
   private def getOrInferFileFormatSchema(
       format: FileFormat,
       justPartitioning: Boolean = false): (StructType, StructType) = {
-    // the operations below are expensive therefore try not to do them if we don't need to
-    lazy val tempFileCatalog = {
+    // the operations below are expensive therefore try not to do them if we don't need to, e.g.,
+    // in streaming mode, we have already inferred and registered partition columns, we will
+    // never have to materialize the lazy val below
+    lazy val tempFileIndex = {
       val allPaths = caseInsensitiveOptions.get("path") ++ paths
       val hadoopConf = sparkSession.sessionState.newHadoopConf()
       val globbedPaths = allPaths.toSeq.flatMap { path =>
@@ -133,7 +135,7 @@ case class DataSource(
     val partitionSchema = if (partitionColumns.isEmpty && catalogTable.isEmpty) {
       // Try to infer partitioning, because no DataSource in the read path provides the partitioning
       // columns properly unless it is a Hive DataSource
-      val resolved = tempFileCatalog.partitionSchema.map { partitionField =>
+      val resolved = tempFileIndex.partitionSchema.map { partitionField =>
         val equality = sparkSession.sessionState.conf.resolver
         // SPARK-18510: try to get schema from userSpecifiedSchema, otherwise fallback to inferred
         userSpecifiedSchema.flatMap(_.find(f => equality(f.name, partitionField.name))).getOrElse(
@@ -141,17 +143,17 @@ case class DataSource(
       }
       StructType(resolved)
     } else {
-      // in streaming mode, we have already inferred and registered partition columns, we will
-      // never have to materialize the lazy val below
-      lazy val inferredPartitions = tempFileCatalog.partitionSchema
       // maintain old behavior before SPARK-18510. If userSpecifiedSchema is empty used inferred
       // partitioning
       if (userSpecifiedSchema.isEmpty) {
+        val inferredPartitions = tempFileIndex.partitionSchema
         inferredPartitions
       } else {
         val partitionFields = partitionColumns.map { partitionColumn =>
-          userSpecifiedSchema.flatMap(_.find(_.name == partitionColumn)).orElse {
-            val inferredOpt = inferredPartitions.find(_.name == partitionColumn)
+          val equality = sparkSession.sessionState.conf.resolver
+          userSpecifiedSchema.flatMap(_.find(c => equality(c.name, partitionColumn))).orElse {
+            val inferredPartitions = tempFileIndex.partitionSchema
+            val inferredOpt = inferredPartitions.find(p => equality(p.name, partitionColumn))
             if (inferredOpt.isDefined) {
               logDebug(
                 s"""Type of partition column: $partitionColumn not found in specified schema
@@ -163,7 +165,7 @@ case class DataSource(
                   |Falling back to inferred dataType if it exists.
                 """.stripMargin)
             }
-            inferredPartitions.find(_.name == partitionColumn)
+            inferredOpt
           }.getOrElse {
             throw new AnalysisException(s"Failed to resolve the schema for $format for " +
               s"the partition column: $partitionColumn. It must be specified manually.")
@@ -182,7 +184,7 @@ case class DataSource(
       format.inferSchema(
         sparkSession,
         caseInsensitiveOptions,
-        tempFileCatalog.allFiles())
+        tempFileIndex.allFiles())
     }.getOrElse {
       throw new AnalysisException(
         s"Unable to infer schema for $format. It must be specified manually.")
@@ -224,8 +226,11 @@ case class DataSource(
             "you may be able to create a static DataFrame on that directory with " +
             "'spark.read.load(directory)' and infer schema from it.")
         }
-        val (schema, partCols) = getOrInferFileFormatSchema(format)
-        SourceInfo(s"FileSource[$path]", StructType(schema ++ partCols), partCols.fieldNames)
+        val (dataSchema, partitionSchema) = getOrInferFileFormatSchema(format)
+        SourceInfo(
+          s"FileSource[$path]",
+          StructType(dataSchema ++ partitionSchema),
+          partitionSchema.fieldNames)

       case _ =>
         throw new UnsupportedOperationException(
@@ -379,7 +384,7 @@ case class DataSource(
           globPath
         }.toArray

-        val (dataSchema, inferredPartitionSchema) = getOrInferFileFormatSchema(format)
+        val (dataSchema, partitionSchema) = getOrInferFileFormatSchema(format)

         val fileCatalog = if (sparkSession.sqlContext.conf.manageFilesourcePartitions &&
             catalogTable.isDefined && catalogTable.get.tracksPartitionsInCatalog) {
@@ -388,12 +393,12 @@ case class DataSource(
             catalogTable.get,
             catalogTable.get.stats.map(_.sizeInBytes.toLong).getOrElse(0L))
         } else {
-          new InMemoryFileIndex(sparkSession, globbedPaths, options, Some(inferredPartitionSchema))
+          new InMemoryFileIndex(sparkSession, globbedPaths, options, Some(partitionSchema))
         }

         HadoopFsRelation(
           fileCatalog,
-          partitionSchema = inferredPartitionSchema,
+          partitionSchema = partitionSchema,
           dataSchema = dataSchema.asNullable,
           bucketSpec = bucketSpec,
           format,
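To make the resolution rule this patch exercises concrete, here is a minimal, self-contained sketch (not part of the commit): each partition column is looked up in the user-specified schema first and falls back to the inferred partition schema only if no match is found, with names compared through the resolver. The resolver value below is a stand-in for sparkSession.sessionState.conf.resolver, resolvePartitionSchema is a hypothetical helper rather than Spark API, and only org.apache.spark.sql.types from spark-catalyst is assumed on the classpath.

// Sketch of the SPARK-18510 partition-column resolution rule.
import org.apache.spark.sql.types._

object PartitionSchemaResolutionSketch {
  // Spark's default resolver is case-insensitive; model it directly here.
  val resolver: (String, String) => Boolean = _.equalsIgnoreCase(_)

  // For each partition column, prefer the user-specified field and fall back
  // to the inferred one, mirroring the orElse chain in the diff above.
  def resolvePartitionSchema(
      userSpecifiedSchema: Option[StructType],
      inferredPartitions: StructType,
      partitionColumns: Seq[String]): StructType = {
    val fields = partitionColumns.map { col =>
      userSpecifiedSchema.flatMap(_.find(f => resolver(f.name, col)))
        .orElse(inferredPartitions.find(f => resolver(f.name, col)))
        .getOrElse(throw new IllegalArgumentException(
          s"Failed to resolve the partition column: $col"))
    }
    StructType(fields)
  }

  def main(args: Array[String]): Unit = {
    val userSchema = Some(StructType(Seq(StructField("year", IntegerType))))
    val inferred = StructType(Seq(StructField("YEAR", StringType)))
    // The user-specified IntegerType wins over the inferred StringType, and
    // the resolver matches "year" against "YEAR" despite the case difference.
    println(resolvePartitionSchema(userSchema, inferred, Seq("year")))
    // prints: StructType(StructField(year,IntegerType,true))
  }
}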