Skip to content
Snippets Groups Projects
Commit e063e29a authored by Matei Zaharia's avatar Matei Zaharia
Browse files

Merge pull request #680 from tdas/master

Fixed major performance bug in Network Receiver
parents bf1311e6 280418ac
No related branches found
No related tags found
No related merge requests found
......@@ -140,12 +140,10 @@ abstract class NetworkReceiver[T: ClassManifest]() extends Serializable with Log
/**
* Pushes a block (as iterator of values) into the block manager.
* Pushes a block (as an ArrayBuffer filled with data) into the block manager.
*/
def pushBlock(blockId: String, iterator: Iterator[T], metadata: Any, level: StorageLevel) {
val buffer = new ArrayBuffer[T] ++ iterator
env.blockManager.put(blockId, buffer.asInstanceOf[ArrayBuffer[Any]], level)
def pushBlock(blockId: String, arrayBuffer: ArrayBuffer[T], metadata: Any, level: StorageLevel) {
env.blockManager.put(blockId, arrayBuffer.asInstanceOf[ArrayBuffer[Any]], level)
actor ! ReportBlock(blockId, metadata)
}
......@@ -195,7 +193,7 @@ abstract class NetworkReceiver[T: ClassManifest]() extends Serializable with Log
class BlockGenerator(storageLevel: StorageLevel)
extends Serializable with Logging {
case class Block(id: String, iterator: Iterator[T], metadata: Any = null)
case class Block(id: String, buffer: ArrayBuffer[T], metadata: Any = null)
val clock = new SystemClock()
val blockInterval = System.getProperty("spark.streaming.blockInterval", "200").toLong
......@@ -222,17 +220,13 @@ abstract class NetworkReceiver[T: ClassManifest]() extends Serializable with Log
currentBuffer += obj
}
private def createBlock(blockId: String, iterator: Iterator[T]) : Block = {
new Block(blockId, iterator)
}
private def updateCurrentBuffer(time: Long) {
try {
val newBlockBuffer = currentBuffer
currentBuffer = new ArrayBuffer[T]
if (newBlockBuffer.size > 0) {
val blockId = "input-" + NetworkReceiver.this.streamId + "-" + (time - blockInterval)
val newBlock = createBlock(blockId, newBlockBuffer.toIterator)
val newBlock = new Block(blockId, newBlockBuffer)
blocksForPushing.add(newBlock)
}
} catch {
......@@ -248,7 +242,7 @@ abstract class NetworkReceiver[T: ClassManifest]() extends Serializable with Log
try {
while(true) {
val block = blocksForPushing.take()
NetworkReceiver.this.pushBlock(block.id, block.iterator, block.metadata, storageLevel)
NetworkReceiver.this.pushBlock(block.id, block.buffer, block.metadata, storageLevel)
}
} catch {
case ie: InterruptedException =>
......
......@@ -9,6 +9,8 @@ import spark.streaming.dstream.NetworkReceiver
import java.util.concurrent.atomic.AtomicInteger
import scala.collection.mutable.ArrayBuffer
/** A helper with set of defaults for supervisor strategy **/
object ReceiverSupervisorStrategy {
......@@ -136,8 +138,9 @@ private[streaming] class ActorReceiver[T: ClassManifest](
}
protected def pushBlock(iter: Iterator[T]) {
pushBlock("block-" + streamId + "-" + System.nanoTime(),
iter, null, storageLevel)
val buffer = new ArrayBuffer[T]
buffer ++= iter
pushBlock("block-" + streamId + "-" + System.nanoTime(), buffer, null, storageLevel)
}
protected def onStart() = {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment