Skip to content
Snippets Groups Projects
Commit abacf5f2 authored by Shixiong Zhu's avatar Shixiong Zhu Committed by Tathagata Das
Browse files

[HOTFIX][SQL] Don't stop ContinuousQuery quietly

## What changes were proposed in this pull request?

Try to fix a flaky hang

## How was this patch tested?

Existing Jenkins test

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #11909 from zsxwing/hotfix2.
parent 926a93e5
No related branches found
No related tags found
No related merge requests found
...@@ -65,19 +65,6 @@ import org.apache.spark.util.Utils ...@@ -65,19 +65,6 @@ import org.apache.spark.util.Utils
*/ */
trait StreamTest extends QueryTest with Timeouts { trait StreamTest extends QueryTest with Timeouts {
implicit class RichContinuousQuery(cq: ContinuousQuery) {
    /**
     * Stops the wrapped query, suppressing test output via `quietly` and
     * bounding the stop call to 10 seconds. If the stop does not complete
     * in time, the timeout failure is logged instead of propagated.
     *
     * NOTE(review): because the timeout is swallowed, a hung query is only
     * logged, never surfaced to the test — confirm this is intended.
     */
    def stopQuietly(): Unit = quietly {
      try failAfter(10.seconds) {
        cq.stop()
      } catch {
        case timeout: TestFailedDueToTimeoutException =>
          logError(timeout.getMessage(), timeout)
      }
    }
  }
implicit class RichSource(s: Source) { implicit class RichSource(s: Source) {
def toDF(): DataFrame = Dataset.newDataFrame(sqlContext, StreamingRelation(s)) def toDF(): DataFrame = Dataset.newDataFrame(sqlContext, StreamingRelation(s))
......
...@@ -72,7 +72,7 @@ class DataFrameReaderWriterSuite extends StreamTest with SharedSQLContext with B ...@@ -72,7 +72,7 @@ class DataFrameReaderWriterSuite extends StreamTest with SharedSQLContext with B
private def newMetadataDir = Utils.createTempDir("streaming.metadata").getCanonicalPath private def newMetadataDir = Utils.createTempDir("streaming.metadata").getCanonicalPath
after { after {
sqlContext.streams.active.foreach(_.stopQuietly()) sqlContext.streams.active.foreach(_.stop())
} }
test("resolve default source") { test("resolve default source") {
...@@ -83,7 +83,7 @@ class DataFrameReaderWriterSuite extends StreamTest with SharedSQLContext with B ...@@ -83,7 +83,7 @@ class DataFrameReaderWriterSuite extends StreamTest with SharedSQLContext with B
.format("org.apache.spark.sql.streaming.test") .format("org.apache.spark.sql.streaming.test")
.option("checkpointLocation", newMetadataDir) .option("checkpointLocation", newMetadataDir)
.startStream() .startStream()
.stopQuietly() .stop()
} }
test("resolve full class") { test("resolve full class") {
...@@ -94,7 +94,7 @@ class DataFrameReaderWriterSuite extends StreamTest with SharedSQLContext with B ...@@ -94,7 +94,7 @@ class DataFrameReaderWriterSuite extends StreamTest with SharedSQLContext with B
.format("org.apache.spark.sql.streaming.test") .format("org.apache.spark.sql.streaming.test")
.option("checkpointLocation", newMetadataDir) .option("checkpointLocation", newMetadataDir)
.startStream() .startStream()
.stopQuietly() .stop()
} }
test("options") { test("options") {
...@@ -121,7 +121,7 @@ class DataFrameReaderWriterSuite extends StreamTest with SharedSQLContext with B ...@@ -121,7 +121,7 @@ class DataFrameReaderWriterSuite extends StreamTest with SharedSQLContext with B
.options(map) .options(map)
.option("checkpointLocation", newMetadataDir) .option("checkpointLocation", newMetadataDir)
.startStream() .startStream()
.stopQuietly() .stop()
assert(LastOptions.parameters("opt1") == "1") assert(LastOptions.parameters("opt1") == "1")
assert(LastOptions.parameters("opt2") == "2") assert(LastOptions.parameters("opt2") == "2")
...@@ -137,7 +137,7 @@ class DataFrameReaderWriterSuite extends StreamTest with SharedSQLContext with B ...@@ -137,7 +137,7 @@ class DataFrameReaderWriterSuite extends StreamTest with SharedSQLContext with B
.format("org.apache.spark.sql.streaming.test") .format("org.apache.spark.sql.streaming.test")
.option("checkpointLocation", newMetadataDir) .option("checkpointLocation", newMetadataDir)
.startStream() .startStream()
.stopQuietly() .stop()
assert(LastOptions.partitionColumns == Nil) assert(LastOptions.partitionColumns == Nil)
df.write df.write
...@@ -145,7 +145,7 @@ class DataFrameReaderWriterSuite extends StreamTest with SharedSQLContext with B ...@@ -145,7 +145,7 @@ class DataFrameReaderWriterSuite extends StreamTest with SharedSQLContext with B
.option("checkpointLocation", newMetadataDir) .option("checkpointLocation", newMetadataDir)
.partitionBy("a") .partitionBy("a")
.startStream() .startStream()
.stopQuietly() .stop()
assert(LastOptions.partitionColumns == Seq("a")) assert(LastOptions.partitionColumns == Seq("a"))
withSQLConf("spark.sql.caseSensitive" -> "false") { withSQLConf("spark.sql.caseSensitive" -> "false") {
...@@ -154,7 +154,7 @@ class DataFrameReaderWriterSuite extends StreamTest with SharedSQLContext with B ...@@ -154,7 +154,7 @@ class DataFrameReaderWriterSuite extends StreamTest with SharedSQLContext with B
.option("checkpointLocation", newMetadataDir) .option("checkpointLocation", newMetadataDir)
.partitionBy("A") .partitionBy("A")
.startStream() .startStream()
.stopQuietly() .stop()
assert(LastOptions.partitionColumns == Seq("a")) assert(LastOptions.partitionColumns == Seq("a"))
} }
...@@ -164,7 +164,7 @@ class DataFrameReaderWriterSuite extends StreamTest with SharedSQLContext with B ...@@ -164,7 +164,7 @@ class DataFrameReaderWriterSuite extends StreamTest with SharedSQLContext with B
.option("checkpointLocation", newMetadataDir) .option("checkpointLocation", newMetadataDir)
.partitionBy("b") .partitionBy("b")
.startStream() .startStream()
.stopQuietly() .stop()
} }
} }
...@@ -182,7 +182,7 @@ class DataFrameReaderWriterSuite extends StreamTest with SharedSQLContext with B ...@@ -182,7 +182,7 @@ class DataFrameReaderWriterSuite extends StreamTest with SharedSQLContext with B
.format("org.apache.spark.sql.streaming.test") .format("org.apache.spark.sql.streaming.test")
.option("checkpointLocation", newMetadataDir) .option("checkpointLocation", newMetadataDir)
.startStream("/test") .startStream("/test")
.stopQuietly() .stop()
assert(LastOptions.parameters("path") == "/test") assert(LastOptions.parameters("path") == "/test")
} }
...@@ -207,7 +207,7 @@ class DataFrameReaderWriterSuite extends StreamTest with SharedSQLContext with B ...@@ -207,7 +207,7 @@ class DataFrameReaderWriterSuite extends StreamTest with SharedSQLContext with B
.option("doubleOpt", 6.7) .option("doubleOpt", 6.7)
.option("checkpointLocation", newMetadataDir) .option("checkpointLocation", newMetadataDir)
.startStream("/test") .startStream("/test")
.stopQuietly() .stop()
assert(LastOptions.parameters("intOpt") == "56") assert(LastOptions.parameters("intOpt") == "56")
assert(LastOptions.parameters("boolOpt") == "false") assert(LastOptions.parameters("boolOpt") == "false")
...@@ -269,9 +269,9 @@ class DataFrameReaderWriterSuite extends StreamTest with SharedSQLContext with B ...@@ -269,9 +269,9 @@ class DataFrameReaderWriterSuite extends StreamTest with SharedSQLContext with B
} }
// Should be able to start query with that name after stopping the previous query // Should be able to start query with that name after stopping the previous query
q1.stopQuietly() q1.stop()
val q5 = startQueryWithName("name") val q5 = startQueryWithName("name")
assert(activeStreamNames.contains("name")) assert(activeStreamNames.contains("name"))
sqlContext.streams.active.foreach(_.stopQuietly()) sqlContext.streams.active.foreach(_.stop())
} }
} }
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment