diff --git a/docs/configuration.md b/docs/configuration.md index 04eb6daaa5d016a6ecdaa51575011882bb328bb1..55f1962b18bd21dac51746f523342e5f6f741959 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -253,6 +253,13 @@ Apart from these, the following properties are also available, and may be useful applications). Note that any RDD that persists in memory for more than this duration will be cleared as well. </td> </tr> +<tr> + <td>spark.streaming.blockInterval</td> + <td>200</td> + <td> + Interval (in milliseconds) at which new objects received from network receivers are batched together into blocks of data. + </td> +</tr> </table> diff --git a/streaming/src/main/scala/spark/streaming/dstream/NetworkInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/NetworkInputDStream.scala index 7385474963ebb70bfd7f8f86a16282e341824c80..26805e96210ab2cfa3226dc905bb2b31dfc8b2a5 100644 --- a/streaming/src/main/scala/spark/streaming/dstream/NetworkInputDStream.scala +++ b/streaming/src/main/scala/spark/streaming/dstream/NetworkInputDStream.scala @@ -198,7 +198,7 @@ abstract class NetworkReceiver[T: ClassManifest]() extends Serializable with Log case class Block(id: String, iterator: Iterator[T], metadata: Any = null) val clock = new SystemClock() - val blockInterval = 200L + val blockInterval = System.getProperty("spark.streaming.blockInterval", "200").toLong val blockIntervalTimer = new RecurringTimer(clock, blockInterval, updateCurrentBuffer) val blockStorageLevel = storageLevel val blocksForPushing = new ArrayBlockingQueue[Block](1000)