diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index e5dd4d81d67797e7ed21c7a98208d55fb71a5be4..d0853f67b97ec9605caa3e9f0298fb78eba1fc15 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -31,6 +31,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
 import org.apache.spark.sql.execution.streaming._
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types.{CalendarIntervalType, StructType}
 import org.apache.spark.util.Utils
@@ -186,6 +187,16 @@ case class DataSource(
         val path = caseInsensitiveOptions.getOrElse("path", {
           throw new IllegalArgumentException("'path' is not specified")
         })
+        val isSchemaInferenceEnabled = sparkSession.conf.get(SQLConf.STREAMING_SCHEMA_INFERENCE)
+        val isTextSource = providingClass == classOf[text.DefaultSource]
+        // If schema inference is disabled, only the text source may omit a user-specified schema
+        if (!isSchemaInferenceEnabled && !isTextSource && userSpecifiedSchema.isEmpty) {
+          throw new IllegalArgumentException(
+            "Schema must be specified when creating a streaming source DataFrame. " +
+              "If some files already exist in the directory, then depending on the file format " +
+              "you may be able to create a static DataFrame on that directory with " +
+              "'spark.read.load(directory)' and infer the schema from it.")
+        }
         SourceInfo(s"FileSource[$path]", inferFileFormatSchema(format))

       case _ =>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index f3064eb6ac6d6846a8bfa1498ef9f64b88932b79..b91518acce33a7cab2d3bb276e9ee4fd8ecd5ca6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -516,6 +516,13 @@ object SQLConf {
       .timeConf(TimeUnit.MILLISECONDS)
       .createWithDefault(60 * 1000L) // 10 minutes

+  val STREAMING_SCHEMA_INFERENCE =
+    SQLConfigBuilder("spark.sql.streaming.schemaInference")
+      .internal()
+      .doc("Whether file-based streaming sources will infer their own schema")
+      .booleanConf
+      .createWithDefault(false)
+
   object Deprecated {
     val MAPRED_REDUCE_TASKS = "mapred.reduce.tasks"
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
index c97304c0ec1c1ef43b296f58ef844b709ffe815e..1d784f1f4ee8545d26839124101429c637db45c9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
@@ -23,6 +23,7 @@ import java.util.UUID
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.util._
 import org.apache.spark.sql.execution.streaming._
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils
@@ -165,19 +166,36 @@ class FileStreamSourceSuite extends FileStreamSourceTest with SharedSQLContext {
       .collect { case s @ StreamingRelation(dataSource, _, _) => s.schema }.head
   }

+  // =============== Basic parameter existence tests ================
+
   test("FileStreamSource schema: no path") {
-    val e = intercept[IllegalArgumentException] {
-      createFileStreamSourceAndGetSchema(format = None, path = None, schema = None)
+    def testError(): Unit = {
+      val e = intercept[IllegalArgumentException] {
+        createFileStreamSourceAndGetSchema(format = None, path = None, schema = None)
+      }
+      assert(e.getMessage.contains("path")) // reason is path, not schema
     }
-    assert("'path' is not specified" === e.getMessage)
+    withSQLConf(SQLConf.STREAMING_SCHEMA_INFERENCE.key -> "false") { testError() }
+    withSQLConf(SQLConf.STREAMING_SCHEMA_INFERENCE.key -> "true") { testError() }
   }

-  test("FileStreamSource schema: path doesn't exist") {
-    intercept[AnalysisException] {
+  test("FileStreamSource schema: path doesn't exist, no schema") {
+    val e = intercept[IllegalArgumentException] {
       createFileStreamSourceAndGetSchema(format = None, path = Some("/a/b/c"), schema = None)
     }
+    assert(e.getMessage.toLowerCase.contains("schema")) // reason is schema absence, not the path
+  }
+
+  test("FileStreamSource schema: path doesn't exist, with schema") {
+    val userSchema = new StructType().add(new StructField("value", IntegerType))
+    val schema = createFileStreamSourceAndGetSchema(
+      format = None, path = Some("/a/b/c"), schema = Some(userSchema))
+    assert(schema === userSchema)
   }
+
+  // =============== Text file stream schema tests ================

   test("FileStreamSource schema: text, no existing files, no schema") {
     withTempDir { src =>
       val schema = createFileStreamSourceAndGetSchema(
@@ -205,13 +223,19 @@ class FileStreamSourceSuite extends FileStreamSourceTest with SharedSQLContext {
     }
   }

+  // =============== Parquet file stream schema tests ================
+
   test("FileStreamSource schema: parquet, no existing files, no schema") {
     withTempDir { src =>
-      val e = intercept[AnalysisException] {
-        createFileStreamSourceAndGetSchema(
-          format = Some("parquet"), path = Some(new File(src, "1").getCanonicalPath), schema = None)
+      withSQLConf(SQLConf.STREAMING_SCHEMA_INFERENCE.key -> "true") {
+        val e = intercept[AnalysisException] {
+          createFileStreamSourceAndGetSchema(
+            format = Some("parquet"),
+            path = Some(new File(src, "1").getCanonicalPath),
+            schema = None)
+        }
+        assert("Unable to infer schema. It must be specified manually.;" === e.getMessage)
       }
-      assert("Unable to infer schema. It must be specified manually.;" === e.getMessage)
     }
   }

@@ -220,9 +244,21 @@ class FileStreamSourceSuite extends FileStreamSourceTest with SharedSQLContext {
       Seq("a", "b", "c").toDS().as("userColumn").toDF().write
         .mode(org.apache.spark.sql.SaveMode.Overwrite)
         .parquet(src.getCanonicalPath)
-      val schema = createFileStreamSourceAndGetSchema(
-        format = Some("parquet"), path = Some(src.getCanonicalPath), schema = None)
-      assert(schema === new StructType().add("value", StringType))
+
+      // Without schema inference, should throw error
+      withSQLConf(SQLConf.STREAMING_SCHEMA_INFERENCE.key -> "false") {
+        intercept[IllegalArgumentException] {
+          createFileStreamSourceAndGetSchema(
+            format = Some("parquet"), path = Some(src.getCanonicalPath), schema = None)
+        }
+      }
+
+      // With schema inference, should infer correct schema
+      withSQLConf(SQLConf.STREAMING_SCHEMA_INFERENCE.key -> "true") {
+        val schema = createFileStreamSourceAndGetSchema(
+          format = Some("parquet"), path = Some(src.getCanonicalPath), schema = None)
+        assert(schema === new StructType().add("value", StringType))
+      }
     }
   }

@@ -237,22 +273,39 @@ class FileStreamSourceSuite extends FileStreamSourceTest with SharedSQLContext {
     }
   }

+  // =============== JSON file stream schema tests ================
+
   test("FileStreamSource schema: json, no existing files, no schema") {
     withTempDir { src =>
-      val e = intercept[AnalysisException] {
-        createFileStreamSourceAndGetSchema(
-          format = Some("json"), path = Some(src.getCanonicalPath), schema = None)
+      withSQLConf(SQLConf.STREAMING_SCHEMA_INFERENCE.key -> "true") {
+
+        val e = intercept[AnalysisException] {
+          createFileStreamSourceAndGetSchema(
+            format = Some("json"), path = Some(src.getCanonicalPath), schema = None)
+        }
+        assert("Unable to infer schema. It must be specified manually.;" === e.getMessage)
       }
-      assert("Unable to infer schema. It must be specified manually.;" === e.getMessage)
     }
   }

   test("FileStreamSource schema: json, existing files, no schema") {
     withTempDir { src =>
-      stringToFile(new File(src, "1"), "{'c': '1'}\n{'c': '2'}\n{'c': '3'}")
-      val schema = createFileStreamSourceAndGetSchema(
-        format = Some("json"), path = Some(src.getCanonicalPath), schema = None)
-      assert(schema === new StructType().add("c", StringType))
+
+      // Without schema inference, should throw error
+      withSQLConf(SQLConf.STREAMING_SCHEMA_INFERENCE.key -> "false") {
+        intercept[IllegalArgumentException] {
+          createFileStreamSourceAndGetSchema(
+            format = Some("json"), path = Some(src.getCanonicalPath), schema = None)
+        }
+      }
+
+      // With schema inference, should infer correct schema
+      withSQLConf(SQLConf.STREAMING_SCHEMA_INFERENCE.key -> "true") {
+        stringToFile(new File(src, "1"), "{'c': '1'}\n{'c': '2'}\n{'c': '3'}")
+        val schema = createFileStreamSourceAndGetSchema(
+          format = Some("json"), path = Some(src.getCanonicalPath), schema = None)
+        assert(schema === new StructType().add("c", StringType))
+      }
     }
   }

@@ -266,6 +319,8 @@ class FileStreamSourceSuite extends FileStreamSourceTest with SharedSQLContext {
     }
   }

+  // =============== Text file stream tests ================
+
   test("read from text files") {
     withTempDirs { case (src, tmp) =>
       val textStream = createFileStream("text", src.getCanonicalPath)
@@ -284,6 +339,8 @@ class FileStreamSourceSuite extends FileStreamSourceTest with SharedSQLContext {
     }
   }

+  // =============== JSON file stream tests ================
+
   test("read from json files") {
     withTempDirs { case (src, tmp) =>
       val fileStream = createFileStream("json", src.getCanonicalPath, Some(valueSchema))
@@ -313,74 +370,82 @@ class FileStreamSourceSuite extends FileStreamSourceTest with SharedSQLContext {

   test("read from json files with inferring schema") {
     withTempDirs { case (src, tmp) =>
+      withSQLConf(SQLConf.STREAMING_SCHEMA_INFERENCE.key -> "true") {

-      // Add a file so that we can infer its schema
-      stringToFile(new File(src, "existing"), "{'c': 'drop1'}\n{'c': 'keep2'}\n{'c': 'keep3'}")
+        // Add a file so that we can infer its schema
+        stringToFile(new File(src, "existing"), "{'c': 'drop1'}\n{'c': 'keep2'}\n{'c': 'keep3'}")

-      val fileStream = createFileStream("json", src.getCanonicalPath)
-      assert(fileStream.schema === StructType(Seq(StructField("c", StringType))))
+        val fileStream = createFileStream("json", src.getCanonicalPath)
+        assert(fileStream.schema === StructType(Seq(StructField("c", StringType))))

-      // FileStreamSource should infer the column "c"
-      val filtered = fileStream.filter($"c" contains "keep")
+        // FileStreamSource should infer the column "c"
+        val filtered = fileStream.filter($"c" contains "keep")

-      testStream(filtered)(
-        AddTextFileData("{'c': 'drop4'}\n{'c': 'keep5'}\n{'c': 'keep6'}", src, tmp),
-        CheckAnswer("keep2", "keep3", "keep5", "keep6")
-      )
+        testStream(filtered)(
+          AddTextFileData("{'c': 'drop4'}\n{'c': 'keep5'}\n{'c': 'keep6'}", src, tmp),
+          CheckAnswer("keep2", "keep3", "keep5", "keep6")
+        )
+      }
     }
   }

   test("reading from json files inside partitioned directory") {
     withTempDirs { case (baseSrc, tmp) =>
-      val src = new File(baseSrc, "type=X")
-      src.mkdirs()
+      withSQLConf(SQLConf.STREAMING_SCHEMA_INFERENCE.key -> "true") {
+        val src = new File(baseSrc, "type=X")
+        src.mkdirs()

-      // Add a file so that we can infer its schema
-      stringToFile(new File(src, "existing"), "{'c': 'drop1'}\n{'c': 'keep2'}\n{'c': 'keep3'}")
+        // Add a file so that we can infer its schema
"{'c': 'drop1'}\n{'c': 'keep2'}\n{'c': 'keep3'}") - val fileStream = createFileStream("json", src.getCanonicalPath) + val fileStream = createFileStream("json", src.getCanonicalPath) - // FileStreamSource should infer the column "c" - val filtered = fileStream.filter($"c" contains "keep") + // FileStreamSource should infer the column "c" + val filtered = fileStream.filter($"c" contains "keep") - testStream(filtered)( - AddTextFileData("{'c': 'drop4'}\n{'c': 'keep5'}\n{'c': 'keep6'}", src, tmp), - CheckAnswer("keep2", "keep3", "keep5", "keep6") - ) + testStream(filtered)( + AddTextFileData("{'c': 'drop4'}\n{'c': 'keep5'}\n{'c': 'keep6'}", src, tmp), + CheckAnswer("keep2", "keep3", "keep5", "keep6") + ) + } } } test("reading from json files with changing schema") { withTempDirs { case (src, tmp) => + withSQLConf(SQLConf.STREAMING_SCHEMA_INFERENCE.key -> "true") { - // Add a file so that we can infer its schema - stringToFile(new File(src, "existing"), "{'k': 'value0'}") + // Add a file so that we can infer its schema + stringToFile(new File(src, "existing"), "{'k': 'value0'}") - val fileStream = createFileStream("json", src.getCanonicalPath) + val fileStream = createFileStream("json", src.getCanonicalPath) - // FileStreamSource should infer the column "k" - assert(fileStream.schema === StructType(Seq(StructField("k", StringType)))) + // FileStreamSource should infer the column "k" + assert(fileStream.schema === StructType(Seq(StructField("k", StringType)))) - // After creating DF and before starting stream, add data with different schema - // Should not affect the inferred schema any more - stringToFile(new File(src, "existing2"), "{'k': 'value1', 'v': 'new'}") + // After creating DF and before starting stream, add data with different schema + // Should not affect the inferred schema any more + stringToFile(new File(src, "existing2"), "{'k': 'value1', 'v': 'new'}") - testStream(fileStream)( + testStream(fileStream)( - // Should not pick up column v in the file added before start - AddTextFileData("{'k': 'value2'}", src, tmp), - CheckAnswer("value0", "value1", "value2"), + // Should not pick up column v in the file added before start + AddTextFileData("{'k': 'value2'}", src, tmp), + CheckAnswer("value0", "value1", "value2"), - // Should read data in column k, and ignore v - AddTextFileData("{'k': 'value3', 'v': 'new'}", src, tmp), - CheckAnswer("value0", "value1", "value2", "value3"), + // Should read data in column k, and ignore v + AddTextFileData("{'k': 'value3', 'v': 'new'}", src, tmp), + CheckAnswer("value0", "value1", "value2", "value3"), - // Should ignore rows that do not have the necessary k column - AddTextFileData("{'v': 'value4'}", src, tmp), - CheckAnswer("value0", "value1", "value2", "value3", null)) + // Should ignore rows that do not have the necessary k column + AddTextFileData("{'v': 'value4'}", src, tmp), + CheckAnswer("value0", "value1", "value2", "value3", null)) + } } } + // =============== Parquet file stream tests ================ + test("read from parquet files") { withTempDirs { case (src, tmp) => val fileStream = createFileStream("parquet", src.getCanonicalPath, Some(valueSchema)) @@ -402,49 +467,39 @@ class FileStreamSourceSuite extends FileStreamSourceTest with SharedSQLContext { test("read from parquet files with changing schema") { withTempDirs { case (src, tmp) => - // Add a file so that we can infer its schema - AddParquetFileData.writeToFile(Seq("value0").toDF("k"), src, tmp) - - val fileStream = createFileStream("parquet", src.getCanonicalPath) + 
+      withSQLConf(SQLConf.STREAMING_SCHEMA_INFERENCE.key -> "true") {

-      // FileStreamSource should infer the column "k"
-      assert(fileStream.schema === StructType(Seq(StructField("k", StringType))))
+        // Add a file so that we can infer its schema
+        AddParquetFileData.writeToFile(Seq("value0").toDF("k"), src, tmp)

-      // After creating DF and before starting stream, add data with different schema
-      // Should not affect the inferred schema any more
-      AddParquetFileData.writeToFile(Seq(("value1", 0)).toDF("k", "v"), src, tmp)
+        val fileStream = createFileStream("parquet", src.getCanonicalPath)

-      testStream(fileStream)(
-        // Should not pick up column v in the file added before start
-        AddParquetFileData(Seq("value2").toDF("k"), src, tmp),
-        CheckAnswer("value0", "value1", "value2"),
+        // FileStreamSource should infer the column "k"
+        assert(fileStream.schema === StructType(Seq(StructField("k", StringType))))

-        // Should read data in column k, and ignore v
-        AddParquetFileData(Seq(("value3", 1)).toDF("k", "v"), src, tmp),
-        CheckAnswer("value0", "value1", "value2", "value3"),
+        // After creating DF and before starting stream, add data with different schema
+        // Should not affect the inferred schema any more
+        AddParquetFileData.writeToFile(Seq(("value1", 0)).toDF("k", "v"), src, tmp)

-        // Should ignore rows that do not have the necessary k column
-        AddParquetFileData(Seq("value5").toDF("v"), src, tmp),
-        CheckAnswer("value0", "value1", "value2", "value3", null)
-      )
-    }
-  }
+        testStream(fileStream)(
+          // Should not pick up column v in the file added before start
+          AddParquetFileData(Seq("value2").toDF("k"), src, tmp),
+          CheckAnswer("value0", "value1", "value2"),

-  test("file stream source without schema") {
-    withTempDir { src =>
-      // Only "text" doesn't need a schema
-      createFileStream("text", src.getCanonicalPath)
+          // Should read data in column k, and ignore v
+          AddParquetFileData(Seq(("value3", 1)).toDF("k", "v"), src, tmp),
+          CheckAnswer("value0", "value1", "value2", "value3"),

-      // Both "json" and "parquet" require a schema if no existing file to infer
-      intercept[AnalysisException] {
-        createFileStream("json", src.getCanonicalPath)
-      }
-      intercept[AnalysisException] {
-        createFileStream("parquet", src.getCanonicalPath)
+          // Should ignore rows that do not have the necessary k column
+          AddParquetFileData(Seq("value5").toDF("v"), src, tmp),
+          CheckAnswer("value0", "value1", "value2", "value3", null)
+        )
       }
     }
   }

+  // =============== File stream globbing tests ================
+
   test("read new files in nested directories with globbing") {
     withTempDirs { case (dir, tmp) =>
@@ -518,6 +573,8 @@ class FileStreamSourceSuite extends FileStreamSourceTest with SharedSQLContext {
     }
   }

+  // =============== Other tests ================
+
   test("fault tolerance") {
     withTempDirs { case (src, tmp) =>
       val fileStream = createFileStream("text", src.getCanonicalPath)
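
Usage sketch (not part of the patch): the snippet below shows the user-facing effect of the new flag, assuming the Spark 2.0 'readStream' API; the SparkSession setup, app name, and paths are illustrative only. With 'spark.sql.streaming.schemaInference' at its default of false, every file-based streaming source except text must be given an explicit schema when the streaming DataFrame is created, and the pre-patch inference behavior can be re-enabled per session:

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.types.{StringType, StructType}

    object StreamingSchemaInferenceDemo {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .master("local[2]")
          .appName("streaming-schema-inference-demo") // hypothetical app name
          .getOrCreate()

        // Default (spark.sql.streaming.schemaInference = false): creating a
        // non-text file stream without a schema now fails with the
        // IllegalArgumentException added to DataSource above; supplying an
        // explicit schema works.
        val schema = new StructType().add("c", StringType)
        val withExplicitSchema = spark.readStream
          .format("json")
          .schema(schema)
          .load("/tmp/stream-input") // hypothetical input directory

        // Opting back into the old behavior: the schema is inferred once,
        // from the files already in the directory when the stream is created
        // (the tests above exercise exactly this via withSQLConf).
        spark.conf.set("spark.sql.streaming.schemaInference", "true")
        val withInferredSchema = spark.readStream
          .format("json")
          .load("/tmp/stream-input")

        spark.stop()
      }
    }

The text source stays exempt from the new check because its schema is fixed to a single string column named 'value', so there is nothing to infer.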