diff --git a/dev/.rat-excludes b/dev/.rat-excludes
index a3efddeaa515acc2b199c54033e7dc5cbef643f3..6be1c72bc6cfb29355e16959663aa430152e772c 100644
--- a/dev/.rat-excludes
+++ b/dev/.rat-excludes
@@ -102,3 +102,4 @@ org.apache.spark.scheduler.ExternalClusterManager
 .Rbuildignore
 org.apache.spark.deploy.yarn.security.ServiceCredentialProvider
 spark-warehouse
+structured-streaming/*
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/JsonUtils.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/JsonUtils.scala
index 13d717092a898498dacc1336caa1588948b9320f..868edb5dcdc0c83524f711c51ea0e3ea6169cf2c 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/JsonUtils.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/JsonUtils.scala
@@ -81,7 +81,14 @@ private object JsonUtils {
    */
   def partitionOffsets(partitionOffsets: Map[TopicPartition, Long]): String = {
     val result = new HashMap[String, HashMap[Int, Long]]()
-    partitionOffsets.foreach { case (tp, off) =>
+    implicit val ordering = new Ordering[TopicPartition] {
+      override def compare(x: TopicPartition, y: TopicPartition): Int = {
+        Ordering.Tuple2[String, Int].compare((x.topic, x.partition), (y.topic, y.partition))
+      }
+    }
+    val partitions = partitionOffsets.keySet.toSeq.sorted // sort for more determinism
+    partitions.foreach { tp =>
+      val off = partitionOffsets(tp)
       val parts = result.getOrElse(tp.topic, new HashMap[Int, Long])
       parts += tp.partition -> off
       result += tp.topic -> parts
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceOffsetSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceOffsetSuite.scala
index 881018fd95665788987424c223d70203864255df..c8326ffcc7ad43bca3f96ac8ddcb1019def76c76 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceOffsetSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceOffsetSuite.scala
@@ -89,4 +89,16 @@ class KafkaSourceOffsetSuite extends OffsetSuite with SharedSQLContext {
         Array(0 -> batch0Serialized, 1 -> batch1Serialized))
     }
   }
+
+  test("read Spark 2.1.0 log format") {
+    val offset = readFromResource("kafka-source-offset-version-2.1.0.txt")
+    assert(KafkaSourceOffset(offset) ===
+      KafkaSourceOffset(("topic1", 0, 456L), ("topic1", 1, 789L), ("topic2", 0, 0L)))
+  }
+
+  private def readFromResource(file: String): SerializedOffset = {
+    import scala.io.Source
+    val str = Source.fromFile(getClass.getResource(s"/structured-streaming/$file").toURI).mkString
+    SerializedOffset(str)
+  }
 }
diff --git a/sql/core/src/test/resources/structured-streaming/file-sink-log-version-2.1.0/7.compact b/sql/core/src/test/resources/structured-streaming/file-sink-log-version-2.1.0/7.compact
new file mode 100644
index 0000000000000000000000000000000000000000..e1ec8a74f052c9c564f690eadba2c0bb275bf261
--- /dev/null
+++ b/sql/core/src/test/resources/structured-streaming/file-sink-log-version-2.1.0/7.compact
@@ -0,0 +1,9 @@
+v1
+{"path":"/a/b/0","size":1,"isDir":false,"modificationTime":1,"blockReplication":1,"blockSize":100,"action":"add"}
+{"path":"/a/b/1","size":100,"isDir":false,"modificationTime":100,"blockReplication":1,"blockSize":100,"action":"add"}
+{"path":"/a/b/2","size":200,"isDir":false,"modificationTime":200,"blockReplication":1,"blockSize":100,"action":"add"}
+{"path":"/a/b/3","size":300,"isDir":false,"modificationTime":300,"blockReplication":1,"blockSize":100,"action":"add"} +{"path":"/a/b/4","size":400,"isDir":false,"modificationTime":400,"blockReplication":1,"blockSize":100,"action":"add"} +{"path":"/a/b/5","size":500,"isDir":false,"modificationTime":500,"blockReplication":1,"blockSize":100,"action":"add"} +{"path":"/a/b/6","size":600,"isDir":false,"modificationTime":600,"blockReplication":1,"blockSize":100,"action":"add"} +{"path":"/a/b/7","size":700,"isDir":false,"modificationTime":700,"blockReplication":1,"blockSize":100,"action":"add"} diff --git a/sql/core/src/test/resources/structured-streaming/file-sink-log-version-2.1.0/8 b/sql/core/src/test/resources/structured-streaming/file-sink-log-version-2.1.0/8 new file mode 100644 index 0000000000000000000000000000000000000000..e7989804e8886b5aa5bbd235575283c83bc612c0 --- /dev/null +++ b/sql/core/src/test/resources/structured-streaming/file-sink-log-version-2.1.0/8 @@ -0,0 +1,3 @@ +v1 +{"path":"/a/b/8","size":800,"isDir":false,"modificationTime":800,"blockReplication":1,"blockSize":100,"action":"add"} +{"path":"/a/b/0","size":100,"isDir":false,"modificationTime":100,"blockReplication":1,"blockSize":100,"action":"delete"} diff --git a/sql/core/src/test/resources/structured-streaming/file-sink-log-version-2.1.0/9 b/sql/core/src/test/resources/structured-streaming/file-sink-log-version-2.1.0/9 new file mode 100644 index 0000000000000000000000000000000000000000..42fb0ee4169226fe7724055ff06408fbd189af99 --- /dev/null +++ b/sql/core/src/test/resources/structured-streaming/file-sink-log-version-2.1.0/9 @@ -0,0 +1,2 @@ +v1 +{"path":"/a/b/9","size":900,"isDir":false,"modificationTime":900,"blockReplication":3,"blockSize":200,"action":"add"} diff --git a/sql/core/src/test/resources/structured-streaming/file-source-log-version-2.1.0/2.compact b/sql/core/src/test/resources/structured-streaming/file-source-log-version-2.1.0/2.compact new file mode 100644 index 0000000000000000000000000000000000000000..95f78bb2620d4edd5502ea711b00cf2df04d469c --- /dev/null +++ b/sql/core/src/test/resources/structured-streaming/file-source-log-version-2.1.0/2.compact @@ -0,0 +1,4 @@ +v1 +{"path":"/a/b/0","timestamp":1480730949000,"batchId":0} +{"path":"/a/b/1","timestamp":1480730950000,"batchId":1} +{"path":"/a/b/2","timestamp":1480730950000,"batchId":2} diff --git a/sql/core/src/test/resources/structured-streaming/file-source-log-version-2.1.0/3 b/sql/core/src/test/resources/structured-streaming/file-source-log-version-2.1.0/3 new file mode 100644 index 0000000000000000000000000000000000000000..2caa5972e42ebf5dcfbc30cf42b603992547ea07 --- /dev/null +++ b/sql/core/src/test/resources/structured-streaming/file-source-log-version-2.1.0/3 @@ -0,0 +1,2 @@ +v1 +{"path":"/a/b/3","timestamp":1480730950000,"batchId":3} diff --git a/sql/core/src/test/resources/structured-streaming/file-source-log-version-2.1.0/4 b/sql/core/src/test/resources/structured-streaming/file-source-log-version-2.1.0/4 new file mode 100644 index 0000000000000000000000000000000000000000..e54b943229880de3386e24b5fcb399a9938fe310 --- /dev/null +++ b/sql/core/src/test/resources/structured-streaming/file-source-log-version-2.1.0/4 @@ -0,0 +1,2 @@ +v1 +{"path":"/a/b/4","timestamp":1480730951000,"batchId":4} diff --git a/sql/core/src/test/resources/structured-streaming/file-source-offset-version-2.1.0.txt b/sql/core/src/test/resources/structured-streaming/file-source-offset-version-2.1.0.txt new file mode 100644 index 
0000000000000000000000000000000000000000..51b4008129ffee9f3298ba2e0ebd1071211109fd --- /dev/null +++ b/sql/core/src/test/resources/structured-streaming/file-source-offset-version-2.1.0.txt @@ -0,0 +1 @@ +345 diff --git a/sql/core/src/test/resources/structured-streaming/kafka-source-offset-version-2.1.0.txt b/sql/core/src/test/resources/structured-streaming/kafka-source-offset-version-2.1.0.txt new file mode 100644 index 0000000000000000000000000000000000000000..6410031743d26ccc8bf7cd54b107dd2e0aac5ba0 --- /dev/null +++ b/sql/core/src/test/resources/structured-streaming/kafka-source-offset-version-2.1.0.txt @@ -0,0 +1 @@ +{"topic1":{"0":456,"1":789},"topic2":{"0":0}} diff --git a/sql/core/src/test/resources/structured-streaming/offset-log-version-2.1.0/0 b/sql/core/src/test/resources/structured-streaming/offset-log-version-2.1.0/0 new file mode 100644 index 0000000000000000000000000000000000000000..fe5c1d44a6e26e66d3ee50494b43b64367d507dc --- /dev/null +++ b/sql/core/src/test/resources/structured-streaming/offset-log-version-2.1.0/0 @@ -0,0 +1,4 @@ +v1 +{"batchWatermarkMs":0,"batchTimestampMs":1480981499528} +0 +{"topic-0":{"0":1}} \ No newline at end of file diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala index e046fee0c04d37f63ed0da038dcc0a62d54c352d..8a21b76e8f029b4d517ff02347662c0630540c2a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala @@ -185,6 +185,21 @@ class FileStreamSinkLogSuite extends SparkFunSuite with SharedSQLContext { } } + test("read Spark 2.1.0 log format") { + assert(readFromResource("file-sink-log-version-2.1.0") === Seq( + // SinkFileStatus("/a/b/0", 100, false, 100, 1, 100, FileStreamSinkLog.ADD_ACTION), -> deleted + SinkFileStatus("/a/b/1", 100, false, 100, 1, 100, FileStreamSinkLog.ADD_ACTION), + SinkFileStatus("/a/b/2", 200, false, 200, 1, 100, FileStreamSinkLog.ADD_ACTION), + SinkFileStatus("/a/b/3", 300, false, 300, 1, 100, FileStreamSinkLog.ADD_ACTION), + SinkFileStatus("/a/b/4", 400, false, 400, 1, 100, FileStreamSinkLog.ADD_ACTION), + SinkFileStatus("/a/b/5", 500, false, 500, 1, 100, FileStreamSinkLog.ADD_ACTION), + SinkFileStatus("/a/b/6", 600, false, 600, 1, 100, FileStreamSinkLog.ADD_ACTION), + SinkFileStatus("/a/b/7", 700, false, 700, 1, 100, FileStreamSinkLog.ADD_ACTION), + SinkFileStatus("/a/b/8", 800, false, 800, 1, 100, FileStreamSinkLog.ADD_ACTION), + SinkFileStatus("/a/b/9", 900, false, 900, 3, 200, FileStreamSinkLog.ADD_ACTION) + )) + } + /** * Create a fake SinkFileStatus using path and action. Most of tests don't care about other fields * in SinkFileStatus. 
@@ -206,4 +221,10 @@ class FileStreamSinkLogSuite extends SparkFunSuite with SharedSQLContext {
       f(sinkLog)
     }
   }
+
+  private def readFromResource(dir: String): Seq[SinkFileStatus] = {
+    val input = getClass.getResource(s"/structured-streaming/$dir")
+    val log = new FileStreamSinkLog(FileStreamSinkLog.VERSION, spark, input.toString)
+    log.allFiles()
+  }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLogSuite.scala
index d3a83ea0b922fb24eca657abc58b9bcd11d5262b..d139efaaf824f8521b307b2b50b8a41be9613eb8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLogSuite.scala
@@ -69,4 +69,20 @@ class OffsetSeqLogSuite extends SparkFunSuite with SharedSQLContext {
         Array(0 -> batch0Serialized, 1 -> batch1Serialized))
     }
   }
+
+  test("read Spark 2.1.0 log format") {
+    val (batchId, offsetSeq) = readFromResource("offset-log-version-2.1.0")
+    assert(batchId === 0)
+    assert(offsetSeq.offsets === Seq(
+      Some(SerializedOffset("0")),
+      Some(SerializedOffset("""{"topic-0":{"0":1}}"""))
+    ))
+    assert(offsetSeq.metadata === Some(OffsetSeqMetadata(0L, 1480981499528L)))
+  }
+
+  private def readFromResource(dir: String): (Long, OffsetSeq) = {
+    val input = getClass.getResource(s"/structured-streaming/$dir")
+    val log = new OffsetSeqLog(spark, input.toString)
+    log.getLatest().get
+  }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
index 8256c63d87090dff7604f8dc824bf68d59bdcbbf..ff1f3e26f1593f8724de1d1fda65b57beccd6f12 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
@@ -19,14 +19,13 @@ package org.apache.spark.sql.streaming
 
 import java.io.File
 
-import scala.collection.mutable
-
 import org.scalatest.PrivateMethodTester
 import org.scalatest.time.SpanSugar._
 
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.util._
 import org.apache.spark.sql.execution.streaming._
+import org.apache.spark.sql.execution.streaming.FileStreamSource.FileEntry
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.types._
@@ -1022,6 +1021,33 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
     val options = new FileStreamOptions(Map("maxfilespertrigger" -> "1"))
     assert(options.maxFilesPerTrigger == Some(1))
   }
+
+  test("FileStreamSource offset - read Spark 2.1.0 log format") {
+    val offset = readOffsetFromResource("file-source-offset-version-2.1.0.txt")
+    assert(LongOffset.convert(offset) === Some(LongOffset(345)))
+  }
+
+  test("FileStreamSourceLog - read Spark 2.1.0 log format") {
+    assert(readLogFromResource("file-source-log-version-2.1.0") === Seq(
+      FileEntry("/a/b/0", 1480730949000L, 0L),
+      FileEntry("/a/b/1", 1480730950000L, 1L),
+      FileEntry("/a/b/2", 1480730950000L, 2L),
+      FileEntry("/a/b/3", 1480730950000L, 3L),
+      FileEntry("/a/b/4", 1480730951000L, 4L)
+    ))
+  }
+
+  private def readLogFromResource(dir: String): Seq[FileEntry] = {
+    val input = getClass.getResource(s"/structured-streaming/$dir")
+    val log = new FileStreamSourceLog(FileStreamSourceLog.VERSION, spark, input.toString)
+    log.allFiles()
+  }
+
+  private def readOffsetFromResource(file: String): SerializedOffset = {
+    import scala.io.Source
+    val str = Source.fromFile(getClass.getResource(s"/structured-streaming/$file").toURI).mkString
+    SerializedOffset(str.trim)
+  }
 }
 
 class FileStreamSourceStressTestSuite extends FileStreamSourceTest {
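
For reference, a minimal standalone sketch of the idea behind the JsonUtils.partitionOffsets change above: visiting partitions in a stable (topic, partition) order before building the nested topic -> (partition -> offset) structure makes repeated serialization of the same map produce identical output. TP and DeterministicOffsets are hypothetical stand-ins (TP replaces org.apache.kafka.common.TopicPartition), and the sketch returns nested pairs rather than the JSON string the real method writes.

// Hypothetical stand-in for org.apache.kafka.common.TopicPartition.
case class TP(topic: String, partition: Int)

object DeterministicOffsets {
  // Returns topic -> (partition -> offset) pairs in a stable order,
  // mirroring the structure JsonUtils.partitionOffsets serializes to JSON.
  def partitionOffsets(offsets: Map[TP, Long]): Seq[(String, Seq[(Int, Long)])] = {
    // Sort by (topic, partition) so iteration order no longer depends on Map internals.
    val ordered = offsets.keys.toSeq.sortBy(tp => (tp.topic, tp.partition))
    ordered
      .groupBy(_.topic)  // group partitions under their topic; order is kept within each group
      .toSeq
      .sortBy(_._1)      // keep the topics themselves in a stable order
      .map { case (topic, tps) => topic -> tps.map(tp => tp.partition -> offsets(tp)) }
  }

  def main(args: Array[String]): Unit = {
    val offsets = Map(TP("topic2", 0) -> 0L, TP("topic1", 1) -> 789L, TP("topic1", 0) -> 456L)
    // Prints the same ordering on every run, matching the layout of the
    // kafka-source-offset-version-2.1.0.txt test resource above.
    println(partitionOffsets(offsets))
  }
}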