Skip to content
Snippets Groups Projects
Commit f0500f9f authored by Hari Shreedharan's avatar Hari Shreedharan Committed by Tathagata Das
Browse files

[SPARK-4707][STREAMING] Reliable Kafka Receiver can lose data if the blo...

...ck generator fails to store data.

The Reliable Kafka Receiver commits offsets only when events are actually stored, which ensures that on restart we will actually start where we left off. But if the failure happens in the store() call, and the block generator reports an error the receiver does not do anything and will continue reading from the current offset and not the last commit. This means that messages between the last commit and the current offset will be lost.

This PR retries the store call four times and then stops the receiver with an error message and the last exception that was received from the store.

Author: Hari Shreedharan <hshreedharan@apache.org>

Closes #3655 from harishreedharan/kafka-failure-fix and squashes the following commits:

5e2e7ad [Hari Shreedharan] [SPARK-4704][STREAMING] Reliable Kafka Receiver can lose data if the block generator fails to store data.
parent b0c00219
No related branches found
No related tags found
No related merge requests found
......@@ -201,12 +201,31 @@ class ReliableKafkaReceiver[
topicPartitionOffsetMap.clear()
}
/** Store the ready-to-be-stored block and commit the related offsets to zookeeper. */
/**
* Store the ready-to-be-stored block and commit the related offsets to zookeeper. This method
* will try a fixed number of times to push the block. If the push fails, the receiver is stopped.
*/
private def storeBlockAndCommitOffset(
blockId: StreamBlockId, arrayBuffer: mutable.ArrayBuffer[_]): Unit = {
store(arrayBuffer.asInstanceOf[mutable.ArrayBuffer[(K, V)]])
Option(blockOffsetMap.get(blockId)).foreach(commitOffset)
blockOffsetMap.remove(blockId)
var count = 0
var pushed = false
var exception: Exception = null
while (!pushed && count <= 3) {
try {
store(arrayBuffer.asInstanceOf[mutable.ArrayBuffer[(K, V)]])
pushed = true
} catch {
case ex: Exception =>
count += 1
exception = ex
}
}
if (pushed) {
Option(blockOffsetMap.get(blockId)).foreach(commitOffset)
blockOffsetMap.remove(blockId)
} else {
stop("Error while storing block into Spark", exception)
}
}
/**
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment