From 207067ead6db6dc87b0d144a658e2564e3280a89 Mon Sep 17 00:00:00 2001
From: uncleGen <hustyugm@gmail.com>
Date: Sun, 5 Mar 2017 18:17:30 -0800
Subject: [PATCH] [SPARK-19822][TEST]
 CheckpointSuite.testCheckpointedOperation: should not filter
 checkpointFilesOfLatestTime with the PATH string.
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

## What changes were proposed in this pull request?

https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/73800/testReport/

```
sbt.ForkMain$ForkError: org.scalatest.exceptions.TestFailedDueToTimeoutException: The code
passed to eventually never returned normally. Attempted 617 times over 10.003740484 seconds.
Last failure message: 8 did not equal 2.
	at org.scalatest.concurrent.Eventually$class.tryTryAgain$1(Eventually.scala:420)
	at org.scalatest.concurrent.Eventually$class.eventually(Eventually.scala:438)
	at org.scalatest.concurrent.Eventually$.eventually(Eventually.scala:478)
	at org.scalatest.concurrent.Eventually$class.eventually(Eventually.scala:336)
	at org.scalatest.concurrent.Eventually$.eventually(Eventually.scala:478)
	at org.apache.spark.streaming.DStreamCheckpointTester$class.generateOutput(CheckpointSuite
.scala:172)
	at org.apache.spark.streaming.CheckpointSuite.generateOutput(CheckpointSuite.scala:211)
```

the check condition is:

```
val checkpointFilesOfLatestTime = Checkpoint.getCheckpointFiles(checkpointDir).filter {
     _.toString.contains(clock.getTimeMillis.toString)
}
// Checkpoint files are written twice for every batch interval. So assert that both
// are written to make sure that both of them have been written.
assert(checkpointFilesOfLatestTime.size === 2)
```

the path string may contain the `clock.getTimeMillis.toString`, like `3500` :

```
file:/root/dev/spark/assembly/CheckpointSuite/spark-20035007-9891-4fb6-91c1-cc15b7ccaf15/checkpoint-500
file:/root/dev/spark/assembly/CheckpointSuite/spark-20035007-9891-4fb6-91c1-cc15b7ccaf15/checkpoint-1000
file:/root/dev/spark/assembly/CheckpointSuite/spark-20035007-9891-4fb6-91c1-cc15b7ccaf15/checkpoint-1500
file:/root/dev/spark/assembly/CheckpointSuite/spark-20035007-9891-4fb6-91c1-cc15b7ccaf15/checkpoint-2000
file:/root/dev/spark/assembly/CheckpointSuite/spark-20035007-9891-4fb6-91c1-cc15b7ccaf15/checkpoint-2500
file:/root/dev/spark/assembly/CheckpointSuite/spark-20035007-9891-4fb6-91c1-cc15b7ccaf15/checkpoint-3000
file:/root/dev/spark/assembly/CheckpointSuite/spark-20035007-9891-4fb6-91c1-cc15b7ccaf15/checkpoint-3500.bk
file:/root/dev/spark/assembly/CheckpointSuite/spark-20035007-9891-4fb6-91c1-cc15b7ccaf15/checkpoint-3500
                                                       ▲▲▲▲
```

so we should only check the filename, but not the whole path.

## How was this patch tested?

Jenkins.

Author: uncleGen <hustyugm@gmail.com>

Closes #17167 from uncleGen/flaky-CheckpointSuite.
---
 .../scala/org/apache/spark/streaming/CheckpointSuite.scala    | 4 +---
 1 file changed, 1 insertion(+), 3 deletions(-)

diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
index 7fcf45e7de..ee2fd45a7e 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
@@ -152,11 +152,9 @@ trait DStreamCheckpointTester { self: SparkFunSuite =>
       stopSparkContext: Boolean
     ): Seq[Seq[V]] = {
     try {
-      val batchDuration = ssc.graph.batchDuration
       val batchCounter = new BatchCounter(ssc)
       ssc.start()
       val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
-      val currentTime = clock.getTimeMillis()
 
       logInfo("Manual clock before advancing = " + clock.getTimeMillis())
       clock.setTime(targetBatchTime.milliseconds)
@@ -171,7 +169,7 @@ trait DStreamCheckpointTester { self: SparkFunSuite =>
 
       eventually(timeout(10 seconds)) {
         val checkpointFilesOfLatestTime = Checkpoint.getCheckpointFiles(checkpointDir).filter {
-          _.toString.contains(clock.getTimeMillis.toString)
+          _.getName.contains(clock.getTimeMillis.toString)
         }
         // Checkpoint files are written twice for every batch interval. So assert that both
         // are written to make sure that both of them have been written.
-- 
GitLab