Commit fc02ef95 authored by Shixiong Zhu

[SPARK-19603][SS] Fix StreamingQuery explain command

## What changes were proposed in this pull request?

`StreamingQuery.explain` doesn't show the correct streaming physical plan right now, because `ExplainCommand` receives a runtime batch plan whose `logicalPlan.isStreaming` is always false.

This PR adds a `streaming` parameter to `ExplainCommand` so that `StreamExecution` can mark the plan it passes in as a streaming plan.
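
For context, a heavily simplified sketch of the shape of the change (not the exact Spark source; signatures are abbreviated, and `planIncrementally` is a hypothetical stand-in for the internal `IncrementalExecution` planner):

```scala
// Simplified sketch only, not the actual Spark source. The real
// ExplainCommand lives in org.apache.spark.sql.execution.command and has
// more parameters; planIncrementally is a hypothetical stand-in for the
// internal IncrementalExecution planner.
case class ExplainCommand(
    logicalPlan: LogicalPlan,
    extended: Boolean = false,
    codegen: Boolean = false,
    streaming: Boolean = false)  // new flag, set by StreamExecution
  extends RunnableCommand {

  override def run(sparkSession: SparkSession): Seq[Row] = {
    val queryExecution = if (logicalPlan.isStreaming || streaming) {
      // Plan incrementally so streaming-only physical operators
      // (StateStoreRestore, StateStoreSave) show up in the output. The
      // explicit flag is needed because the batch plan StreamExecution
      // passes in always reports isStreaming == false.
      planIncrementally(sparkSession, logicalPlan)
    } else {
      sparkSession.sessionState.executePlan(logicalPlan)
    }
    Seq(Row(if (extended) queryExecution.toString else queryExecution.simpleString))
  }
}
```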

Examples of the explain outputs are shown below; all three come from the same streaming aggregation.
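
A minimal, hypothetical sketch of such a query (the `MemoryStream`-based aggregation mirrors the operators in the plans; column ids such as `#518` vary between runs):

```scala
import org.apache.spark.sql.{SQLContext, SparkSession}
import org.apache.spark.sql.execution.streaming.MemoryStream

val spark = SparkSession.builder()
  .master("local[*]")
  .appName("explain-demo")  // illustrative app name
  .getOrCreate()
import spark.implicits._
implicit val sqlContext: SQLContext = spark.sqlContext  // required by MemoryStream

// deserialize -> map -> serialize -> aggregate, matching the
// DeserializeToObject / MapElements / SerializeFromObject / HashAggregate
// operators in the plans below
val inputData = MemoryStream[String]
val counts = inputData.toDS().map(_.toString).groupBy("value").count()

counts.explain()  // first output below, rooted at StreamingRelation
```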

- streaming `DataFrame.explain()`
```
== Physical Plan ==
*HashAggregate(keys=[value#518], functions=[count(1)])
+- StateStoreSave [value#518], OperatorStateId(<unknown>,0,0), Append, 0
   +- *HashAggregate(keys=[value#518], functions=[merge_count(1)])
      +- StateStoreRestore [value#518], OperatorStateId(<unknown>,0,0)
         +- *HashAggregate(keys=[value#518], functions=[merge_count(1)])
            +- Exchange hashpartitioning(value#518, 5)
               +- *HashAggregate(keys=[value#518], functions=[partial_count(1)])
                  +- *SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true) AS value#518]
                     +- *MapElements <function1>, obj#517: java.lang.String
                        +- *DeserializeToObject value#513.toString, obj#516: java.lang.String
                           +- StreamingRelation MemoryStream[value#513], [value#513]
```

- `StreamingQuery.explain(extended = false)`
```
== Physical Plan ==
*HashAggregate(keys=[value#518], functions=[count(1)])
+- StateStoreSave [value#518], OperatorStateId(...,0,0), Complete, 0
   +- *HashAggregate(keys=[value#518], functions=[merge_count(1)])
      +- StateStoreRestore [value#518], OperatorStateId(...,0,0)
         +- *HashAggregate(keys=[value#518], functions=[merge_count(1)])
            +- Exchange hashpartitioning(value#518, 5)
               +- *HashAggregate(keys=[value#518], functions=[partial_count(1)])
                  +- *SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true) AS value#518]
                     +- *MapElements <function1>, obj#517: java.lang.String
                        +- *DeserializeToObject value#543.toString, obj#516: java.lang.String
                           +- LocalTableScan [value#543]
```

- `StreamingQuery.explain(extended = true)`
```
== Parsed Logical Plan ==
Aggregate [value#518], [value#518, count(1) AS count(1)#524L]
+- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true) AS value#518]
   +- MapElements <function1>, class java.lang.String, [StructField(value,StringType,true)], obj#517: java.lang.String
      +- DeserializeToObject cast(value#543 as string).toString, obj#516: java.lang.String
         +- LocalRelation [value#543]

== Analyzed Logical Plan ==
value: string, count(1): bigint
Aggregate [value#518], [value#518, count(1) AS count(1)#524L]
+- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true) AS value#518]
   +- MapElements <function1>, class java.lang.String, [StructField(value,StringType,true)], obj#517: java.lang.String
      +- DeserializeToObject cast(value#543 as string).toString, obj#516: java.lang.String
         +- LocalRelation [value#543]

== Optimized Logical Plan ==
Aggregate [value#518], [value#518, count(1) AS count(1)#524L]
+- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true) AS value#518]
   +- MapElements <function1>, class java.lang.String, [StructField(value,StringType,true)], obj#517: java.lang.String
      +- DeserializeToObject value#543.toString, obj#516: java.lang.String
         +- LocalRelation [value#543]

== Physical Plan ==
*HashAggregate(keys=[value#518], functions=[count(1)], output=[value#518, count(1)#524L])
+- StateStoreSave [value#518], OperatorStateId(...,0,0), Complete, 0
   +- *HashAggregate(keys=[value#518], functions=[merge_count(1)], output=[value#518, count#530L])
      +- StateStoreRestore [value#518], OperatorStateId(...,0,0)
         +- *HashAggregate(keys=[value#518], functions=[merge_count(1)], output=[value#518, count#530L])
            +- Exchange hashpartitioning(value#518, 5)
               +- *HashAggregate(keys=[value#518], functions=[partial_count(1)], output=[value#518, count#530L])
                  +- *SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true) AS value#518]
                     +- *MapElements <function1>, obj#517: java.lang.String
                        +- *DeserializeToObject value#543.toString, obj#516: java.lang.String
                           +- LocalTableScan [value#543]
```
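
Continuing the sketch above, the second and third outputs correspond to calling `explain` on the running query after at least one batch has executed (the memory sink and query name `counts` are illustrative):

```scala
import org.apache.spark.sql.streaming.OutputMode

val query = counts.writeStream
  .outputMode(OutputMode.Complete())  // matches "Complete" in StateStoreSave above
  .format("memory")
  .queryName("counts")
  .start()

inputData.addData("a", "b", "a")
query.processAllAvailable()  // ensure at least one batch has run

query.explain()                 // physical plan of the last batch only
query.explain(extended = true)  // parsed/analyzed/optimized/physical plans
query.stop()
```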

## How was this patch tested?

The updated unit test.

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #16934 from zsxwing/SPARK-19603.
parent 08c1972a