Commit 1ef6b296 authored by Tathagata Das, committed by Shixiong Zhu

[SPARK-18671][SS][TEST] Added tests to ensure stability of all Structured Streaming log formats

## What changes were proposed in this pull request?

To be able to restart StreamingQueries across Spark versions, we have already made the logs (offset log, file source log, file sink log) use JSON. We should add tests with actual JSON files checked into Spark so that any incompatible change in reading the logs is caught immediately. This PR adds such tests for FileStreamSourceLog, FileStreamSinkLog, and OffsetSeqLog.
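
For illustration, the pattern these tests follow is roughly the sketch below: a log file written by Spark 2.1.0 is checked into the test resources, and the current code must still be able to read it. This is a minimal, self-contained sketch rather than code from the PR; the object name, resource path, and assertion are assumptions modeled on the files and suites added here.

```scala
import scala.io.Source

// Minimal sketch of the compatibility check added by this PR (assumed names):
// read a JSON offset file that Spark 2.1.0 wrote, checked in as a test
// resource, and verify the current code still understands its shape.
object LogFormatCompatSketch {
  def main(args: Array[String]): Unit = {
    // Resource name mirrors the one added for the Kafka source offset log.
    val url = getClass.getResource("/structured-streaming/kafka-source-offset-version-2.1.0.txt")
    val json = Source.fromFile(url.toURI).mkString.trim
    // Spark 2.1.0 wrote offsets as {"topic":{"partition":offset, ...}, ...};
    // this assertion stands in for parsing it back into a KafkaSourceOffset.
    assert(json.startsWith("{") && json.contains("\"topic1\""))
  }
}
```

Checking in the serialized files themselves, rather than regenerating them inside the test, is what guards against accidental changes to the on-disk format.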

## How was this patch tested?
New unit tests.

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #16128 from tdas/SPARK-18671.
parent cb1f10b4
Showing 114 additions and 3 deletions
@@ -102,3 +102,4 @@ org.apache.spark.scheduler.ExternalClusterManager
.Rbuildignore
org.apache.spark.deploy.yarn.security.ServiceCredentialProvider
spark-warehouse
+structured-streaming/*
@@ -81,7 +81,14 @@ private object JsonUtils {
   */
  def partitionOffsets(partitionOffsets: Map[TopicPartition, Long]): String = {
    val result = new HashMap[String, HashMap[Int, Long]]()
-    partitionOffsets.foreach { case (tp, off) =>
+    implicit val ordering = new Ordering[TopicPartition] {
+      override def compare(x: TopicPartition, y: TopicPartition): Int = {
+        Ordering.Tuple2[String, Int].compare((x.topic, x.partition), (y.topic, y.partition))
+      }
+    }
+    val partitions = partitionOffsets.keySet.toSeq.sorted  // sort for more determinism
+    partitions.foreach { tp =>
+      val off = partitionOffsets(tp)
      val parts = result.getOrElse(tp.topic, new HashMap[Int, Long])
      parts += tp.partition -> off
      result += tp.topic -> parts
...
@@ -89,4 +89,16 @@ class KafkaSourceOffsetSuite extends OffsetSuite with SharedSQLContext {
        Array(0 -> batch0Serialized, 1 -> batch1Serialized))
    }
  }
+
+  test("read Spark 2.1.0 log format") {
+    val offset = readFromResource("kafka-source-offset-version-2.1.0.txt")
+    assert(KafkaSourceOffset(offset) ===
+      KafkaSourceOffset(("topic1", 0, 456L), ("topic1", 1, 789L), ("topic2", 0, 0L)))
+  }
+
+  private def readFromResource(file: String): SerializedOffset = {
+    import scala.io.Source
+    val str = Source.fromFile(getClass.getResource(s"/structured-streaming/$file").toURI).mkString
+    SerializedOffset(str)
+  }
}
v1
{"path":"/a/b/0","size":1,"isDir":false,"modificationTime":1,"blockReplication":1,"blockSize":100,"action":"add"}
{"path":"/a/b/1","size":100,"isDir":false,"modificationTime":100,"blockReplication":1,"blockSize":100,"action":"add"}
{"path":"/a/b/2","size":200,"isDir":false,"modificationTime":200,"blockReplication":1,"blockSize":100,"action":"add"}
{"path":"/a/b/3","size":300,"isDir":false,"modificationTime":300,"blockReplication":1,"blockSize":100,"action":"add"}
{"path":"/a/b/4","size":400,"isDir":false,"modificationTime":400,"blockReplication":1,"blockSize":100,"action":"add"}
{"path":"/a/b/5","size":500,"isDir":false,"modificationTime":500,"blockReplication":1,"blockSize":100,"action":"add"}
{"path":"/a/b/6","size":600,"isDir":false,"modificationTime":600,"blockReplication":1,"blockSize":100,"action":"add"}
{"path":"/a/b/7","size":700,"isDir":false,"modificationTime":700,"blockReplication":1,"blockSize":100,"action":"add"}
v1
{"path":"/a/b/8","size":800,"isDir":false,"modificationTime":800,"blockReplication":1,"blockSize":100,"action":"add"}
{"path":"/a/b/0","size":100,"isDir":false,"modificationTime":100,"blockReplication":1,"blockSize":100,"action":"delete"}
v1
{"path":"/a/b/9","size":900,"isDir":false,"modificationTime":900,"blockReplication":3,"blockSize":200,"action":"add"}
v1
{"path":"/a/b/0","timestamp":1480730949000,"batchId":0}
{"path":"/a/b/1","timestamp":1480730950000,"batchId":1}
{"path":"/a/b/2","timestamp":1480730950000,"batchId":2}
v1
{"path":"/a/b/3","timestamp":1480730950000,"batchId":3}
v1
{"path":"/a/b/4","timestamp":1480730951000,"batchId":4}
{"topic1":{"0":456,"1":789},"topic2":{"0":0}}
v1
{"batchWatermarkMs":0,"batchTimestampMs":1480981499528}
0
{"topic-0":{"0":1}}
\ No newline at end of file
@@ -185,6 +185,21 @@ class FileStreamSinkLogSuite extends SparkFunSuite with SharedSQLContext {
    }
  }
+
+  test("read Spark 2.1.0 log format") {
+    assert(readFromResource("file-sink-log-version-2.1.0") === Seq(
+      // SinkFileStatus("/a/b/0", 100, false, 100, 1, 100, FileStreamSinkLog.ADD_ACTION), -> deleted
+      SinkFileStatus("/a/b/1", 100, false, 100, 1, 100, FileStreamSinkLog.ADD_ACTION),
+      SinkFileStatus("/a/b/2", 200, false, 200, 1, 100, FileStreamSinkLog.ADD_ACTION),
+      SinkFileStatus("/a/b/3", 300, false, 300, 1, 100, FileStreamSinkLog.ADD_ACTION),
+      SinkFileStatus("/a/b/4", 400, false, 400, 1, 100, FileStreamSinkLog.ADD_ACTION),
+      SinkFileStatus("/a/b/5", 500, false, 500, 1, 100, FileStreamSinkLog.ADD_ACTION),
+      SinkFileStatus("/a/b/6", 600, false, 600, 1, 100, FileStreamSinkLog.ADD_ACTION),
+      SinkFileStatus("/a/b/7", 700, false, 700, 1, 100, FileStreamSinkLog.ADD_ACTION),
+      SinkFileStatus("/a/b/8", 800, false, 800, 1, 100, FileStreamSinkLog.ADD_ACTION),
+      SinkFileStatus("/a/b/9", 900, false, 900, 3, 200, FileStreamSinkLog.ADD_ACTION)
+    ))
+  }
+
  /**
   * Create a fake SinkFileStatus using path and action. Most of tests don't care about other fields
   * in SinkFileStatus.
@@ -206,4 +206,10 @@ class FileStreamSinkLogSuite extends SparkFunSuite with SharedSQLContext {
      f(sinkLog)
    }
  }
+
+  private def readFromResource(dir: String): Seq[SinkFileStatus] = {
+    val input = getClass.getResource(s"/structured-streaming/$dir")
+    val log = new FileStreamSinkLog(FileStreamSinkLog.VERSION, spark, input.toString)
+    log.allFiles()
+  }
}
@@ -69,4 +69,20 @@ class OffsetSeqLogSuite extends SparkFunSuite with SharedSQLContext {
        Array(0 -> batch0Serialized, 1 -> batch1Serialized))
    }
  }
+
+  test("read Spark 2.1.0 log format") {
+    val (batchId, offsetSeq) = readFromResource("offset-log-version-2.1.0")
+    assert(batchId === 0)
+    assert(offsetSeq.offsets === Seq(
+      Some(SerializedOffset("0")),
+      Some(SerializedOffset("""{"topic-0":{"0":1}}"""))
+    ))
+    assert(offsetSeq.metadata === Some(OffsetSeqMetadata(0L, 1480981499528L)))
+  }
+
+  private def readFromResource(dir: String): (Long, OffsetSeq) = {
+    val input = getClass.getResource(s"/structured-streaming/$dir")
+    val log = new OffsetSeqLog(spark, input.toString)
+    log.getLatest().get
+  }
}
@@ -19,14 +19,13 @@ package org.apache.spark.sql.streaming

import java.io.File

-import scala.collection.mutable
-
import org.scalatest.PrivateMethodTester
import org.scalatest.time.SpanSugar._

import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.util._
import org.apache.spark.sql.execution.streaming._
+import org.apache.spark.sql.execution.streaming.FileStreamSource.FileEntry
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.sql.types._
@@ -1022,6 +1021,33 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
    val options = new FileStreamOptions(Map("maxfilespertrigger" -> "1"))
    assert(options.maxFilesPerTrigger == Some(1))
  }
+
+  test("FileStreamSource offset - read Spark 2.1.0 log format") {
+    val offset = readOffsetFromResource("file-source-offset-version-2.1.0.txt")
+    assert(LongOffset.convert(offset) === Some(LongOffset(345)))
+  }
+
+  test("FileStreamSourceLog - read Spark 2.1.0 log format") {
+    assert(readLogFromResource("file-source-log-version-2.1.0") === Seq(
+      FileEntry("/a/b/0", 1480730949000L, 0L),
+      FileEntry("/a/b/1", 1480730950000L, 1L),
+      FileEntry("/a/b/2", 1480730950000L, 2L),
+      FileEntry("/a/b/3", 1480730950000L, 3L),
+      FileEntry("/a/b/4", 1480730951000L, 4L)
+    ))
+  }
+
+  private def readLogFromResource(dir: String): Seq[FileEntry] = {
+    val input = getClass.getResource(s"/structured-streaming/$dir")
+    val log = new FileStreamSourceLog(FileStreamSourceLog.VERSION, spark, input.toString)
+    log.allFiles()
+  }
+
+  private def readOffsetFromResource(file: String): SerializedOffset = {
+    import scala.io.Source
+    val str = Source.fromFile(getClass.getResource(s"/structured-streaming/$file").toURI).mkString
+    SerializedOffset(str.trim)
+  }
}

class FileStreamSourceStressTestSuite extends FileStreamSourceTest {
...