Commit eb3ea3a0 authored by hyukjinkwon, committed by Sean Owen

[SPARK-20935][STREAMING] Always close WriteAheadLog and make it idempotent

## What changes were proposed in this pull request?

This PR proposes that `ReceiverTracker.stop()` always close the `WriteAheadLog`, regardless of the tracker's state, and that `close()` in `WriteAheadLog` and its implementations be made idempotent.
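
As a minimal sketch of the idempotent-close pattern this change relies on (not the actual Spark code; the `DummyLog` class below is purely illustrative):

```scala
import java.util.concurrent.atomic.AtomicBoolean

// Sketch of an idempotent close(): only the first call flips the flag
// and does the shutdown work; every later call returns immediately.
class DummyLog {
  private val active = new AtomicBoolean(true)

  def close(): Unit = {
    // getAndSet is atomic, so exactly one caller observes `true`.
    if (!active.getAndSet(false)) return
    println("releasing resources")
  }
}

object DummyLogExample extends App {
  val log = new DummyLog
  log.close() // prints "releasing resources"
  log.close() // no-op: already closed
}
```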

## How was this patch tested?

Added a test in `WriteAheadLogSuite`. Note that the added test passes even without the changes in `FileBasedWriteAheadLog` and `BatchedWriteAheadLog` (i.e. even when `close()` is called twice); both implementations appear to be idempotent already, so the test is rather a sanity check.
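
The double-close check takes roughly this shape (a sketch under assumptions, not the exact test; `createWriteAheadLog()` is a hypothetical factory standing in for whatever the suite uses to construct the log):

```scala
import java.nio.ByteBuffer

// Sketch: closing the log twice must not throw once close() is idempotent.
val writeAheadLog = createWriteAheadLog() // hypothetical factory helper
try {
  writeAheadLog.write(ByteBuffer.wrap("data".getBytes("UTF-8")), 1000L)
} finally {
  writeAheadLog.close()
  // Make sure it is idempotent.
  writeAheadLog.close()
}
```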

Author: hyukjinkwon <gurwls223@gmail.com>

Closes #18224 from HyukjinKwon/streaming-closing.
parent 8da3f704
@@ -56,7 +56,7 @@ public abstract class WriteAheadLog {
   public abstract void clean(long threshTime, boolean waitForCompletion);
 
   /**
-   * Close this log and release any resources.
+   * Close this log and release any resources. It must be idempotent.
    */
   public abstract void close();
 }
@@ -165,11 +165,11 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
 
   /** Stop the receiver execution thread. */
   def stop(graceful: Boolean): Unit = synchronized {
-    if (isTrackerStarted) {
-      // First, stop the receivers
-      trackerState = Stopping
+    val isStarted: Boolean = isTrackerStarted
+    trackerState = Stopping
+    if (isStarted) {
       if (!skipReceiverLaunch) {
-        // Send the stop signal to all the receivers
+        // First, stop the receivers. Send the stop signal to all the receivers
         endpoint.askSync[Boolean](StopAllReceivers)
 
         // Wait for the Spark job that runs the receivers to be over
@@ -194,17 +194,13 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
       // Finally, stop the endpoint
       ssc.env.rpcEnv.stop(endpoint)
       endpoint = null
-      receivedBlockTracker.stop()
-      logInfo("ReceiverTracker stopped")
-      trackerState = Stopped
-    } else if (isTrackerInitialized) {
-      trackerState = Stopping
-      // `ReceivedBlockTracker` is open when this instance is created. We should
-      // close this even if this `ReceiverTracker` is not started.
-      receivedBlockTracker.stop()
-      logInfo("ReceiverTracker stopped")
-      trackerState = Stopped
     }
+
+    // `ReceivedBlockTracker` is open when this instance is created. We should
+    // close this even if this `ReceiverTracker` is not started.
+    receivedBlockTracker.stop()
+    logInfo("ReceiverTracker stopped")
+    trackerState = Stopped
   }
 
   /** Allocate all unallocated blocks to the given batch. */
@@ -453,9 +449,6 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
     endpoint.send(StartAllReceivers(receivers))
   }
 
-  /** Check if tracker has been marked for initiated */
-  private def isTrackerInitialized: Boolean = trackerState == Initialized
-
   /** Check if tracker has been marked for starting */
   private def isTrackerStarted: Boolean = trackerState == Started
 
...
@@ -19,6 +19,7 @@ package org.apache.spark.streaming.util
 
 import java.nio.ByteBuffer
 import java.util.{Iterator => JIterator}
+import java.util.concurrent.atomic.AtomicBoolean
 import java.util.concurrent.LinkedBlockingQueue
 
 import scala.collection.JavaConverters._
@@ -60,7 +61,7 @@ private[util] class BatchedWriteAheadLog(val wrappedLog: WriteAheadLog, conf: Sp
   private val walWriteQueue = new LinkedBlockingQueue[Record]()
 
   // Whether the writer thread is active
-  @volatile private var active: Boolean = true
+  private val active: AtomicBoolean = new AtomicBoolean(true)
   private val buffer = new ArrayBuffer[Record]()
 
   private val batchedWriterThread = startBatchedWriterThread()
@@ -72,7 +73,7 @@ private[util] class BatchedWriteAheadLog(val wrappedLog: WriteAheadLog, conf: Sp
   override def write(byteBuffer: ByteBuffer, time: Long): WriteAheadLogRecordHandle = {
     val promise = Promise[WriteAheadLogRecordHandle]()
     val putSuccessfully = synchronized {
-      if (active) {
+      if (active.get()) {
         walWriteQueue.offer(Record(byteBuffer, time, promise))
         true
       } else {
@@ -121,9 +122,7 @@ private[util] class BatchedWriteAheadLog(val wrappedLog: WriteAheadLog, conf: Sp
    */
   override def close(): Unit = {
     logInfo(s"BatchedWriteAheadLog shutting down at time: ${System.currentTimeMillis()}.")
-    synchronized {
-      active = false
-    }
+    if (!active.getAndSet(false)) return
     batchedWriterThread.interrupt()
     batchedWriterThread.join()
     while (!walWriteQueue.isEmpty) {
@@ -138,7 +137,7 @@ private[util] class BatchedWriteAheadLog(val wrappedLog: WriteAheadLog, conf: Sp
   private def startBatchedWriterThread(): Thread = {
     val thread = new Thread(new Runnable {
       override def run(): Unit = {
-        while (active) {
+        while (active.get()) {
           try {
             flushRecords()
           } catch {
@@ -166,7 +165,7 @@ private[util] class BatchedWriteAheadLog(val wrappedLog: WriteAheadLog, conf: Sp
     }
     try {
       var segment: WriteAheadLogRecordHandle = null
-      if (buffer.length > 0) {
+      if (buffer.nonEmpty) {
         logDebug(s"Batched ${buffer.length} records for Write Ahead Log write")
         // threads may not be able to add items in order by time
         val sortedByTime = buffer.sortBy(_.time)
...
@@ -205,10 +205,12 @@ private[streaming] class FileBasedWriteAheadLog(
 
   /** Stop the manager, close any open log writer */
   def close(): Unit = synchronized {
-    if (currentLogWriter != null) {
-      currentLogWriter.close()
+    if (!executionContext.isShutdown) {
+      if (currentLogWriter != null) {
+        currentLogWriter.close()
+      }
+      executionContext.shutdown()
     }
-    executionContext.shutdown()
     logInfo("Stopped write ahead log manager")
   }
 
...
@@ -57,6 +57,8 @@ class ReceiverTrackerSuite extends TestSuiteBase {
         }
       } finally {
         tracker.stop(false)
+        // Make sure it is idempotent.
+        tracker.stop(false)
       }
     }
   }
...
@@ -140,6 +140,8 @@ abstract class CommonWriteAheadLogTests(
       }
     }
     writeAheadLog.close()
+    // Make sure it is idempotent.
+    writeAheadLog.close()
   }
 
   test(testPrefix + "handling file errors while reading rotating logs") {
...