From 9281a3d504d526440c1d445075e38a6d9142ac93 Mon Sep 17 00:00:00 2001 From: hyukjinkwon <gurwls223@gmail.com> Date: Wed, 22 Mar 2017 08:41:46 +0800 Subject: [PATCH] [SPARK-19919][SQL] Defer throwing the exception for empty paths in CSV datasource into `DataSource` ## What changes were proposed in this pull request? This PR proposes to defer throwing the exception within `DataSource`. Currently, if other datasources fail to infer the schema, it returns `None` and then this is being validated in `DataSource` as below: ``` scala> spark.read.json("emptydir") org.apache.spark.sql.AnalysisException: Unable to infer schema for JSON. It must be specified manually.; ``` ``` scala> spark.read.orc("emptydir") org.apache.spark.sql.AnalysisException: Unable to infer schema for ORC. It must be specified manually.; ``` ``` scala> spark.read.parquet("emptydir") org.apache.spark.sql.AnalysisException: Unable to infer schema for Parquet. It must be specified manually.; ``` However, CSV it checks it within the datasource implementation and throws another exception message as below: ``` scala> spark.read.csv("emptydir") java.lang.IllegalArgumentException: requirement failed: Cannot infer schema from an empty set of files ``` We could remove this duplicated check and validate this in one place in the same way with the same message. ## How was this patch tested? Unit test in `CSVSuite` and manual test. Author: hyukjinkwon <gurwls223@gmail.com> Closes #17256 from HyukjinKwon/SPARK-19919. --- .../datasources/csv/CSVDataSource.scala | 25 +++++++++++++------ .../datasources/csv/CSVFileFormat.scala | 4 +-- .../sql/test/DataFrameReaderWriterSuite.scala | 6 +++-- 3 files changed, 23 insertions(+), 12 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala index 63af18ec5b..83bdf6fe22 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala @@ -54,10 +54,21 @@ abstract class CSVDataSource extends Serializable { /** * Infers the schema from `inputPaths` files. */ - def infer( + final def inferSchema( sparkSession: SparkSession, inputPaths: Seq[FileStatus], - parsedOptions: CSVOptions): Option[StructType] + parsedOptions: CSVOptions): Option[StructType] = { + if (inputPaths.nonEmpty) { + Some(infer(sparkSession, inputPaths, parsedOptions)) + } else { + None + } + } + + protected def infer( + sparkSession: SparkSession, + inputPaths: Seq[FileStatus], + parsedOptions: CSVOptions): StructType /** * Generates a header from the given row which is null-safe and duplicate-safe. @@ -131,10 +142,10 @@ object TextInputCSVDataSource extends CSVDataSource { override def infer( sparkSession: SparkSession, inputPaths: Seq[FileStatus], - parsedOptions: CSVOptions): Option[StructType] = { + parsedOptions: CSVOptions): StructType = { val csv = createBaseDataset(sparkSession, inputPaths, parsedOptions) val maybeFirstLine = CSVUtils.filterCommentAndEmpty(csv, parsedOptions).take(1).headOption - Some(inferFromDataset(sparkSession, csv, maybeFirstLine, parsedOptions)) + inferFromDataset(sparkSession, csv, maybeFirstLine, parsedOptions) } /** @@ -203,7 +214,7 @@ object WholeFileCSVDataSource extends CSVDataSource { override def infer( sparkSession: SparkSession, inputPaths: Seq[FileStatus], - parsedOptions: CSVOptions): Option[StructType] = { + parsedOptions: CSVOptions): StructType = { val csv = createBaseRdd(sparkSession, inputPaths, parsedOptions) csv.flatMap { lines => UnivocityParser.tokenizeStream( @@ -222,10 +233,10 @@ object WholeFileCSVDataSource extends CSVDataSource { parsedOptions.headerFlag, new CsvParser(parsedOptions.asParserSettings)) } - Some(CSVInferSchema.infer(tokenRDD, header, parsedOptions)) + CSVInferSchema.infer(tokenRDD, header, parsedOptions) case None => // If the first row could not be read, just return the empty schema. - Some(StructType(Nil)) + StructType(Nil) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala index eef43c7629..a99bdfee5d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala @@ -51,12 +51,10 @@ class CSVFileFormat extends TextBasedFileFormat with DataSourceRegister { sparkSession: SparkSession, options: Map[String, String], files: Seq[FileStatus]): Option[StructType] = { - require(files.nonEmpty, "Cannot infer schema from an empty set of files") - val parsedOptions = new CSVOptions(options, sparkSession.sessionState.conf.sessionLocalTimeZone) - CSVDataSource(parsedOptions).infer(sparkSession, files, parsedOptions) + CSVDataSource(parsedOptions).inferSchema(sparkSession, files, parsedOptions) } override def prepareWrite( diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala index 8a8ba05534..8287776f8f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala @@ -370,9 +370,11 @@ class DataFrameReaderWriterSuite extends QueryTest with SharedSQLContext with Be val schema = df.schema // Reader, without user specified schema - intercept[IllegalArgumentException] { + val message = intercept[AnalysisException] { testRead(spark.read.csv(), Seq.empty, schema) - } + }.getMessage + assert(message.contains("Unable to infer schema for CSV. It must be specified manually.")) + testRead(spark.read.csv(dir), data, schema) testRead(spark.read.csv(dir, dir), data ++ data, schema) testRead(spark.read.csv(Seq(dir, dir): _*), data ++ data, schema) -- GitLab