-
- Downloads
[SPARK-15443][SQL] Fix 'explain' for streaming Dataset
## What changes were proposed in this pull request? - Fix the `explain` command for streaming Dataset/DataFrame. E.g., ``` == Parsed Logical Plan == 'SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true) AS value#7] +- 'MapElements <function1>, obj#6: java.lang.String +- 'DeserializeToObject unresolveddeserializer(createexternalrow(getcolumnbyordinal(0, StringType).toString, StructField(value,StringType,true))), obj#5: org.apache.spark.sql.Row +- Filter <function1>.apply +- StreamingRelation FileSource[/Users/zsx/stream], [value#0] == Analyzed Logical Plan == value: string SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true) AS value#7] +- MapElements <function1>, obj#6: java.lang.String +- DeserializeToObject createexternalrow(value#0.toString, StructField(value,StringType,true)), obj#5: org.apache.spark.sql.Row +- Filter <function1>.apply +- StreamingRelation FileSource[/Users/zsx/stream], [value#0] == Optimized Logical Plan == SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true) AS value#7] +- MapElements <function1>, obj#6: java.lang.String +- DeserializeToObject createexternalrow(value#0.toString, StructField(value,StringType,true)), obj#5: org.apache.spark.sql.Row +- Filter <function1>.apply +- StreamingRelation FileSource[/Users/zsx/stream], [value#0] == Physical Plan == *SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true) AS value#7] +- *MapElements <function1>, obj#6: java.lang.String +- *DeserializeToObject createexternalrow(value#0.toString, StructField(value,StringType,true)), obj#5: org.apache.spark.sql.Row +- *Filter <function1>.apply +- StreamingRelation FileSource[/Users/zsx/stream], [value#0] ``` - Add `StreamingQuery.explain` to display the last execution plan. E.g., ``` == Parsed Logical Plan == SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true) AS value#7] +- MapElements <function1>, obj#6: java.lang.String +- DeserializeToObject createexternalrow(value#12.toString, StructField(value,StringType,true)), obj#5: org.apache.spark.sql.Row +- Filter <function1>.apply +- Relation[value#12] text == Analyzed Logical Plan == value: string SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true) AS value#7] +- MapElements <function1>, obj#6: java.lang.String +- DeserializeToObject createexternalrow(value#12.toString, StructField(value,StringType,true)), obj#5: org.apache.spark.sql.Row +- Filter <function1>.apply +- Relation[value#12] text == Optimized Logical Plan == SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true) AS value#7] +- MapElements <function1>, obj#6: java.lang.String +- DeserializeToObject createexternalrow(value#12.toString, StructField(value,StringType,true)), obj#5: org.apache.spark.sql.Row +- Filter <function1>.apply +- Relation[value#12] text == Physical Plan == *SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true) AS value#7] +- *MapElements <function1>, obj#6: java.lang.String +- *DeserializeToObject createexternalrow(value#12.toString, StructField(value,StringType,true)), obj#5: org.apache.spark.sql.Row +- *Filter <function1>.apply +- *Scan text [value#12] Format: org.apache.spark.sql.execution.datasources.text.TextFileFormat1836ab91, InputPaths: file:/Users/zsx/stream/a.txt, file:/Users/zsx/stream/b.txt, file:/Users/zsx/stream/c.txt, PushedFilters: [], ReadSchema: struct<value:string> ``` ## How was this patch tested? The added unit tests. Author: Shixiong Zhu <shixiong@databricks.com> Closes #13815 from zsxwing/sdf-explain.
Showing
- sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala 17 additions, 1 deletion...cala/org/apache/spark/sql/execution/SparkStrategies.scala
- sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala 10 additions, 1 deletion...ala/org/apache/spark/sql/execution/command/commands.scala
- sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala 1 addition, 0 deletions.../spark/sql/execution/streaming/IncrementalExecution.scala
- sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala 20 additions, 0 deletions...pache/spark/sql/execution/streaming/StreamExecution.scala
- sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala 14 additions, 0 deletions...che/spark/sql/execution/streaming/StreamingRelation.scala
- sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQuery.scala 15 additions, 0 deletions...scala/org/apache/spark/sql/streaming/StreamingQuery.scala
- sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala 36 additions, 0 deletions...rg/apache/spark/sql/streaming/FileStreamSourceSuite.scala
- sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala 29 additions, 0 deletions...st/scala/org/apache/spark/sql/streaming/StreamSuite.scala
Loading
Please register or sign in to comment