From aaa2a173a81868a92d61bcc9420961aaa7eaeb57 Mon Sep 17 00:00:00 2001
From: Liwei Lin <lwlin7@gmail.com>
Date: Mon, 21 Nov 2016 21:14:13 -0800
Subject: [PATCH] [SPARK-18425][STRUCTURED STREAMING][TESTS] Test
 `CompactibleFileStreamLog` directly

## What changes were proposed in this pull request?

Right now most of `CompactibleFileStreamLog` is tested in `FileStreamSinkLogSuite`, because `FileStreamSinkLog` was once the only subclass of `CompactibleFileStreamLog`; that is no longer the case.

Let's refactor the tests so that `CompactibleFileStreamLog` is tested directly, making future changes to it (like https://github.com/apache/spark/pull/15828 and https://github.com/apache/spark/pull/15827) much easier to test and review.

## How was this patch tested?

This PR consists of test changes only.

Author: Liwei Lin <lwlin7@gmail.com>

Closes #15870 from lw-lin/test-compact-1113.

(cherry picked from commit ebeb0830a3a4837c7354a0eee667b9f5fad389c5)
Signed-off-by: Shixiong Zhu <shixiong@databricks.com>
---
 .../CompactibleFileStreamLogSuite.scala       | 216 +++++++++++++++++-
 .../streaming/FileStreamSinkLogSuite.scala    |  68 ------
 2 files changed, 214 insertions(+), 70 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLogSuite.scala
index 2cd2157b29..e511fda579 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLogSuite.scala
@@ -17,12 +17,79 @@
 
 package org.apache.spark.sql.execution.streaming
 
-import org.apache.spark.SparkFunSuite
+import java.io._
+import java.nio.charset.StandardCharsets._
 
-class CompactibleFileStreamLogSuite extends SparkFunSuite {
+import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.sql.execution.streaming.FakeFileSystem._
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.test.SharedSQLContext
+
+class CompactibleFileStreamLogSuite extends SparkFunSuite with SharedSQLContext {
+
+  /** To avoid caching of FS objects */
+  override protected val sparkConf =
+    new SparkConf().set(s"spark.hadoop.fs.$scheme.impl.disable.cache", "true")
 
   import CompactibleFileStreamLog._
 
+  /** -- testing of `object CompactibleFileStreamLog` begins -- */
+
+  test("getBatchIdFromFileName") {
+    assert(1234L === getBatchIdFromFileName("1234"))
+    assert(1234L === getBatchIdFromFileName("1234.compact"))
+    intercept[NumberFormatException] {
+      getBatchIdFromFileName("1234a")
+    }
+  }
+
+  test("isCompactionBatch") {
+    assert(false === isCompactionBatch(0, compactInterval = 3))
+    assert(false === isCompactionBatch(1, compactInterval = 3))
+    assert(true === isCompactionBatch(2, compactInterval = 3))
+    assert(false === isCompactionBatch(3, compactInterval = 3))
+    assert(false === isCompactionBatch(4, compactInterval = 3))
+    assert(true === isCompactionBatch(5, compactInterval = 3))
+  }
+
+  test("nextCompactionBatchId") {
+    assert(2 === nextCompactionBatchId(0, compactInterval = 3))
+    assert(2 === nextCompactionBatchId(1, compactInterval = 3))
+    assert(5 === nextCompactionBatchId(2, compactInterval = 3))
+    assert(5 === nextCompactionBatchId(3, compactInterval = 3))
+    assert(5 === nextCompactionBatchId(4, compactInterval = 3))
+    assert(8 === nextCompactionBatchId(5, compactInterval = 3))
+  }
+
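+  // The two tests above pin down the batch-id arithmetic of these helpers. A minimal
+  // sketch of the behaviour they assert, assuming a compact interval k (illustrative
+  // only, not necessarily the exact implementation):
+  //
+  //   def isCompactionBatch(batchId: Long, k: Int): Boolean = (batchId + 1) % k == 0
+  //   def nextCompactionBatchId(batchId: Long, k: Int): Long = (batchId + k + 1) / k * k - 1
+  //
+  // With k = 3 the compaction batches are 2, 5, 8, ..., and `nextCompactionBatchId`
+  // returns the first compaction batch strictly after `batchId`.
+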
test("getValidBatchesBeforeCompactionBatch") { + intercept[AssertionError] { + getValidBatchesBeforeCompactionBatch(0, compactInterval = 3) + } + intercept[AssertionError] { + getValidBatchesBeforeCompactionBatch(1, compactInterval = 3) + } + assert(Seq(0, 1) === getValidBatchesBeforeCompactionBatch(2, compactInterval = 3)) + intercept[AssertionError] { + getValidBatchesBeforeCompactionBatch(3, compactInterval = 3) + } + intercept[AssertionError] { + getValidBatchesBeforeCompactionBatch(4, compactInterval = 3) + } + assert(Seq(2, 3, 4) === getValidBatchesBeforeCompactionBatch(5, compactInterval = 3)) + } + + test("getAllValidBatches") { + assert(Seq(0) === getAllValidBatches(0, compactInterval = 3)) + assert(Seq(0, 1) === getAllValidBatches(1, compactInterval = 3)) + assert(Seq(2) === getAllValidBatches(2, compactInterval = 3)) + assert(Seq(2, 3) === getAllValidBatches(3, compactInterval = 3)) + assert(Seq(2, 3, 4) === getAllValidBatches(4, compactInterval = 3)) + assert(Seq(5) === getAllValidBatches(5, compactInterval = 3)) + assert(Seq(5, 6) === getAllValidBatches(6, compactInterval = 3)) + assert(Seq(5, 6, 7) === getAllValidBatches(7, compactInterval = 3)) + assert(Seq(8) === getAllValidBatches(8, compactInterval = 3)) + } + test("deriveCompactInterval") { // latestCompactBatchId(4) + 1 <= default(5) // then use latestestCompactBatchId + 1 === 5 @@ -30,4 +97,149 @@ class CompactibleFileStreamLogSuite extends SparkFunSuite { // First divisor of 10 greater than 4 === 5 assert(5 === deriveCompactInterval(4, 9)) } + + /** -- testing of `object CompactibleFileStreamLog` ends -- */ + + test("batchIdToPath") { + withFakeCompactibleFileStreamLog( + fileCleanupDelayMs = Long.MaxValue, + defaultCompactInterval = 3, + compactibleLog => { + assert("0" === compactibleLog.batchIdToPath(0).getName) + assert("1" === compactibleLog.batchIdToPath(1).getName) + assert("2.compact" === compactibleLog.batchIdToPath(2).getName) + assert("3" === compactibleLog.batchIdToPath(3).getName) + assert("4" === compactibleLog.batchIdToPath(4).getName) + assert("5.compact" === compactibleLog.batchIdToPath(5).getName) + }) + } + + test("serialize") { + withFakeCompactibleFileStreamLog( + fileCleanupDelayMs = Long.MaxValue, + defaultCompactInterval = 3, + compactibleLog => { + val logs = Array("entry_1", "entry_2", "entry_3") + val expected = s"""${FakeCompactibleFileStreamLog.VERSION} + |"entry_1" + |"entry_2" + |"entry_3"""".stripMargin + val baos = new ByteArrayOutputStream() + compactibleLog.serialize(logs, baos) + assert(expected === baos.toString(UTF_8.name())) + + baos.reset() + compactibleLog.serialize(Array(), baos) + assert(FakeCompactibleFileStreamLog.VERSION === baos.toString(UTF_8.name())) + }) + } + + test("deserialize") { + withFakeCompactibleFileStreamLog( + fileCleanupDelayMs = Long.MaxValue, + defaultCompactInterval = 3, + compactibleLog => { + val logs = s"""${FakeCompactibleFileStreamLog.VERSION} + |"entry_1" + |"entry_2" + |"entry_3"""".stripMargin + val expected = Array("entry_1", "entry_2", "entry_3") + assert(expected === + compactibleLog.deserialize(new ByteArrayInputStream(logs.getBytes(UTF_8)))) + + assert(Nil === + compactibleLog.deserialize( + new ByteArrayInputStream(FakeCompactibleFileStreamLog.VERSION.getBytes(UTF_8)))) + }) + } + + testWithUninterruptibleThread("compact") { + withFakeCompactibleFileStreamLog( + fileCleanupDelayMs = Long.MaxValue, + defaultCompactInterval = 3, + compactibleLog => { + for (batchId <- 0 to 10) { + compactibleLog.add(batchId, Array("some_path_" + batchId)) + 
+          val expectedFiles = (0 to batchId).map { id => "some_path_" + id }
+          assert(compactibleLog.allFiles() === expectedFiles)
+          if (isCompactionBatch(batchId, 3)) {
+            // Since batchId is a compaction batch, the batch log file should contain all logs
+            assert(compactibleLog.get(batchId).getOrElse(Nil) === expectedFiles)
+          }
+        }
+      })
+  }
+
+  testWithUninterruptibleThread("delete expired file") {
+    // Set `fileCleanupDelayMs` to 0 so that we can detect the deleting behaviour
+    // deterministically
+    withFakeCompactibleFileStreamLog(
+      fileCleanupDelayMs = 0,
+      defaultCompactInterval = 3,
+      compactibleLog => {
+        val fs = compactibleLog.metadataPath.getFileSystem(spark.sessionState.newHadoopConf())
+
+        def listBatchFiles(): Set[String] = {
+          fs.listStatus(compactibleLog.metadataPath).map(_.getPath.getName).filter { fileName =>
+            try {
+              getBatchIdFromFileName(fileName)
+              true
+            } catch {
+              case _: NumberFormatException => false
+            }
+          }.toSet
+        }
+
+        compactibleLog.add(0, Array("some_path_0"))
+        assert(Set("0") === listBatchFiles())
+        compactibleLog.add(1, Array("some_path_1"))
+        assert(Set("0", "1") === listBatchFiles())
+        compactibleLog.add(2, Array("some_path_2"))
+        assert(Set("2.compact") === listBatchFiles())
+        compactibleLog.add(3, Array("some_path_3"))
+        assert(Set("2.compact", "3") === listBatchFiles())
+        compactibleLog.add(4, Array("some_path_4"))
+        assert(Set("2.compact", "3", "4") === listBatchFiles())
+        compactibleLog.add(5, Array("some_path_5"))
+        assert(Set("5.compact") === listBatchFiles())
+      })
+  }
+
+  private def withFakeCompactibleFileStreamLog(
+      fileCleanupDelayMs: Long,
+      defaultCompactInterval: Int,
+      f: FakeCompactibleFileStreamLog => Unit
+    ): Unit = {
+    withTempDir { file =>
+      val compactibleLog = new FakeCompactibleFileStreamLog(
+        fileCleanupDelayMs,
+        defaultCompactInterval,
+        spark,
+        file.getCanonicalPath)
+      f(compactibleLog)
+    }
+  }
+}
+
+object FakeCompactibleFileStreamLog {
+  val VERSION = "test_version"
+}
+
+class FakeCompactibleFileStreamLog(
+    _fileCleanupDelayMs: Long,
+    _defaultCompactInterval: Int,
+    sparkSession: SparkSession,
+    path: String)
+  extends CompactibleFileStreamLog[String](
+    FakeCompactibleFileStreamLog.VERSION,
+    sparkSession,
+    path) {
+
+  override protected def fileCleanupDelayMs: Long = _fileCleanupDelayMs
+
+  override protected def isDeletingExpiredLog: Boolean = true
+
+  override protected def defaultCompactInterval: Int = _defaultCompactInterval
+
+  override def compactLogs(logs: Seq[String]): Seq[String] = logs
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala
index e1bc674a28..e046fee0c0 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala
@@ -29,61 +29,6 @@ class FileStreamSinkLogSuite extends SparkFunSuite with SharedSQLContext {
   import CompactibleFileStreamLog._
   import FileStreamSinkLog._
 
-  test("getBatchIdFromFileName") {
-    assert(1234L === getBatchIdFromFileName("1234"))
-    assert(1234L === getBatchIdFromFileName("1234.compact"))
-    intercept[NumberFormatException] {
-      getBatchIdFromFileName("1234a")
-    }
-  }
-
-  test("isCompactionBatch") {
-    assert(false === isCompactionBatch(0, compactInterval = 3))
-    assert(false === isCompactionBatch(1, compactInterval = 3))
-    assert(true === isCompactionBatch(2, compactInterval = 3))
-    assert(false === isCompactionBatch(3, compactInterval = 3))
-    assert(false === isCompactionBatch(4, compactInterval = 3))
-    assert(true === isCompactionBatch(5, compactInterval = 3))
-  }
-
-  test("nextCompactionBatchId") {
-    assert(2 === nextCompactionBatchId(0, compactInterval = 3))
-    assert(2 === nextCompactionBatchId(1, compactInterval = 3))
-    assert(5 === nextCompactionBatchId(2, compactInterval = 3))
-    assert(5 === nextCompactionBatchId(3, compactInterval = 3))
-    assert(5 === nextCompactionBatchId(4, compactInterval = 3))
-    assert(8 === nextCompactionBatchId(5, compactInterval = 3))
-  }
-
-  test("getValidBatchesBeforeCompactionBatch") {
-    intercept[AssertionError] {
-      getValidBatchesBeforeCompactionBatch(0, compactInterval = 3)
-    }
-    intercept[AssertionError] {
-      getValidBatchesBeforeCompactionBatch(1, compactInterval = 3)
-    }
-    assert(Seq(0, 1) === getValidBatchesBeforeCompactionBatch(2, compactInterval = 3))
-    intercept[AssertionError] {
-      getValidBatchesBeforeCompactionBatch(3, compactInterval = 3)
-    }
-    intercept[AssertionError] {
-      getValidBatchesBeforeCompactionBatch(4, compactInterval = 3)
-    }
-    assert(Seq(2, 3, 4) === getValidBatchesBeforeCompactionBatch(5, compactInterval = 3))
-  }
-
-  test("getAllValidBatches") {
-    assert(Seq(0) === getAllValidBatches(0, compactInterval = 3))
-    assert(Seq(0, 1) === getAllValidBatches(1, compactInterval = 3))
-    assert(Seq(2) === getAllValidBatches(2, compactInterval = 3))
-    assert(Seq(2, 3) === getAllValidBatches(3, compactInterval = 3))
-    assert(Seq(2, 3, 4) === getAllValidBatches(4, compactInterval = 3))
-    assert(Seq(5) === getAllValidBatches(5, compactInterval = 3))
-    assert(Seq(5, 6) === getAllValidBatches(6, compactInterval = 3))
-    assert(Seq(5, 6, 7) === getAllValidBatches(7, compactInterval = 3))
-    assert(Seq(8) === getAllValidBatches(8, compactInterval = 3))
-  }
-
   test("compactLogs") {
     withFileStreamSinkLog { sinkLog =>
       val logs = Seq(
@@ -184,19 +129,6 @@ class FileStreamSinkLogSuite extends SparkFunSuite with SharedSQLContext {
     }
   }
 
-  test("batchIdToPath") {
-    withSQLConf(SQLConf.FILE_SINK_LOG_COMPACT_INTERVAL.key -> "3") {
-      withFileStreamSinkLog { sinkLog =>
-        assert("0" === sinkLog.batchIdToPath(0).getName)
-        assert("1" === sinkLog.batchIdToPath(1).getName)
-        assert("2.compact" === sinkLog.batchIdToPath(2).getName)
-        assert("3" === sinkLog.batchIdToPath(3).getName)
-        assert("4" === sinkLog.batchIdToPath(4).getName)
-        assert("5.compact" === sinkLog.batchIdToPath(5).getName)
-      }
-    }
-  }
-
   testWithUninterruptibleThread("compact") {
     withSQLConf(SQLConf.FILE_SINK_LOG_COMPACT_INTERVAL.key -> "3") {
       withFileStreamSinkLog { sinkLog =>
--
GitLab
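As a usage note on the new fixture: a hypothetical extra test written directly against `withFakeCompactibleFileStreamLog` might look like the sketch below. The fixture names come from the diff above; the test title and the `entry_*` values are invented for illustration.

```scala
testWithUninterruptibleThread("compaction batch carries all previous entries") {
  withFakeCompactibleFileStreamLog(
    fileCleanupDelayMs = Long.MaxValue, // keep expired files around; cleanup is not under test
    defaultCompactInterval = 3,         // batches 2, 5, 8, ... are written as *.compact files
    compactibleLog => {
      compactibleLog.add(0, Array("entry_0"))
      compactibleLog.add(1, Array("entry_1"))
      compactibleLog.add(2, Array("entry_2")) // batch 2 triggers compaction
      // The compacted batch should contain every entry logged so far, in order.
      assert(compactibleLog.get(2).getOrElse(Nil) === Seq("entry_0", "entry_1", "entry_2"))
    })
}
```

Because `FakeCompactibleFileStreamLog` pins `compactLogs` to the identity function, an assertion like this exercises only `CompactibleFileStreamLog`'s own batching and compaction logic, which is exactly the isolation this PR is after.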