Commit 280418ac authored by Tathagata Das

Reduced the number of Iterator to ArrayBuffer copies in NetworkReceiver.

parent 6ad85d09
@@ -140,12 +140,10 @@ abstract class NetworkReceiver[T: ClassManifest]() extends Serializable with Log
   /**
-   * Pushes a block (as iterator of values) into the block manager.
+   * Pushes a block (as an ArrayBuffer filled with data) into the block manager.
    */
-  def pushBlock(blockId: String, iterator: Iterator[T], metadata: Any, level: StorageLevel) {
-    val buffer = new ArrayBuffer[T] ++ iterator
-    env.blockManager.put(blockId, buffer.asInstanceOf[ArrayBuffer[Any]], level)
+  def pushBlock(blockId: String, arrayBuffer: ArrayBuffer[T], metadata: Any, level: StorageLevel) {
+    env.blockManager.put(blockId, arrayBuffer.asInstanceOf[ArrayBuffer[Any]], level)
     actor ! ReportBlock(blockId, metadata)
   }
@@ -195,7 +193,7 @@ abstract class NetworkReceiver[T: ClassManifest]() extends Serializable with Log
   class BlockGenerator(storageLevel: StorageLevel)
     extends Serializable with Logging {
-    case class Block(id: String, iterator: Iterator[T], metadata: Any = null)
+    case class Block(id: String, buffer: ArrayBuffer[T], metadata: Any = null)
     val clock = new SystemClock()
     val blockInterval = System.getProperty("spark.streaming.blockInterval", "200").toLong
@@ -222,17 +220,13 @@ abstract class NetworkReceiver[T: ClassManifest]() extends Serializable with Log
       currentBuffer += obj
     }
-    private def createBlock(blockId: String, iterator: Iterator[T]) : Block = {
-      new Block(blockId, iterator)
-    }
     private def updateCurrentBuffer(time: Long) {
       try {
         val newBlockBuffer = currentBuffer
         currentBuffer = new ArrayBuffer[T]
         if (newBlockBuffer.size > 0) {
           val blockId = "input-" + NetworkReceiver.this.streamId + "-" + (time - blockInterval)
-          val newBlock = createBlock(blockId, newBlockBuffer.toIterator)
+          val newBlock = new Block(blockId, newBlockBuffer)
           blocksForPushing.add(newBlock)
         }
       } catch {
@@ -248,7 +242,7 @@ abstract class NetworkReceiver[T: ClassManifest]() extends Serializable with Log
       try {
         while(true) {
           val block = blocksForPushing.take()
-          NetworkReceiver.this.pushBlock(block.id, block.iterator, block.metadata, storageLevel)
+          NetworkReceiver.this.pushBlock(block.id, block.buffer, block.metadata, storageLevel)
         }
       } catch {
         case ie: InterruptedException =>
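
To make the effect of this signature change concrete, below is a minimal, self-contained Scala sketch (not part of the commit; PushBlockSketch and storeBlock are illustrative stand-ins for the receiver and env.blockManager.put). It contrasts the old Iterator-based pushBlock, which had to materialize an extra ArrayBuffer, with the new ArrayBuffer-based one that forwards the caller's buffer directly:

import scala.collection.mutable.ArrayBuffer

object PushBlockSketch {

  // Stand-in for env.blockManager.put: just reports what it was handed.
  def storeBlock(blockId: String, data: ArrayBuffer[Any]) {
    println("stored " + data.size + " records under " + blockId)
  }

  // Old shape: pushBlock accepted an Iterator, so it copied the data into a
  // fresh ArrayBuffer before handing it to the block manager (one extra copy).
  def pushBlockOld[T](blockId: String, iterator: Iterator[T]) {
    val buffer = new ArrayBuffer[T] ++ iterator
    storeBlock(blockId, buffer.asInstanceOf[ArrayBuffer[Any]])
  }

  // New shape: the caller (here standing in for BlockGenerator) already owns
  // an ArrayBuffer, so it is forwarded as-is and no additional copy is made.
  def pushBlockNew[T](blockId: String, arrayBuffer: ArrayBuffer[T]) {
    storeBlock(blockId, arrayBuffer.asInstanceOf[ArrayBuffer[Any]])
  }

  def main(args: Array[String]) {
    val currentBuffer = ArrayBuffer("a", "b", "c")
    pushBlockOld("input-0-1", currentBuffer.toIterator)  // Iterator -> ArrayBuffer copy inside
    pushBlockNew("input-0-2", currentBuffer)             // buffer passed straight through
  }
}
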
@@ -9,6 +9,8 @@ import spark.streaming.dstream.NetworkReceiver
 import java.util.concurrent.atomic.AtomicInteger
+import scala.collection.mutable.ArrayBuffer
 /** A helper with set of defaults for supervisor strategy **/
 object ReceiverSupervisorStrategy {
@@ -136,8 +138,9 @@ private[streaming] class ActorReceiver[T: ClassManifest](
   }
   protected def pushBlock(iter: Iterator[T]) {
-    pushBlock("block-" + streamId + "-" + System.nanoTime(),
-      iter, null, storageLevel)
+    val buffer = new ArrayBuffer[T]
+    buffer ++= iter
+    pushBlock("block-" + streamId + "-" + System.nanoTime(), buffer, null, storageLevel)
   }
   protected def onStart() = {
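
For receivers that still produce an Iterator, as ActorReceiver does above, the Iterator-to-ArrayBuffer conversion now happens exactly once at the call site via ++=. A small sketch of that pattern, assuming a simplified pushBlock stub in place of the real method (which also takes metadata and a StorageLevel):

import scala.collection.mutable.ArrayBuffer

object IteratorToBufferSketch {

  // Illustrative stub; the real pushBlock hands the buffer to the block manager.
  def pushBlock[T](blockId: String, buffer: ArrayBuffer[T]) {
    println(blockId + " -> " + buffer.size + " records")
  }

  def main(args: Array[String]) {
    val iter: Iterator[String] = Iterator("a", "b", "c")
    val buffer = new ArrayBuffer[String]
    buffer ++= iter                                    // the single Iterator -> ArrayBuffer copy
    pushBlock("block-0-" + System.nanoTime(), buffer)  // no further copying downstream
  }
}
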