Commit 3f5f4cc4 authored by jerryshao, committed by Tathagata Das

[SPARK-4671][Streaming]Do not replicate streaming block when WAL is enabled

Currently, a streaming block is replicated whenever the configured storage level requests replication. Because the write ahead log (WAL) already provides fault tolerance, this replication is unnecessary and hurts the throughput of the streaming application.

Hi tdas, as we discussed, I fixed this issue with this implementation. I'm not sure whether this is the approach you had in mind; would you mind taking a look at it? Thanks a lot.

Author: jerryshao <saisai.shao@intel.com>

Closes #3534 from jerryshao/SPARK-4671 and squashes the following commits:

500b456 [jerryshao] Do not replicate streaming block when WAL is enabled
parent 10d69e9c
@@ -121,6 +121,24 @@ private[streaming] class WriteAheadLogBasedBlockHandler(
   private val maxFailures = conf.getInt(
     "spark.streaming.receiver.writeAheadLog.maxFailures", 3)
 
+  private val effectiveStorageLevel = {
+    if (storageLevel.deserialized) {
+      logWarning(s"Storage level serialization ${storageLevel.deserialized} is not supported when" +
+        s" write ahead log is enabled, change to serialization false")
+    }
+    if (storageLevel.replication > 1) {
+      logWarning(s"Storage level replication ${storageLevel.replication} is unnecessary when " +
+        s"write ahead log is enabled, change to replication 1")
+    }
+
+    StorageLevel(storageLevel.useDisk, storageLevel.useMemory, storageLevel.useOffHeap, false, 1)
+  }
+
+  if (storageLevel != effectiveStorageLevel) {
+    logWarning(s"User defined storage level $storageLevel is changed to effective storage level " +
+      s"$effectiveStorageLevel when write ahead log is enabled")
+  }
+
   // Manages rolling log files
   private val logManager = new WriteAheadLogManager(
     checkpointDirToLogDir(checkpointDir, streamId),
@@ -156,7 +174,7 @@ private[streaming] class WriteAheadLogBasedBlockHandler(
     // Store the block in block manager
     val storeInBlockManagerFuture = Future {
       val putResult =
-        blockManager.putBytes(blockId, serializedBlock, storageLevel, tellMaster = true)
+        blockManager.putBytes(blockId, serializedBlock, effectiveStorageLevel, tellMaster = true)
       if (!putResult.map { _._1 }.contains(blockId)) {
         throw new SparkException(
           s"Could not store $blockId to block manager with storage level $storageLevel")
......
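
For readers who want to see the storage level rule from the first hunk in isolation, here is a minimal sketch that runs without Spark. It assumes a hypothetical `Level` case class standing in for `org.apache.spark.storage.StorageLevel`, reduced to the five flags the commit reads. The `EffectiveLevel` object and its `effective` and `warnings` helpers are illustrative names and are not part of the patch. The warning strings are paraphrased, and are returned rather than logged so they can be inspected.

```scala
// Hypothetical stand-in for org.apache.spark.storage.StorageLevel,
// reduced to the five flags that the commit reads.
final case class Level(
    useDisk: Boolean,
    useMemory: Boolean,
    useOffHeap: Boolean,
    deserialized: Boolean,
    replication: Int)

// Illustrative helper object; not part of the patch.
object EffectiveLevel {

  // Same rule as the commit: keep the storage medium flags,
  // force serialized storage and a single replica.
  def effective(level: Level): Level =
    level.copy(deserialized = false, replication = 1)

  // Collects the warnings the handler would emit, one per condition
  // checked in the commit, instead of writing them to a log.
  def warnings(level: Level): Seq[String] = {
    val eff = effective(level)
    Seq(
      if (level.deserialized)
        Some("deserialized storage is not supported when the write ahead log is enabled")
      else None,
      if (level.replication > 1)
        Some(s"replication ${level.replication} is unnecessary when the write ahead log is enabled")
      else None,
      if (level != eff)
        Some(s"user defined level $level is changed to effective level $eff")
      else None
    ).flatten
  }
}
```

Under these assumptions, a serialized memory-and-disk level with two replicas maps to the same level with one replica and yields two warnings, while a level that is already serialized with a single replica passes through unchanged with no warnings.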