Commit b235e013 authored by Hari Shreedharan, committed by Patrick Wendell

[SPARK-3686][STREAMING] Wait for sink to commit the channel before checking for the channel size.

Author: Hari Shreedharan <hshreedharan@apache.org>

Closes #2531 from harishreedharan/sparksinksuite-fix and squashes the following commits:

30393c1 [Hari Shreedharan] Use more deterministic method to figure out when batches come in.
6ce9d8b [Hari Shreedharan] [SPARK-3686][STREAMING] Wait for sink to commit the channel before checking for the channel size.
parent 86bce764
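
In essence, the flaky tests previously slept for a fixed second (`Thread.sleep(1000)`, `TimeUnit.SECONDS.sleep(1)`) and hoped the sink had finished committing its transactions before asserting on the channel size. The patch instead threads a `CountDownLatch` from the test suite through `SparkSink` into each `TransactionProcessor`, which counts the latch down once it has finished handling a batch, so the test can block deterministically. A minimal sketch of the same handshake pattern follows; `CommitSignal` and its method names are illustrative stand-ins, not the patch's API:

```scala
import java.util.concurrent.{CountDownLatch, TimeUnit}

// Illustrative sketch of the latch-based handshake this commit adopts.
// CommitSignal stands in for the wiring the patch adds to the sink classes;
// the class and method names here are hypothetical.
class CommitSignal(expectedBatches: Int) {
  private val latch = new CountDownLatch(expectedBatches)

  // Sink side: called once a batch's transaction has been fully handled.
  def batchHandled(): Unit = latch.countDown()

  // Test side: blocks until all expected batches are handled or the timeout
  // elapses; returns true only if every batch was handled in time.
  def awaitBatches(timeout: Long, unit: TimeUnit): Boolean =
    latch.await(timeout, unit)
}

object CommitSignalDemo {
  def main(args: Array[String]): Unit = {
    val signal = new CommitSignal(expectedBatches = 1)

    // Simulate the sink committing a batch on another thread.
    new Thread(new Runnable {
      override def run(): Unit = signal.batchHandled()
    }).start()

    // Unlike a fixed one-second sleep, this returns as soon as the batch is
    // handled, and a missed commit shows up as `false` rather than a race.
    assert(signal.awaitBatches(1, TimeUnit.SECONDS), "batch not handled in time")
  }
}
```

The suite's `latch.await(1, TimeUnit.SECONDS)` calls below use the same bounded wait: a healthy run returns almost immediately, while a hung sink fails the subsequent channel-size assertion instead of blocking forever.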
external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkAvroCallbackHandler.scala
@@ -16,7 +16,7 @@
  */
 package org.apache.spark.streaming.flume.sink

-import java.util.concurrent.{ConcurrentHashMap, Executors}
+import java.util.concurrent.{CountDownLatch, ConcurrentHashMap, Executors}
 import java.util.concurrent.atomic.AtomicLong

 import scala.collection.JavaConversions._
@@ -58,8 +58,12 @@ private[flume] class SparkAvroCallbackHandler(val threads: Int, val channel: Cha
   private val seqBase = RandomStringUtils.randomAlphanumeric(8)
   private val seqCounter = new AtomicLong(0)

   @volatile private var stopped = false
+
+  @volatile private var isTest = false
+  private var testLatch: CountDownLatch = null
+
   /**
    * Returns a bunch of events to Spark over Avro RPC.
    * @param n Maximum number of events to return in a batch
@@ -90,6 +94,9 @@ private[flume] class SparkAvroCallbackHandler(val threads: Int, val channel: Cha
       val processor = new TransactionProcessor(
         channel, seq, n, transactionTimeout, backOffInterval, this)
       sequenceNumberToProcessor.put(seq, processor)
+      if (isTest) {
+        processor.countDownWhenBatchAcked(testLatch)
+      }
       Some(processor)
     } else {
       None
@@ -141,6 +148,11 @@ private[flume] class SparkAvroCallbackHandler(val threads: Int, val channel: Cha
     }
   }

+  private[sink] def countDownWhenBatchAcked(latch: CountDownLatch) {
+    testLatch = latch
+    isTest = true
+  }
+
   /**
    * Shuts down the executor used to process transactions.
    */
external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkSink.scala
@@ -138,6 +138,16 @@ class SparkSink extends AbstractSink with Logging with Configurable {
       throw new RuntimeException("Server was not started!")
     )
   }
+
+  /**
+   * Pass in a [[CountDownLatch]] for testing purposes. This latch is counted down when each
+   * batch is received. The test can simply call await on this latch until the expected number
+   * of batches are received.
+   * @param latch the latch to count down once per received batch
+   */
+  private[flume] def countdownWhenBatchReceived(latch: CountDownLatch) {
+    handler.foreach(_.countDownWhenBatchAcked(latch))
+  }
 }

 /**
external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/TransactionProcessor.scala
@@ -62,6 +62,10 @@ private class TransactionProcessor(val channel: Channel, val seqNum: String,
   @volatile private var stopped = false
+
+  @volatile private var isTest = false
+  private var testLatch: CountDownLatch = null
+
   // The transaction that this processor would handle
   var txOpt: Option[Transaction] = None
@@ -182,6 +186,9 @@ private class TransactionProcessor(val channel: Channel, val seqNum: String,
           rollbackAndClose(tx, close = false) // tx will be closed later anyway
         } finally {
           tx.close()
+          if (isTest) {
+            testLatch.countDown()
+          }
         }
       } else {
         logWarning("Spark could not commit transaction, NACK received. Rolling back transaction.")
@@ -237,4 +244,9 @@ private class TransactionProcessor(val channel: Channel, val seqNum: String,
     processAckOrNack()
     null
   }
+
+  private[sink] def countDownWhenBatchAcked(latch: CountDownLatch) {
+    testLatch = latch
+    isTest = true
+  }
 }
external/flume-sink/src/test/scala/org/apache/spark/streaming/flume/sink/SparkSinkSuite.scala
@@ -38,7 +38,7 @@ class SparkSinkSuite extends FunSuite {
   val channelCapacity = 5000

   test("Success with ack") {
-    val (channel, sink) = initializeChannelAndSink()
+    val (channel, sink, latch) = initializeChannelAndSink()
     channel.start()
     sink.start()
@@ -51,6 +51,7 @@ class SparkSinkSuite extends FunSuite {
     val events = client.getEventBatch(1000)
     client.ack(events.getSequenceNumber)
     assert(events.getEvents.size() === 1000)
+    latch.await(1, TimeUnit.SECONDS)
     assertChannelIsEmpty(channel)
     sink.stop()
     channel.stop()
@@ -58,7 +59,7 @@ class SparkSinkSuite extends FunSuite {
   }

   test("Failure with nack") {
-    val (channel, sink) = initializeChannelAndSink()
+    val (channel, sink, latch) = initializeChannelAndSink()
     channel.start()
     sink.start()
     putEvents(channel, eventsPerBatch)
@@ -70,6 +71,7 @@ class SparkSinkSuite extends FunSuite {
     val events = client.getEventBatch(1000)
     assert(events.getEvents.size() === 1000)
     client.nack(events.getSequenceNumber)
+    latch.await(1, TimeUnit.SECONDS)
     assert(availableChannelSlots(channel) === 4000)
     sink.stop()
     channel.stop()
@@ -77,7 +79,7 @@ class SparkSinkSuite extends FunSuite {
   }

   test("Failure with timeout") {
-    val (channel, sink) = initializeChannelAndSink(Map(SparkSinkConfig
+    val (channel, sink, latch) = initializeChannelAndSink(Map(SparkSinkConfig
       .CONF_TRANSACTION_TIMEOUT -> 1.toString))
     channel.start()
     sink.start()
@@ -88,7 +90,7 @@ class SparkSinkSuite extends FunSuite {
     val (transceiver, client) = getTransceiverAndClient(address, 1)(0)
     val events = client.getEventBatch(1000)
     assert(events.getEvents.size() === 1000)
-    Thread.sleep(1000)
+    latch.await(1, TimeUnit.SECONDS)
     assert(availableChannelSlots(channel) === 4000)
     sink.stop()
     channel.stop()
@@ -106,7 +108,7 @@ class SparkSinkSuite extends FunSuite {
   def testMultipleConsumers(failSome: Boolean): Unit = {
     implicit val executorContext = ExecutionContext
       .fromExecutorService(Executors.newFixedThreadPool(5))
-    val (channel, sink) = initializeChannelAndSink()
+    val (channel, sink, latch) = initializeChannelAndSink(Map.empty, 5)
     channel.start()
     sink.start()
     (1 to 5).foreach(_ => putEvents(channel, eventsPerBatch))
@@ -136,7 +138,7 @@ class SparkSinkSuite extends FunSuite {
       }
     })
     batchCounter.await()
-    TimeUnit.SECONDS.sleep(1) // Allow the sink to commit the transactions.
+    latch.await(1, TimeUnit.SECONDS)
     executorContext.shutdown()
     if(failSome) {
       assert(availableChannelSlots(channel) === 3000)
@@ -148,8 +150,8 @@ class SparkSinkSuite extends FunSuite {
     transceiversAndClients.foreach(x => x._1.close())
   }

-  private def initializeChannelAndSink(overrides: Map[String, String] = Map.empty): (MemoryChannel,
-    SparkSink) = {
+  private def initializeChannelAndSink(overrides: Map[String, String] = Map.empty,
+    batchCounter: Int = 1): (MemoryChannel, SparkSink, CountDownLatch) = {
     val channel = new MemoryChannel()
     val channelContext = new Context()
@@ -165,7 +167,9 @@ class SparkSinkSuite extends FunSuite {
     sinkContext.put(SparkSinkConfig.CONF_PORT, 0.toString)
     sink.configure(sinkContext)
     sink.setChannel(channel)
-    (channel, sink)
+    val latch = new CountDownLatch(batchCounter)
+    sink.countdownWhenBatchReceived(latch)
+    (channel, sink, latch)
   }

   private def putEvents(ch: MemoryChannel, count: Int): Unit = {