From 223fa218e1f637f0d62332785a3bee225b65b990 Mon Sep 17 00:00:00 2001
From: Shixiong Zhu <shixiong@databricks.com>
Date: Wed, 23 Nov 2016 16:15:35 -0800
Subject: [PATCH] [SPARK-18510][SQL] Follow up to address comments in #15951

## What changes were proposed in this pull request?

This PR addresses the remaining review comments in #15951.

## How was this patch tested?

Jenkins

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #15997 from zsxwing/SPARK-18510-follow-up.
---
 .../execution/datasources/DataSource.scala    | 35 +++++++++++--------
 1 file changed, 20 insertions(+), 15 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index dbc3e71233..ccfc759c8f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -118,8 +118,10 @@ case class DataSource(
   private def getOrInferFileFormatSchema(
       format: FileFormat,
       justPartitioning: Boolean = false): (StructType, StructType) = {
-    // the operations below are expensive therefore try not to do them if we don't need to
-    lazy val tempFileCatalog = {
+    // the operations below are expensive therefore try not to do them if we don't need to, e.g.,
+    // in streaming mode, we have already inferred and registered partition columns, we will
+    // never have to materialize the lazy val below
+    lazy val tempFileIndex = {
       val allPaths = caseInsensitiveOptions.get("path") ++ paths
       val hadoopConf = sparkSession.sessionState.newHadoopConf()
       val globbedPaths = allPaths.toSeq.flatMap { path =>
@@ -133,7 +135,7 @@ case class DataSource(
     val partitionSchema = if (partitionColumns.isEmpty && catalogTable.isEmpty) {
       // Try to infer partitioning, because no DataSource in the read path provides the partitioning
       // columns properly unless it is a Hive DataSource
-      val resolved = tempFileCatalog.partitionSchema.map { partitionField =>
+      val resolved = tempFileIndex.partitionSchema.map { partitionField =>
         val equality = sparkSession.sessionState.conf.resolver
         // SPARK-18510: try to get schema from userSpecifiedSchema, otherwise fallback to inferred
         userSpecifiedSchema.flatMap(_.find(f => equality(f.name, partitionField.name))).getOrElse(
@@ -141,17 +143,17 @@ case class DataSource(
       }
       StructType(resolved)
     } else {
-      // in streaming mode, we have already inferred and registered partition columns, we will
-      // never have to materialize the lazy val below
-      lazy val inferredPartitions = tempFileCatalog.partitionSchema
       // maintain old behavior before SPARK-18510. If userSpecifiedSchema is empty used inferred
       // partitioning
       if (userSpecifiedSchema.isEmpty) {
+        val inferredPartitions = tempFileIndex.partitionSchema
         inferredPartitions
       } else {
         val partitionFields = partitionColumns.map { partitionColumn =>
-          userSpecifiedSchema.flatMap(_.find(_.name == partitionColumn)).orElse {
-            val inferredOpt = inferredPartitions.find(_.name == partitionColumn)
+          val equality = sparkSession.sessionState.conf.resolver
+          userSpecifiedSchema.flatMap(_.find(c => equality(c.name, partitionColumn))).orElse {
+            val inferredPartitions = tempFileIndex.partitionSchema
+            val inferredOpt = inferredPartitions.find(p => equality(p.name, partitionColumn))
             if (inferredOpt.isDefined) {
               logDebug(
                 s"""Type of partition column: $partitionColumn not found in specified schema
@@ -163,7 +165,7 @@ case class DataSource(
                    |Falling back to inferred dataType if it exists.
                  """.stripMargin)
             }
-            inferredPartitions.find(_.name == partitionColumn)
+            inferredOpt
           }.getOrElse {
             throw new AnalysisException(s"Failed to resolve the schema for $format for " +
               s"the partition column: $partitionColumn. It must be specified manually.")
@@ -182,7 +184,7 @@ case class DataSource(
         format.inferSchema(
           sparkSession,
           caseInsensitiveOptions,
-          tempFileCatalog.allFiles())
+          tempFileIndex.allFiles())
       }.getOrElse {
         throw new AnalysisException(
           s"Unable to infer schema for $format. It must be specified manually.")
@@ -224,8 +226,11 @@ case class DataSource(
             "you may be able to create a static DataFrame on that directory with " +
             "'spark.read.load(directory)' and infer schema from it.")
         }
-        val (schema, partCols) = getOrInferFileFormatSchema(format)
-        SourceInfo(s"FileSource[$path]", StructType(schema ++ partCols), partCols.fieldNames)
+        val (dataSchema, partitionSchema) = getOrInferFileFormatSchema(format)
+        SourceInfo(
+          s"FileSource[$path]",
+          StructType(dataSchema ++ partitionSchema),
+          partitionSchema.fieldNames)

       case _ =>
         throw new UnsupportedOperationException(
@@ -379,7 +384,7 @@ case class DataSource(
             globPath
           }.toArray

-        val (dataSchema, inferredPartitionSchema) = getOrInferFileFormatSchema(format)
+        val (dataSchema, partitionSchema) = getOrInferFileFormatSchema(format)

         val fileCatalog = if (sparkSession.sqlContext.conf.manageFilesourcePartitions &&
             catalogTable.isDefined && catalogTable.get.tracksPartitionsInCatalog) {
@@ -388,12 +393,12 @@ case class DataSource(
             catalogTable.get,
             catalogTable.get.stats.map(_.sizeInBytes.toLong).getOrElse(0L))
         } else {
-          new InMemoryFileIndex(sparkSession, globbedPaths, options, Some(inferredPartitionSchema))
+          new InMemoryFileIndex(sparkSession, globbedPaths, options, Some(partitionSchema))
         }

         HadoopFsRelation(
           fileCatalog,
-          partitionSchema = inferredPartitionSchema,
+          partitionSchema = partitionSchema,
           dataSchema = dataSchema.asNullable,
           bucketSpec = bucketSpec,
           format,
--
GitLab
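Note: the sketch below is not part of the patch; it only illustrates the lookup order the patched `getOrInferFileFormatSchema` ends up with for partition columns: the user-specified schema wins, the inferred partition schema is the fallback, and names are matched with the session resolver rather than `==`. All names in it (`Field`, `resolver`, `resolvePartitionColumn`) are hypothetical stand-ins for illustration, not Spark API.

```scala
// A sketch only: Field, resolver, and resolvePartitionColumn are hypothetical
// stand-ins for StructField and the session resolver, not Spark API.
object PartitionResolutionSketch {

  final case class Field(name: String, dataType: String)

  // Stand-in for sparkSession.sessionState.conf.resolver when
  // spark.sql.caseSensitive is false (the default): case-insensitive matching.
  val resolver: (String, String) => Boolean =
    (a: String, b: String) => a.equalsIgnoreCase(b)

  // For one declared partition column: prefer the type from the user-specified
  // schema, fall back to the inferred partition schema, and fail if neither
  // resolves it. `inferredPartitions` is by-name so the (expensive) inference
  // only runs on the fallback path, echoing the lazy tempFileIndex in the patch.
  def resolvePartitionColumn(
      partitionColumn: String,
      userSpecifiedSchema: Option[Seq[Field]],
      inferredPartitions: => Seq[Field]): Field = {
    userSpecifiedSchema
      .flatMap(_.find(f => resolver(f.name, partitionColumn)))
      .orElse(inferredPartitions.find(f => resolver(f.name, partitionColumn)))
      .getOrElse(throw new IllegalArgumentException(
        s"Failed to resolve the schema for the partition column: $partitionColumn. " +
          "It must be specified manually."))
  }

  def main(args: Array[String]): Unit = {
    val user = Some(Seq(Field("id", "long"), Field("DATE", "date")))
    val inferred = Seq(Field("date", "string"))

    // The user-specified type wins, matched case-insensitively like the patched code.
    println(resolvePartitionColumn("date", user, inferred)) // Field(DATE,date)

    // Without a user-specified schema, the inferred partition schema is the fallback.
    println(resolvePartitionColumn("date", None, inferred)) // Field(date,string)
  }
}
```

The by-name `inferredPartitions` parameter mirrors the design choice visible in the diff: after this change, `tempFileIndex.partitionSchema` is read only inside the branches that need it, so the expensive file listing behind the lazy `tempFileIndex` stays unmaterialized on the streaming path.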