diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala index 23a707108608718fc57276594566802c35583bb3..0dfe7dba1e5c8442ba1119c4b3ad203993c062f6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala @@ -186,7 +186,8 @@ case class DataSource( userSpecifiedSchema = Some(dataSchema), className = className, options = - new CaseInsensitiveMap(options.filterKeys(_ != "path"))).resolveRelation())) + new CaseInsensitiveMap( + options.filterKeys(_ != "path") + ("basePath" -> path))).resolveRelation())) } new FileStreamSource( diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala index 73d1b1b1d507d2cfa8387285063d8e9f977f6a4c..64cddf0deecb0eb229907890c03110c8676eb124 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala @@ -281,6 +281,30 @@ class FileStreamSourceSuite extends FileStreamSourceTest with SharedSQLContext { Utils.deleteRecursively(tmp) } + + test("reading from json files inside partitioned directory") { + val src = { + val base = Utils.createTempDir(namePrefix = "streaming.src") + new File(base, "type=X") + } + val tmp = Utils.createTempDir(namePrefix = "streaming.tmp") + src.mkdirs() + + + // Add a file so that we can infer its schema + stringToFile(new File(src, "existing"), "{'c': 'drop1'}\n{'c': 'keep2'}\n{'c': 'keep3'}") + + val textSource = createFileStreamSource("json", src.getCanonicalPath) + + // FileStreamSource should infer the column "c" + val filtered = textSource.toDF().filter($"c" contains "keep") + + testStream(filtered)(
+ AddTextFileData(textSource, "{'c': 'drop4'}\n{'c': 'keep5'}\n{'c': 'keep6'}", src, tmp), + CheckAnswer("keep2", "keep3", "keep5", "keep6") + ) + } + test("read from parquet files") { val src = Utils.createTempDir(namePrefix = "streaming.src") val tmp = Utils.createTempDir(namePrefix = "streaming.tmp")