Commit e68aed70 authored by Burak Yavuz, committed by Shixiong Zhu

[SPARK-21216][SS] Hive strategies missed in Structured Streaming IncrementalExecution

## What changes were proposed in this pull request?

If someone creates a Hive-enabled session, the planner in `IncrementalExecution` doesn't take the Hive scan strategies into account. This causes joins of streaming DataFrames with Hive tables to fail.
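For illustration, a minimal sketch of the failing scenario, assuming a Hive-enabled session; the table name `some_hive_table` and its `id` column are hypothetical, and the `rate` source merely stands in for any streaming source:

    // Sketch only: before this fix, planning the join below inside
    // IncrementalExecution failed because the incremental planner did not
    // include the session's Hive scan strategies (e.g. HiveTableScans).
    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder()
      .appName("streaming-hive-join-repro")
      .enableHiveSupport()
      .getOrCreate()
    import spark.implicits._

    val stream = spark.readStream.format("rate").load()
    val joined = stream.join(spark.table("some_hive_table"), $"value" === $"id")
    joined.writeStream.format("console").start()  // previously failed at planning time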

## How was this patch tested?

Regression test added to `HiveDDLSuite` (see the diff below).

Author: Burak Yavuz <brkyvz@gmail.com>

Closes #18426 from brkyvz/hive-join.
parent 838effb9
@@ -47,6 +47,10 @@ class IncrementalExecution(
       sparkSession.sparkContext,
       sparkSession.sessionState.conf,
       sparkSession.sessionState.experimentalMethods) {
+    override def strategies: Seq[Strategy] =
+      extraPlanningStrategies ++
+      sparkSession.sessionState.planner.strategies
+
     override def extraPlanningStrategies: Seq[Strategy] =
       StatefulAggregationStrategy ::
       FlatMapGroupsWithStateStrategy ::
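For context, a simplified, self-contained sketch of what the override above changes; these are stand-in classes, not the actual Spark source. The incremental planner now delegates to the session planner's full strategy list instead of replacing it, so strategies contributed by a Hive-aware session are no longer dropped:

    trait Strategy
    case object StatefulAggregationStrategy extends Strategy
    case object FlatMapGroupsWithStateStrategy extends Strategy
    case object HiveTableScans extends Strategy  // stand-in for a Hive scan strategy

    class SessionPlanner {
      // In a Hive-enabled session this list includes the Hive scan strategies.
      def strategies: Seq[Strategy] = Seq(HiveTableScans)
    }

    class IncrementalPlanner(session: SessionPlanner) {
      def extraPlanningStrategies: Seq[Strategy] =
        StatefulAggregationStrategy :: FlatMapGroupsWithStateStrategy :: Nil

      // The fix: streaming strategies take precedence, then everything the
      // session planner knows about, Hive strategies included.
      def strategies: Seq[Strategy] =
        extraPlanningStrategies ++ session.strategies
    }

    // new IncrementalPlanner(new SessionPlanner).strategies
    //   == Seq(StatefulAggregationStrategy, FlatMapGroupsWithStateStrategy, HiveTableScans)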
@@ -160,7 +160,6 @@ class HiveCatalogedDDLSuite extends DDLSuite with TestHiveSingleton with BeforeA
   test("drop table") {
     testDropTable(isDatasourceTable = false)
   }
-
 }
 
 class HiveDDLSuite
@@ -1956,4 +1955,44 @@ class HiveDDLSuite
       }
     }
   }
+
+  test("SPARK-21216: join with a streaming DataFrame") {
+    import org.apache.spark.sql.execution.streaming.MemoryStream
+    import testImplicits._
+
+    implicit val _sqlContext = spark.sqlContext
+
+    Seq((1, "one"), (2, "two"), (4, "four")).toDF("number", "word").createOrReplaceTempView("t1")
+    // Make a table and ensure it will be broadcast.
+    sql("""CREATE TABLE smallTable(word string, number int)
+          |ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
+          |STORED AS TEXTFILE
+        """.stripMargin)
+
+    sql(
+      """INSERT INTO smallTable
+        |SELECT word, number from t1
+      """.stripMargin)
+
+    val inputData = MemoryStream[Int]
+    val joined = inputData.toDS().toDF()
+      .join(spark.table("smallTable"), $"value" === $"number")
+
+    val sq = joined.writeStream
+      .format("memory")
+      .queryName("t2")
+      .start()
+    try {
+      inputData.addData(1, 2)
+
+      sq.processAllAvailable()
+
+      checkAnswer(
+        spark.table("t2"),
+        Seq(Row(1, "one", 1), Row(2, "two", 2))
+      )
+    } finally {
+      sq.stop()
+    }
+  }
 }