From 4b88067416ce922ae15a1445cf953fb9b5c43427 Mon Sep 17 00:00:00 2001
From: Takuya UESHIN <ueshin@happy-camper.st>
Date: Wed, 25 May 2016 12:02:07 -0700
Subject: [PATCH] [SPARK-15483][SQL] IncrementalExecution should use extra
 strategies.

## What changes were proposed in this pull request?

Extra strategies does not work for streams because `IncrementalExecution` uses modified planner with stateful operations but it does not include extra strategies.

This pr fixes `IncrementalExecution` to include extra strategies to use them.

## How was this patch tested?

I added a test to check if extra strategies work for streams.

Author: Takuya UESHIN <ueshin@happy-camper.st>

Closes #13261 from ueshin/issues/SPARK-15483.
---
 .../streaming/IncrementalExecution.scala          |  3 ++-
 .../apache/spark/sql/streaming/StreamSuite.scala  | 15 +++++++++++++++
 2 files changed, 17 insertions(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
index 8b96f65bc3..fe5f36e1cd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
@@ -36,7 +36,8 @@ class IncrementalExecution private[sql](
   extends QueryExecution(sparkSession, logicalPlan) {
 
   // TODO: make this always part of planning.
-  val stateStrategy = sparkSession.sessionState.planner.StatefulAggregationStrategy :: Nil
+  val stateStrategy = sparkSession.sessionState.planner.StatefulAggregationStrategy +:
+    sparkSession.sessionState.experimentalMethods.extraStrategies
 
   // Modified planner with stateful operations.
   override def planner: SparkPlanner =
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
index b742206b58..ae89a6887a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
@@ -220,6 +220,21 @@ class StreamSuite extends StreamTest with SharedSQLContext {
       CheckOffsetLogLatestBatchId(2),
       CheckSinkLatestBatchId(2))
   }
+
+  test("insert an extraStrategy") {
+    try {
+      spark.experimental.extraStrategies = TestStrategy :: Nil
+
+      val inputData = MemoryStream[(String, Int)]
+      val df = inputData.toDS().map(_._1).toDF("a")
+
+      testStream(df)(
+        AddData(inputData, ("so slow", 1)),
+        CheckAnswer("so fast"))
+    } finally {
+      spark.experimental.extraStrategies = Nil
+    }
+  }
 }
 
 /**
-- 
GitLab