Commit 8f0df6bc, authored by Yuval Itzchakov, committed by Sean Owen

[SPARK-21873][SS] - Avoid using `return` inside `CachedKafkaConsumer.get`

During profiling of a structured streaming application with Kafka as the source, I came across this exception:

![Structured Streaming Kafka Exceptions](https://user-images.githubusercontent.com/3448320/29743366-4149ef78-8a99-11e7-94d6-f0cbb691134a.png)

This is a 1-minute sample, during which 106K `NonLocalReturnControl` exceptions were thrown.
This happens because `CachedKafkaConsumer.get` is run inside:

`private def runUninterruptiblyIfPossible[T](body: => T): T`

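Here `body: => T` is the body of the `get` method. Because a by-name argument is compiled into a function, a `return` inside the `while` loop defined in `get` can no longer be a plain jump: to escape both the loop and the enclosing function, the runtime throws the above exception and catches it again in the enclosing method.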
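
To make the mechanism concrete, here is a minimal sketch, assuming Scala 2 semantics for non-local returns. `observe` is a hypothetical stand-in for `runUninterruptiblyIfPossible`, and `firstPositive` is an invented method; neither exists in Spark. The wrapper only counts the control-flow throwables that pass through it.

```scala
import scala.util.control.ControlThrowable

object NonLocalReturnDemo {
  // Number of control-flow throwables seen by `observe`.
  var controlThrowables = 0

  // Hypothetical stand-in for runUninterruptiblyIfPossible: it takes a by-name body.
  def observe[T](body: => T): T =
    try body
    catch {
      case t: ControlThrowable =>
        controlThrowables += 1
        throw t // rethrow so the enclosing method can complete the return
    }

  // The `return` sits inside the by-name argument, so it is a non-local return:
  // the compiler implements it by throwing NonLocalReturnControl out of the closure.
  def firstPositive(xs: Seq[Int]): Int = observe {
    var i = 0
    while (i < xs.length) {
      if (xs(i) > 0) return xs(i)
      i += 1
    }
    -1
  }
}
```

Each call that hits the `return` costs one thrown exception, which is consistent with the volume seen in the profile above.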

## What changes were proposed in this pull request?

Instead of using `return` (which is generally not recommended in Scala), we store the result of the `fetchData` method in a local variable and use a boolean flag to record whether the fetch succeeded. The flag is part of the `while` loop's condition, so the loop exits on its own after a successful fetch.
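
The pattern can be sketched in isolation as follows. This is an illustration rather than the Spark code: `UnknownOffset`, `fetch`, `fetchWithRetry`, and the use of `IndexOutOfBoundsException` in place of `OffsetOutOfRangeException` are all assumptions made to keep the example self-contained.

```scala
object FlagLoopDemo {
  // Hypothetical sentinel, mirroring the role of UNKNOWN_OFFSET.
  val UnknownOffset: Long = -1L

  // Hypothetical fetch: offsets below `firstAvailable` are treated as lost.
  private def fetch(offset: Long, firstAvailable: Long): String =
    if (offset < firstAvailable) throw new IndexOutOfBoundsException(s"offset $offset is gone")
    else s"record@$offset"

  // Retries from the first available offset, without using `return`.
  def fetchWithRetry(start: Long, firstAvailable: Long, until: Long): String = {
    var toFetch = start
    var record: String = null
    var isFetchComplete = false
    while (toFetch != UnknownOffset && !isFetchComplete) {
      try {
        record = fetch(toFetch, firstAvailable)
        isFetchComplete = true // ends the loop through its condition
      } catch {
        case _: IndexOutOfBoundsException =>
          // Move to the next available offset, or give up if it is out of range.
          toFetch = if (firstAvailable < until) firstAvailable else UnknownOffset
      }
    }
    // The last expression is the result; null mirrors the original method's contract.
    if (isFetchComplete) record else null
  }
}
```

Because the result is the method's final expression, the body behaves the same whether it runs directly or inside a by-name block, and no exception is needed to leave the loop.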

## How was this patch tested?

I ran the `KafkaSourceSuite` to check for regressions, and it passes. Since the exception isn't visible from user code, there is no way (at least that I could think of) to add this as a test to the existing suite.

Author: Yuval Itzchakov <yuval.itzchakov@clicktale.com>

Closes #19059 from YuvalItzchakov/master.
parent d4895c9d
@@ -112,9 +112,15 @@ private[kafka010] case class CachedKafkaConsumer private(
     // we will move to the next available offset within `[offset, untilOffset)` and retry.
     // If `failOnDataLoss` is `true`, the loop body will be executed only once.
     var toFetchOffset = offset
-    while (toFetchOffset != UNKNOWN_OFFSET) {
+    var consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]] = null
+    // We want to break out of the while loop on a successful fetch to avoid using "return"
+    // which may causes a NonLocalReturnControl exception when this method is used as a function.
+    var isFetchComplete = false
+    while (toFetchOffset != UNKNOWN_OFFSET && !isFetchComplete) {
       try {
-        return fetchData(toFetchOffset, untilOffset, pollTimeoutMs, failOnDataLoss)
+        consumerRecord = fetchData(toFetchOffset, untilOffset, pollTimeoutMs, failOnDataLoss)
+        isFetchComplete = true
       } catch {
         case e: OffsetOutOfRangeException =>
           // When there is some error thrown, it's better to use a new consumer to drop all cached
@@ -125,8 +131,13 @@ private[kafka010] case class CachedKafkaConsumer private(
           toFetchOffset = getEarliestAvailableOffsetBetween(toFetchOffset, untilOffset)
       }
     }
-    resetFetchedData()
-    null
+    if (isFetchComplete) {
+      consumerRecord
+    } else {
+      resetFetchedData()
+      null
+    }
   }
   /**