Skip to content
Snippets Groups Projects
Commit ca7a7e8a authored by uncleGen's avatar uncleGen Committed by Shixiong Zhu
Browse files

[SPARK-19822][TEST] CheckpointSuite.testCheckpointedOperation: should not...

[SPARK-19822][TEST] CheckpointSuite.testCheckpointedOperation: should not filter checkpointFilesOfLatestTime with the PATH string.

## What changes were proposed in this pull request?

https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/73800/testReport/



```
sbt.ForkMain$ForkError: org.scalatest.exceptions.TestFailedDueToTimeoutException: The code
passed to eventually never returned normally. Attempted 617 times over 10.003740484 seconds.
Last failure message: 8 did not equal 2.
	at org.scalatest.concurrent.Eventually$class.tryTryAgain$1(Eventually.scala:420)
	at org.scalatest.concurrent.Eventually$class.eventually(Eventually.scala:438)
	at org.scalatest.concurrent.Eventually$.eventually(Eventually.scala:478)
	at org.scalatest.concurrent.Eventually$class.eventually(Eventually.scala:336)
	at org.scalatest.concurrent.Eventually$.eventually(Eventually.scala:478)
	at org.apache.spark.streaming.DStreamCheckpointTester$class.generateOutput(CheckpointSuite
.scala:172)
	at org.apache.spark.streaming.CheckpointSuite.generateOutput(CheckpointSuite.scala:211)
```

the check condition is:

```
val checkpointFilesOfLatestTime = Checkpoint.getCheckpointFiles(checkpointDir).filter {
     _.toString.contains(clock.getTimeMillis.toString)
}
// Checkpoint files are written twice for every batch interval. So assert that both
// are written to make sure that both of them have been written.
assert(checkpointFilesOfLatestTime.size === 2)
```

the path string may contain the `clock.getTimeMillis.toString`, like `3500` :

```
file:/root/dev/spark/assembly/CheckpointSuite/spark-20035007-9891-4fb6-91c1-cc15b7ccaf15/checkpoint-500
file:/root/dev/spark/assembly/CheckpointSuite/spark-20035007-9891-4fb6-91c1-cc15b7ccaf15/checkpoint-1000
file:/root/dev/spark/assembly/CheckpointSuite/spark-20035007-9891-4fb6-91c1-cc15b7ccaf15/checkpoint-1500
file:/root/dev/spark/assembly/CheckpointSuite/spark-20035007-9891-4fb6-91c1-cc15b7ccaf15/checkpoint-2000
file:/root/dev/spark/assembly/CheckpointSuite/spark-20035007-9891-4fb6-91c1-cc15b7ccaf15/checkpoint-2500
file:/root/dev/spark/assembly/CheckpointSuite/spark-20035007-9891-4fb6-91c1-cc15b7ccaf15/checkpoint-3000
file:/root/dev/spark/assembly/CheckpointSuite/spark-20035007-9891-4fb6-91c1-cc15b7ccaf15/checkpoint-3500.bk
file:/root/dev/spark/assembly/CheckpointSuite/spark-20035007-9891-4fb6-91c1-cc15b7ccaf15/checkpoint-3500
                                                       ▲▲▲▲
```

so we should only check the filename, but not the whole path.

## How was this patch tested?

Jenkins.

Author: uncleGen <hustyugm@gmail.com>

Closes #17167 from uncleGen/flaky-CheckpointSuite.

(cherry picked from commit 207067ea)
Signed-off-by: default avatarShixiong Zhu <shixiong@databricks.com>
parent 664c9795
No related branches found
No related tags found
No related merge requests found
......@@ -152,11 +152,9 @@ trait DStreamCheckpointTester { self: SparkFunSuite =>
stopSparkContext: Boolean
): Seq[Seq[V]] = {
try {
val batchDuration = ssc.graph.batchDuration
val batchCounter = new BatchCounter(ssc)
ssc.start()
val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
val currentTime = clock.getTimeMillis()
logInfo("Manual clock before advancing = " + clock.getTimeMillis())
clock.setTime(targetBatchTime.milliseconds)
......@@ -171,7 +169,7 @@ trait DStreamCheckpointTester { self: SparkFunSuite =>
eventually(timeout(10 seconds)) {
val checkpointFilesOfLatestTime = Checkpoint.getCheckpointFiles(checkpointDir).filter {
_.toString.contains(clock.getTimeMillis.toString)
_.getName.contains(clock.getTimeMillis.toString)
}
// Checkpoint files are written twice for every batch interval. So assert that both
// are written to make sure that both of them have been written.
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment