Skip to content
Snippets Groups Projects
Commit cd106b05 authored by cody koeninger's avatar cody koeninger Committed by Reynold Xin
Browse files

[SPARK-17841][STREAMING][KAFKA] drain commitQueue

## What changes were proposed in this pull request?

Actually drain commit queue rather than just iterating it.
iterator() on a concurrent linked queue won't remove items from the queue, poll() will.

## How was this patch tested?
Unit tests

Author: cody koeninger <cody@koeninger.org>

Closes #15407 from koeninger/SPARK-17841.
parent cd662bc7
No related branches found
No related tags found
No related merge requests found
......@@ -282,13 +282,13 @@ private[spark] class DirectKafkaInputDStream[K, V](
protected def commitAll(): Unit = {
val m = new ju.HashMap[TopicPartition, OffsetAndMetadata]()
val it = commitQueue.iterator()
while (it.hasNext) {
val osr = it.next
var osr = commitQueue.poll()
while (null != osr) {
val tp = osr.topicPartition
val x = m.get(tp)
val offset = if (null == x) { osr.untilOffset } else { Math.max(x.offset, osr.untilOffset) }
m.put(tp, new OffsetAndMetadata(offset))
osr = commitQueue.poll()
}
if (!m.isEmpty) {
consumer.commitAsync(m, commitCallback.get)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment