Skip to content
Snippets Groups Projects
Commit 5c8f4bd5 authored by Tathagata Das's avatar Tathagata Das
Browse files

[SPARK-7138] [STREAMING] Add method to BlockGenerator to add multiple records...

[SPARK-7138] [STREAMING] Add method to BlockGenerator to add multiple records to BlockGenerator with single callback

This is to ensure that receivers that receive data in small batches (like Kinesis) and want to add them but want the callback function to be called only once. This is for internal use only for improvement to Kinesis Receiver that we are planning to do.

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #5695 from tdas/SPARK-7138 and squashes the following commits:

a35cf7d [Tathagata Das] Fixed style.
a7a4cb9 [Tathagata Das] Added extra method to BlockGenerator.
parent d36e6735
No related branches found
No related tags found
No related merge requests found
......@@ -126,6 +126,20 @@ private[streaming] class BlockGenerator(
listener.onAddData(data, metadata)
}
/**
* Push multiple data items into the buffer. After buffering the data, the
* `BlockGeneratorListener.onAddData` callback will be called. All received data items
* will be periodically pushed into BlockManager. Note that all the data items is guaranteed
* to be present in a single block.
*/
def addMultipleDataWithCallback(dataIterator: Iterator[Any], metadata: Any): Unit = synchronized {
dataIterator.foreach { data =>
waitToPush()
currentBuffer += data
}
listener.onAddData(dataIterator, metadata)
}
/** Change the buffer to which single records are added to. */
private def updateCurrentBuffer(time: Long): Unit = synchronized {
try {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment