diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala index bb1793d451dfd78bc88aebcbaefcb32f04028e88..90c71cc6cfab79394175eb43f5af221121f9691b 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala @@ -232,6 +232,10 @@ class SparkHadoopUtil extends Logging { recurse(baseStatus) } + def isGlobPath(pattern: Path): Boolean = { + pattern.toString.exists("{}[]*?\\".toSet.contains) + } + def globPath(pattern: Path): Seq[Path] = { val fs = pattern.getFileSystem(conf) Option(fs.globStatus(pattern)).map { statuses => @@ -240,11 +244,7 @@ class SparkHadoopUtil extends Logging { } def globPathIfNecessary(pattern: Path): Seq[Path] = { - if (pattern.toString.exists("{}[]*?\\".toSet.contains)) { - globPath(pattern) - } else { - Seq(pattern) - } + if (isGlobPath(pattern)) globPath(pattern) else Seq(pattern) } /** diff --git a/python/pyspark/sql/streaming.py b/python/pyspark/sql/streaming.py index bffe398247ba9755cb27db241da023650ecdb3a9..8bac347e13084bc2cb2f392220ae1fab881c7b84 100644 --- a/python/pyspark/sql/streaming.py +++ b/python/pyspark/sql/streaming.py @@ -315,7 +315,7 @@ class DataStreamReader(OptionUtils): >>> json_sdf = spark.readStream.format("json")\ .schema(sdf_schema)\ - .load(os.path.join(tempfile.mkdtemp(),'data')) + .load(tempfile.mkdtemp()) >>> json_sdf.isStreaming True >>> json_sdf.schema == sdf_schema @@ -382,8 +382,7 @@ class DataStreamReader(OptionUtils): it uses the value specified in ``spark.sql.columnNameOfCorruptRecord``. - >>> json_sdf = spark.readStream.json(os.path.join(tempfile.mkdtemp(), 'data'), \ - schema = sdf_schema) + >>> json_sdf = spark.readStream.json(tempfile.mkdtemp(), schema = sdf_schema) >>> json_sdf.isStreaming True >>> json_sdf.schema == sdf_schema @@ -411,8 +410,7 @@ class DataStreamReader(OptionUtils): .. note:: Experimental. - >>> parquet_sdf = spark.readStream.schema(sdf_schema)\ - .parquet(os.path.join(tempfile.mkdtemp())) + >>> parquet_sdf = spark.readStream.schema(sdf_schema).parquet(tempfile.mkdtemp()) >>> parquet_sdf.isStreaming True >>> parquet_sdf.schema == sdf_schema @@ -512,8 +510,7 @@ class DataStreamReader(OptionUtils): * ``DROPMALFORMED`` : ignores the whole corrupted records. * ``FAILFAST`` : throws an exception when it meets corrupted records. - >>> csv_sdf = spark.readStream.csv(os.path.join(tempfile.mkdtemp(), 'data'), \ - schema = sdf_schema) + >>> csv_sdf = spark.readStream.csv(tempfile.mkdtemp(), schema = sdf_schema) >>> csv_sdf.isStreaming True >>> csv_sdf.schema == sdf_schema diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala index a4110d7b11473cef52bc0364fd05daf691073014..6dc27c19521ea21b2cd8879a38b3a723d9c58888 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala @@ -203,6 +203,18 @@ case class DataSource( val path = caseInsensitiveOptions.getOrElse("path", { throw new IllegalArgumentException("'path' is not specified") }) + + // Check whether the path exists if it is not a glob pattern. + // For glob pattern, we do not check it because the glob pattern might only make sense + // once the streaming job starts and some upstream source starts dropping data. 
+ val hdfsPath = new Path(path) + if (!SparkHadoopUtil.get.isGlobPath(hdfsPath)) { + val fs = hdfsPath.getFileSystem(sparkSession.sessionState.newHadoopConf()) + if (!fs.exists(hdfsPath)) { + throw new AnalysisException(s"Path does not exist: $path") + } + } + val isSchemaInferenceEnabled = sparkSession.conf.get(SQLConf.STREAMING_SCHEMA_INFERENCE) val isTextSource = providingClass == classOf[text.TextFileFormat] // If the schema inference is disabled, only text sources require schema to be specified diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala index 6c04846f00e8b71217f62825462aabf750e418e2..8a34cf95f918dde1cd3bdb49270b388453ef18c1 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala @@ -179,18 +179,24 @@ class FileStreamSourceSuite extends FileStreamSourceTest { withSQLConf(SQLConf.STREAMING_SCHEMA_INFERENCE.key -> "true") { testError() } } - test("FileStreamSource schema: path doesn't exist, no schema") { - val e = intercept[IllegalArgumentException] { - createFileStreamSourceAndGetSchema(format = None, path = Some("/a/b/c"), schema = None) + test("FileStreamSource schema: path doesn't exist (without schema) should throw exception") { + withTempDir { dir => + intercept[AnalysisException] { + val userSchema = new StructType().add(new StructField("value", IntegerType)) + val schema = createFileStreamSourceAndGetSchema( + format = None, path = Some(new File(dir, "1").getAbsolutePath), schema = None) + } } - assert(e.getMessage.toLowerCase.contains("schema")) // reason is schema absence, not the path } - test("FileStreamSource schema: path doesn't exist, with schema") { - val userSchema = new StructType().add(new StructField("value", IntegerType)) - val schema = createFileStreamSourceAndGetSchema( - format = None, path = Some("/a/b/c"), schema = Some(userSchema)) - assert(schema === userSchema) + test("FileStreamSource schema: path doesn't exist (with schema) should throw exception") { + withTempDir { dir => + intercept[AnalysisException] { + val userSchema = new StructType().add(new StructField("value", IntegerType)) + val schema = createFileStreamSourceAndGetSchema( + format = None, path = Some(new File(dir, "1").getAbsolutePath), schema = Some(userSchema)) + } + } } @@ -225,20 +231,6 @@ class FileStreamSourceSuite extends FileStreamSourceTest { // =============== Parquet file stream schema tests ================ - ignore("FileStreamSource schema: parquet, no existing files, no schema") { - withTempDir { src => - withSQLConf(SQLConf.STREAMING_SCHEMA_INFERENCE.key -> "true") { - val e = intercept[AnalysisException] { - createFileStreamSourceAndGetSchema( - format = Some("parquet"), - path = Some(new File(src, "1").getCanonicalPath), - schema = None) - } - assert("Unable to infer schema. It must be specified manually.;" === e.getMessage) - } - } - } - test("FileStreamSource schema: parquet, existing files, no schema") { withTempDir { src => Seq("a", "b", "c").toDS().as("userColumn").toDF().write
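Behavior introduced by this patch, in short: a streaming source path that contains none of the glob metacharacters { } [ ] * ? \ must already exist when the streaming DataFrame is created, otherwise an AnalysisException ("Path does not exist: ...") is thrown; a path containing any of those characters is treated as a glob and is not checked, because the pattern may only match files once the stream is running. Below is a minimal, self-contained Scala sketch of that rule, illustrative only: PathCheckSketch, pathExists and validateStreamingPath are made-up names, plain String and java.io.File stand in for org.apache.hadoop.fs.Path and FileSystem, and IllegalArgumentException stands in for AnalysisException.

    object PathCheckSketch {
      // Same glob metacharacter set that SparkHadoopUtil.isGlobPath uses in the patch.
      private val globChars = "{}[]*?\\".toSet

      def isGlobPath(path: String): Boolean = path.exists(globChars.contains)

      // Hypothetical stand-in for FileSystem.exists; here it just asks the local file system.
      def pathExists(path: String): Boolean = new java.io.File(path).exists()

      // Literal paths must exist up front; glob patterns are deferred until the stream runs.
      def validateStreamingPath(path: String): Unit = {
        if (!isGlobPath(path) && !pathExists(path)) {
          throw new IllegalArgumentException(s"Path does not exist: $path")
        }
      }

      def main(args: Array[String]): Unit = {
        assert(isGlobPath("/data/logs/*.json"))          // wildcard: treated as glob, never checked
        assert(isGlobPath("/data/2016-{01,02}/events"))  // brace pattern: treated as glob
        assert(!isGlobPath("/data/2016-01/events"))      // literal: must exist when the query starts
        validateStreamingPath("/data/logs/*.json")       // accepted without touching the file system
      }
    }

The same split explains the test changes above: both "path doesn't exist" tests now point at a literal, non-existent child of a temp directory and expect an AnalysisException, with or without a user-supplied schema, while glob-style paths remain unvalidated at definition time.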