Skip to content
Snippets Groups Projects
Commit e469d3ba authored by hyukjinkwon's avatar hyukjinkwon Committed by Shixiong Zhu
Browse files

[SPARK-18423][STREAMING] ReceiverTracker should close checkpoint dir when...

[SPARK-18423][STREAMING] ReceiverTracker should close checkpoint dir when stopped even if it was not started

## What changes were proposed in this pull request?

Several tests are being failed on Windows due to the failure of removing the checkpoint dir between each tests.

This is caused by not closed file in `ReceiverTracker`. When it is not started, it does not close it even if `stop()` is called.

```
Test org.apache.spark.streaming.JavaAPISuite.testCheckpointMasterRecovery started
Test org.apache.spark.streaming.JavaAPISuite.testCheckpointMasterRecovery failed: java.io.IOException: Failed to delete: C:\projects\spark\target\tmp\1478983663710-0, took 3.828 sec
    at org.apache.spark.util.Utils$.deleteRecursively(Utils.scala:1010)
    at org.apache.spark.util.Utils.deleteRecursively(Utils.scala)
    at org.apache.spark.streaming.JavaAPISuite.testCheckpointMasterRecovery(JavaAPISuite.java:1809)
    ...
```

```
- mapWithState - basic operations with simple API (7 seconds, 640 milliseconds)
Exception encountered when attempting to run a suite with class name: org.apache.spark.streaming.MapWithStateSuite *** ABORTED *** (12 seconds, 688 milliseconds)
  java.io.IOException: Failed to delete: C:\projects\spark\streaming\checkpoint\spark-b8486e2b-6468-4e6f-bb24-88277d2c033c
  ...
```

## How was this patch tested?

Tests in `JavaAPISuite` and `MapWithStateSuite`.

Manually tested via AppVeyor:

**Before**

- `org.apache.spark.streaming.JavaAPISuite`
  Build: https://ci.appveyor.com/project/spark-test/spark/build/71-MapWithStateSuite-1
  Diff: https://github.com/apache/spark/compare/master...spark-test:188c828e682ec45b75d15c3dfc782bcdc8ce024c

- `org.apache.spark.streaming.MapWithStateSuite`
  Build: https://ci.appveyor.com/project/spark-test/spark/build/72-MapWithStateSuite-1
  Diff: https://github.com/apache/spark/compare/master...spark-test:8f6945d0ccde022a23d3848f6b7fe6da1e7c902e

**After**

- `org.apache.spark.streaming.JavaAPISuite`
  Build started: [Streaming] `org.apache.spark.streaming.JavaAPISuite` [![PR-15867](https://ci.appveyor.com/api/projects/status/github/spark-test/spark?branch=3D74F2D5-B0D5-4E1D-874C-685AE694FD37&svg=true)](https://ci.appveyor.com/project/spark-test/spark/branch/3D74F2D5-B0D5-4E1D-874C-685AE694FD37)
  Diff: https://github.com/apache/spark/compare/master...spark-test:3D74F2D5-B0D5-4E1D-874C-685AE694FD37

- `org.apache.spark.streaming.MapWithStateSuite`
  Build started: [Streaming] `org.apache.spark.streaming.MapWithStateSuite` [![PR-15867](https://ci.appveyor.com/api/projects/status/github/spark-test/spark?branch=C8E88B64-49F0-4157-9AFA-FC3ACC442351&svg=true)](https://ci.appveyor.com/project/spark-test/spark/branch/C8E88B64-49F0-4157-9AFA-FC3ACC442351)
  Diff: https://github.com/apache/spark/compare/master...spark-test:C8E88B64-49F0-4157-9AFA-FC3ACC442351



Author: hyukjinkwon <gurwls223@gmail.com>

Closes #15867 from HyukjinKwon/SPARK-18423.

(cherry picked from commit 503378f1)
Signed-off-by: default avatarShixiong Zhu <shixiong@databricks.com>
parent b424dc94
No related branches found
No related tags found
No related merge requests found
......@@ -197,6 +197,13 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
receivedBlockTracker.stop()
logInfo("ReceiverTracker stopped")
trackerState = Stopped
} else if (isTrackerInitialized) {
trackerState = Stopping
// `ReceivedBlockTracker` is open when this instance is created. We should
// close this even if this `ReceiverTracker` is not started.
receivedBlockTracker.stop()
logInfo("ReceiverTracker stopped")
trackerState = Stopped
}
}
......@@ -446,6 +453,9 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
endpoint.send(StartAllReceivers(receivers))
}
/** Check if tracker has been marked for initiated */
private def isTrackerInitialized: Boolean = trackerState == Initialized
/** Check if tracker has been marked for starting */
private def isTrackerStarted: Boolean = trackerState == Started
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment