Commit 18a761ef authored by Tathagata Das

[SPARK-9968] [STREAMING] Reduced time spent within synchronized block to prevent lock starvation

When the rate limiter is actually limiting the rate at which data is inserted into the buffer, the synchronized block of BlockGenerator.addData remains blocked for a long time. This causes the thread that switches buffers and generates blocks (which synchronizes with addData) to starve and go for seconds without generating blocks. The correct solution is to not block on the rate limiter within the synchronized block that adds data to the buffer.
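The pattern the commit applies can be sketched as follows. This is a hypothetical, minimal Java illustration (not Spark code; all names are made up): block on the rate limiter outside the lock, then re-check the state once the lock is held, since it may have changed while we waited.

```java
import java.util.ArrayList;
import java.util.List;

class RateLimitedBuffer {
    private volatile boolean active = true;
    private final List<Object> buffer = new ArrayList<>();

    // Stand-in for the rate limiter's waitToPush(); may block for a long time.
    private void waitToPush() throws InterruptedException {
        Thread.sleep(1);
    }

    void addData(Object data) throws InterruptedException {
        if (!active) {
            throw new IllegalStateException("not started or already stopped");
        }
        waitToPush();            // potentially long wait, no lock held
        synchronized (this) {    // short critical section
            if (!active) {       // state may have flipped while we waited
                throw new IllegalStateException("stopped while waiting");
            }
            buffer.add(data);
        }
    }

    synchronized List<Object> drain() {
        List<Object> out = new ArrayList<>(buffer);
        buffer.clear();
        return out;
    }
}
```

The key design point is that the only code inside `synchronized` is a state check and a list append, so a thread holding the lock never makes other threads wait on the rate limiter's delay.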

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #8204 from tdas/SPARK-9968 and squashes the following commits:

8cbcc1b [Tathagata Das] Removed unused val
a73b645 [Tathagata Das] Reduced time spent within synchronized block
parent f3bfb711
@@ -155,10 +155,17 @@ private[streaming] class BlockGenerator(
   /**
    * Push a single data item into the buffer.
    */
-  def addData(data: Any): Unit = synchronized {
+  def addData(data: Any): Unit = {
     if (state == Active) {
       waitToPush()
-      currentBuffer += data
+      synchronized {
+        if (state == Active) {
+          currentBuffer += data
+        } else {
+          throw new SparkException(
+            "Cannot add data as BlockGenerator has not been started or has been stopped")
+        }
+      }
     } else {
       throw new SparkException(
         "Cannot add data as BlockGenerator has not been started or has been stopped")
@@ -169,11 +176,18 @@ private[streaming] class BlockGenerator(
    * Push a single data item into the buffer. After buffering the data, the
    * `BlockGeneratorListener.onAddData` callback will be called.
    */
-  def addDataWithCallback(data: Any, metadata: Any): Unit = synchronized {
+  def addDataWithCallback(data: Any, metadata: Any): Unit = {
     if (state == Active) {
       waitToPush()
-      currentBuffer += data
-      listener.onAddData(data, metadata)
+      synchronized {
+        if (state == Active) {
+          currentBuffer += data
+          listener.onAddData(data, metadata)
+        } else {
+          throw new SparkException(
+            "Cannot add data as BlockGenerator has not been started or has been stopped")
+        }
+      }
     } else {
       throw new SparkException(
         "Cannot add data as BlockGenerator has not been started or has been stopped")
@@ -185,13 +199,23 @@ private[streaming] class BlockGenerator(
    * `BlockGeneratorListener.onAddData` callback will be called. Note that all the data items
    * are atomically added to the buffer, and are hence guaranteed to be present in a single block.
    */
-  def addMultipleDataWithCallback(dataIterator: Iterator[Any], metadata: Any): Unit = synchronized {
+  def addMultipleDataWithCallback(dataIterator: Iterator[Any], metadata: Any): Unit = {
     if (state == Active) {
+      // Unroll iterator into a temp buffer, and wait for pushing in the process
+      val tempBuffer = new ArrayBuffer[Any]
       dataIterator.foreach { data =>
         waitToPush()
-        currentBuffer += data
+        tempBuffer += data
       }
+      synchronized {
+        if (state == Active) {
+          currentBuffer ++= tempBuffer
+          listener.onAddData(tempBuffer, metadata)
+        } else {
+          throw new SparkException(
+            "Cannot add data as BlockGenerator has not been started or has been stopped")
+        }
+      }
-      listener.onAddData(dataIterator, metadata)
     } else {
       throw new SparkException(
         "Cannot add data as BlockGenerator has not been started or has been stopped")
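The last hunk's "unroll, then lock" idea can be sketched in isolation. This is a hypothetical Java illustration (names are invented, not Spark's): the iterator is materialized into a temporary buffer while rate limiting happens with no lock held, and everything is then appended in one short synchronized step, so all items land in the shared buffer atomically.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

class UnrollThenLock {
    private final List<Integer> shared = new ArrayList<>();

    void addAll(Iterator<Integer> it) {
        List<Integer> temp = new ArrayList<>();
        while (it.hasNext()) {
            // waitToPush() would be called here, per item, outside the lock
            temp.add(it.next());
        }
        synchronized (this) {     // single short, atomic append
            shared.addAll(temp);
        }
    }

    synchronized List<Integer> snapshot() {
        return new ArrayList<>(shared);
    }
}
```

Unrolling first also fixes a subtlety visible in the diff: the old code passed the already-consumed `dataIterator` to `listener.onAddData`, whereas the new code passes `tempBuffer`, which still holds the items.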