Skip to content
Snippets Groups Projects
Commit 6f671d04 authored by Hari Shreedharan's avatar Hari Shreedharan Committed by Tathagata Das
Browse files

[SPARK-3154][STREAMING] Make FlumePollingInputDStream shutdown cleaner.

Currently, a lot of errors get thrown from the Avro IPC layer when the dstream
or sink is shut down. This PR cleans that up. Some refactoring is done in the
receiver code to put all of the RPC code into a single Try and just recover
from that. The sink code has also been cleaned up.

Author: Hari Shreedharan <hshreedharan@apache.org>

Closes #2065 from harishreedharan/clean-flume-shutdown and squashes the following commits:

f93a07c [Hari Shreedharan] Formatting fixes.
d7427cc [Hari Shreedharan] More fixes!
a0a8852 [Hari Shreedharan] Fix race condition, hopefully! Minor other changes.
4c9ed02 [Hari Shreedharan] Remove unneeded list in Callback handler. Other misc changes.
8fee36f [Hari Shreedharan] Scala-library is required, else maven build fails. Also catch InterruptedException in TxnProcessor.
445e700 [Hari Shreedharan] Merge remote-tracking branch 'asf/master' into clean-flume-shutdown
87232e0 [Hari Shreedharan] Refactor Flume Input Stream. Clean up code, better error handling.
9001d26 [Hari Shreedharan] Change log level to debug in TransactionProcessor#shutdown method
e7b8d82 [Hari Shreedharan] Incorporate review feedback
598efa7 [Hari Shreedharan] Clean up some exception handling code
e1027c6 [Hari Shreedharan] Merge remote-tracking branch 'asf/master' into clean-flume-shutdown
ed608c8 [Hari Shreedharan] [SPARK-3154][STREAMING] Make FlumePollingInputDStream shutdown cleaner.
parent 171a41cb
No related branches found
No related tags found
No related merge requests found
...@@ -70,6 +70,10 @@ ...@@ -70,6 +70,10 @@
<artifactId>scalatest_${scala.binary.version}</artifactId> <artifactId>scalatest_${scala.binary.version}</artifactId>
<scope>test</scope> <scope>test</scope>
</dependency> </dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
</dependency>
<dependency> <dependency>
<!-- <!--
Netty explicitly added in test as it has been excluded from Netty explicitly added in test as it has been excluded from
......
...@@ -19,6 +19,8 @@ package org.apache.spark.streaming.flume.sink ...@@ -19,6 +19,8 @@ package org.apache.spark.streaming.flume.sink
import java.util.concurrent.{ConcurrentHashMap, Executors} import java.util.concurrent.{ConcurrentHashMap, Executors}
import java.util.concurrent.atomic.AtomicLong import java.util.concurrent.atomic.AtomicLong
import scala.collection.JavaConversions._
import org.apache.flume.Channel import org.apache.flume.Channel
import org.apache.commons.lang.RandomStringUtils import org.apache.commons.lang.RandomStringUtils
import com.google.common.util.concurrent.ThreadFactoryBuilder import com.google.common.util.concurrent.ThreadFactoryBuilder
...@@ -45,7 +47,8 @@ private[flume] class SparkAvroCallbackHandler(val threads: Int, val channel: Cha ...@@ -45,7 +47,8 @@ private[flume] class SparkAvroCallbackHandler(val threads: Int, val channel: Cha
val transactionExecutorOpt = Option(Executors.newFixedThreadPool(threads, val transactionExecutorOpt = Option(Executors.newFixedThreadPool(threads,
new ThreadFactoryBuilder().setDaemon(true) new ThreadFactoryBuilder().setDaemon(true)
.setNameFormat("Spark Sink Processor Thread - %d").build())) .setNameFormat("Spark Sink Processor Thread - %d").build()))
private val processorMap = new ConcurrentHashMap[CharSequence, TransactionProcessor]() private val sequenceNumberToProcessor =
new ConcurrentHashMap[CharSequence, TransactionProcessor]()
// This sink will not persist sequence numbers and reuses them if it gets restarted. // This sink will not persist sequence numbers and reuses them if it gets restarted.
// So it is possible to commit a transaction which may have been meant for the sink before the // So it is possible to commit a transaction which may have been meant for the sink before the
// restart. // restart.
...@@ -55,6 +58,8 @@ private[flume] class SparkAvroCallbackHandler(val threads: Int, val channel: Cha ...@@ -55,6 +58,8 @@ private[flume] class SparkAvroCallbackHandler(val threads: Int, val channel: Cha
private val seqBase = RandomStringUtils.randomAlphanumeric(8) private val seqBase = RandomStringUtils.randomAlphanumeric(8)
private val seqCounter = new AtomicLong(0) private val seqCounter = new AtomicLong(0)
@volatile private var stopped = false
/** /**
* Returns a bunch of events to Spark over Avro RPC. * Returns a bunch of events to Spark over Avro RPC.
* @param n Maximum number of events to return in a batch * @param n Maximum number of events to return in a batch
...@@ -63,18 +68,33 @@ private[flume] class SparkAvroCallbackHandler(val threads: Int, val channel: Cha ...@@ -63,18 +68,33 @@ private[flume] class SparkAvroCallbackHandler(val threads: Int, val channel: Cha
override def getEventBatch(n: Int): EventBatch = { override def getEventBatch(n: Int): EventBatch = {
logDebug("Got getEventBatch call from Spark.") logDebug("Got getEventBatch call from Spark.")
val sequenceNumber = seqBase + seqCounter.incrementAndGet() val sequenceNumber = seqBase + seqCounter.incrementAndGet()
val processor = new TransactionProcessor(channel, sequenceNumber, createProcessor(sequenceNumber, n) match {
n, transactionTimeout, backOffInterval, this) case Some(processor) =>
transactionExecutorOpt.foreach(executor => { transactionExecutorOpt.foreach(_.submit(processor))
executor.submit(processor) // Wait until a batch is available - will be an error if error message is non-empty
}) val batch = processor.getEventBatch
// Wait until a batch is available - will be an error if error message is non-empty if (SparkSinkUtils.isErrorBatch(batch)) {
val batch = processor.getEventBatch // Remove the processor if it is an error batch since no ACK is sent.
if (!SparkSinkUtils.isErrorBatch(batch)) { removeAndGetProcessor(sequenceNumber)
processorMap.put(sequenceNumber.toString, processor) logWarning("Received an error batch - no events were received from channel! ")
logDebug("Sending event batch with sequence number: " + sequenceNumber) }
batch
case None =>
new EventBatch("Spark sink has been stopped!", "", java.util.Collections.emptyList())
}
}
private def createProcessor(seq: String, n: Int): Option[TransactionProcessor] = {
sequenceNumberToProcessor.synchronized {
if (!stopped) {
val processor = new TransactionProcessor(
channel, seq, n, transactionTimeout, backOffInterval, this)
sequenceNumberToProcessor.put(seq, processor)
Some(processor)
} else {
None
}
} }
batch
} }
/** /**
...@@ -116,7 +136,9 @@ private[flume] class SparkAvroCallbackHandler(val threads: Int, val channel: Cha ...@@ -116,7 +136,9 @@ private[flume] class SparkAvroCallbackHandler(val threads: Int, val channel: Cha
* longer tracked and the caller is responsible for that txn processor. * longer tracked and the caller is responsible for that txn processor.
*/ */
private[sink] def removeAndGetProcessor(sequenceNumber: CharSequence): TransactionProcessor = { private[sink] def removeAndGetProcessor(sequenceNumber: CharSequence): TransactionProcessor = {
processorMap.remove(sequenceNumber.toString) // The toString is required! sequenceNumberToProcessor.synchronized {
sequenceNumberToProcessor.remove(sequenceNumber.toString)
}
} }
/** /**
...@@ -124,8 +146,10 @@ private[flume] class SparkAvroCallbackHandler(val threads: Int, val channel: Cha ...@@ -124,8 +146,10 @@ private[flume] class SparkAvroCallbackHandler(val threads: Int, val channel: Cha
*/ */
def shutdown() { def shutdown() {
logInfo("Shutting down Spark Avro Callback Handler") logInfo("Shutting down Spark Avro Callback Handler")
transactionExecutorOpt.foreach(executor => { sequenceNumberToProcessor.synchronized {
executor.shutdownNow() stopped = true
}) sequenceNumberToProcessor.values().foreach(_.shutdown())
}
transactionExecutorOpt.foreach(_.shutdownNow())
} }
} }
...@@ -60,6 +60,8 @@ private class TransactionProcessor(val channel: Channel, val seqNum: String, ...@@ -60,6 +60,8 @@ private class TransactionProcessor(val channel: Channel, val seqNum: String,
// succeeded. // succeeded.
@volatile private var batchSuccess = false @volatile private var batchSuccess = false
@volatile private var stopped = false
// The transaction that this processor would handle // The transaction that this processor would handle
var txOpt: Option[Transaction] = None var txOpt: Option[Transaction] = None
...@@ -88,6 +90,11 @@ private class TransactionProcessor(val channel: Channel, val seqNum: String, ...@@ -88,6 +90,11 @@ private class TransactionProcessor(val channel: Channel, val seqNum: String,
batchAckLatch.countDown() batchAckLatch.countDown()
} }
private[flume] def shutdown(): Unit = {
logDebug("Shutting down transaction processor")
stopped = true
}
/** /**
* Populates events into the event batch. If the batch cannot be populated, * Populates events into the event batch. If the batch cannot be populated,
* this method will not set the events into the event batch, but it sets an error message. * this method will not set the events into the event batch, but it sets an error message.
...@@ -106,7 +113,7 @@ private class TransactionProcessor(val channel: Channel, val seqNum: String, ...@@ -106,7 +113,7 @@ private class TransactionProcessor(val channel: Channel, val seqNum: String,
var gotEventsInThisTxn = false var gotEventsInThisTxn = false
var loopCounter: Int = 0 var loopCounter: Int = 0
loop.breakable { loop.breakable {
while (events.size() < maxBatchSize while (!stopped && events.size() < maxBatchSize
&& loopCounter < totalAttemptsToRemoveFromChannel) { && loopCounter < totalAttemptsToRemoveFromChannel) {
loopCounter += 1 loopCounter += 1
Option(channel.take()) match { Option(channel.take()) match {
...@@ -115,7 +122,7 @@ private class TransactionProcessor(val channel: Channel, val seqNum: String, ...@@ -115,7 +122,7 @@ private class TransactionProcessor(val channel: Channel, val seqNum: String,
ByteBuffer.wrap(event.getBody))) ByteBuffer.wrap(event.getBody)))
gotEventsInThisTxn = true gotEventsInThisTxn = true
case None => case None =>
if (!gotEventsInThisTxn) { if (!gotEventsInThisTxn && !stopped) {
logDebug("Sleeping for " + backOffInterval + " millis as no events were read in" + logDebug("Sleeping for " + backOffInterval + " millis as no events were read in" +
" the current transaction") " the current transaction")
TimeUnit.MILLISECONDS.sleep(backOffInterval) TimeUnit.MILLISECONDS.sleep(backOffInterval)
...@@ -125,7 +132,7 @@ private class TransactionProcessor(val channel: Channel, val seqNum: String, ...@@ -125,7 +132,7 @@ private class TransactionProcessor(val channel: Channel, val seqNum: String,
} }
} }
} }
if (!gotEventsInThisTxn) { if (!gotEventsInThisTxn && !stopped) {
val msg = "Tried several times, " + val msg = "Tried several times, " +
"but did not get any events from the channel!" "but did not get any events from the channel!"
logWarning(msg) logWarning(msg)
...@@ -136,6 +143,11 @@ private class TransactionProcessor(val channel: Channel, val seqNum: String, ...@@ -136,6 +143,11 @@ private class TransactionProcessor(val channel: Channel, val seqNum: String,
} }
}) })
} catch { } catch {
case interrupted: InterruptedException =>
// Don't pollute logs if the InterruptedException came from this being stopped
if (!stopped) {
logWarning("Error while processing transaction.", interrupted)
}
case e: Exception => case e: Exception =>
logWarning("Error while processing transaction.", e) logWarning("Error while processing transaction.", e)
eventBatch.setErrorMsg(e.getMessage) eventBatch.setErrorMsg(e.getMessage)
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.streaming.flume
import scala.collection.JavaConversions._
import scala.collection.mutable.ArrayBuffer
import com.google.common.base.Throwables
import org.apache.spark.Logging
import org.apache.spark.streaming.flume.sink._
/**
 * This class implements the core functionality of [[FlumePollingReceiver]]. When started it
 * pulls data from Flume, stores it to Spark and then sends an Ack or Nack. This class should be
 * run via an [[java.util.concurrent.Executor]] as this implements [[Runnable]]
 *
 * @param receiver The receiver that owns this instance.
 */
private[flume] class FlumeBatchFetcher(receiver: FlumePollingReceiver) extends Runnable with
  Logging {

  import java.util.concurrent.TimeUnit

  // Upper bound on a single wait for a free connection, so that the stop flag of the receiver
  // is re-checked regularly even when every connection is in use by another worker.
  private val connectionPollTimeoutMs = 100L

  /**
   * Keeps fetching batches until the receiver is stopped. Each iteration borrows a connection
   * from the receiver's queue, processes at most one batch on it and hands the connection back.
   */
  def run(): Unit = {
    while (!receiver.isStopped()) {
      // A timed poll returns null when no connection became available in time; wrapping it in
      // an Option lets the loop simply retry instead of dereferencing null.
      val connectionOpt =
        try {
          Option(receiver.getConnections.poll(connectionPollTimeoutMs, TimeUnit.MILLISECONDS))
        } catch {
          case interrupted: InterruptedException =>
            // Don't pollute logs if the interrupt came from the receiver being stopped.
            if (!receiver.isStopped()) {
              logWarning("Interrupted while waiting for a Flume connection", interrupted)
            }
            None
        }
      connectionOpt.foreach { connection =>
        try {
          fetchAndStore(connection.client)
        } finally {
          // Always return the connection so that it can be reused.
          receiver.getConnections.add(connection)
        }
      }
    }
  }

  /**
   * Pulls one batch from the client, stores it and acks it. If anything goes wrong after a
   * batch was received, a nack is attempted so that Flume can push the events back.
   * @param client Client to fetch the batch from
   */
  private def fetchAndStore(client: SparkFlumeProtocol.Callback): Unit = {
    var batchReceived = false
    var seq: CharSequence = null
    try {
      getBatch(client).foreach { eventBatch =>
        batchReceived = true
        seq = eventBatch.getSequenceNumber
        val events = toSparkFlumeEvents(eventBatch.getEvents)
        if (store(events)) {
          sendAck(client, seq)
        } else {
          sendNack(batchReceived, client, seq)
        }
      }
    } catch {
      case e: Exception =>
        Throwables.getRootCause(e) match {
          // If the cause was an InterruptedException, then check if the receiver is stopped -
          // if yes, do nothing and let the caller's loop exit. Else send a Nack and log a
          // warning.
          case interrupted: InterruptedException =>
            if (!receiver.isStopped()) {
              logWarning("Interrupted while receiving data from Flume", interrupted)
              sendNackSafely(batchReceived, client, seq)
            }
          case exception: Exception =>
            logWarning("Error while receiving data from Flume", exception)
            sendNackSafely(batchReceived, client, seq)
          // In the unlikely case the root cause was not an Exception, rethrow the original
          // exception (explicitly, rather than failing with a MatchError) and exit.
          case _ =>
            throw e
        }
    }
  }

  /**
   * Gets a batch of events from the specified client. This method does not handle any exceptions
   * which will be propagated to the caller.
   * @param client Client to get events from
   * @return [[Some]] which contains the event batch if Flume sent any events back, else [[None]]
   */
  private def getBatch(client: SparkFlumeProtocol.Callback): Option[EventBatch] = {
    val eventBatch = client.getEventBatch(receiver.getMaxBatchSize)
    if (!SparkSinkUtils.isErrorBatch(eventBatch)) {
      // No error, proceed with processing data
      logDebug(s"Received batch of ${eventBatch.getEvents.size} events with sequence " +
        s"number: ${eventBatch.getSequenceNumber}")
      Some(eventBatch)
    } else {
      logWarning("Did not receive events from Flume agent due to error on the Flume agent: " +
        eventBatch.getErrorMsg)
      None
    }
  }

  /**
   * Store the events in the buffer to Spark. This method will not propagate any exceptions,
   * but will propagate any other errors.
   * @param buffer The buffer to store
   * @return true if the data was stored without any exception being thrown, else false
   */
  private def store(buffer: ArrayBuffer[SparkFlumeEvent]): Boolean = {
    try {
      receiver.store(buffer)
      true
    } catch {
      case e: Exception =>
        logWarning("Error while attempting to store data received from Flume", e)
        false
    }
  }

  /**
   * Send an ack to the client for the sequence number. This method does not handle any exceptions
   * which will be propagated to the caller.
   * @param client client to send the ack to
   * @param seq sequence number of the batch to be ack-ed.
   */
  private def sendAck(client: SparkFlumeProtocol.Callback, seq: CharSequence): Unit = {
    logDebug("Sending ack for sequence number: " + seq)
    client.ack(seq)
    logDebug("Ack sent for sequence number: " + seq)
  }

  /**
   * This method sends a Nack if a batch was received to the client with the given sequence
   * number. Any exceptions thrown by the RPC call is simply thrown out as is - no effort is made
   * to handle it.
   * @param batchReceived true if a batch was received. If this is false, no nack is sent
   * @param client The client to which the nack should be sent
   * @param seq The sequence number of the batch that is being nack-ed.
   */
  private def sendNack(batchReceived: Boolean, client: SparkFlumeProtocol.Callback,
    seq: CharSequence): Unit = {
    if (batchReceived) {
      // Let Flume know that the events need to be pushed back into the channel.
      logDebug("Sending nack for sequence number: " + seq)
      client.nack(seq) // If the agent is down, even this could fail and throw
      logDebug("Nack sent for sequence number: " + seq)
    }
  }

  /**
   * Same as [[sendNack]], but an exception thrown by the nack itself is logged instead of being
   * propagated. Used from the error handling path, where a second failure must not escape and
   * terminate the fetcher.
   * @param batchReceived true if a batch was received. If this is false, no nack is sent
   * @param client The client to which the nack should be sent
   * @param seq The sequence number of the batch that is being nack-ed.
   */
  private def sendNackSafely(batchReceived: Boolean, client: SparkFlumeProtocol.Callback,
    seq: CharSequence): Unit = {
    try {
      sendNack(batchReceived, client, seq)
    } catch {
      case e: Exception =>
        if (receiver.isStopped()) {
          // Failures while shutting down are not interesting enough for a warning.
          logDebug("Sending nack failed for sequence number: " + seq, e)
        } else {
          logWarning("Sending nack failed for sequence number: " + seq, e)
        }
    }
  }

  /**
   * Utility method to convert [[SparkSinkEvent]]s to [[SparkFlumeEvent]]s
   * @param events - Events to convert to SparkFlumeEvents
   * @return - The SparkFlumeEvent generated from SparkSinkEvent
   */
  private def toSparkFlumeEvents(events: java.util.List[SparkSinkEvent]):
    ArrayBuffer[SparkFlumeEvent] = {
    // Convert each Flume event to a serializable SparkFlumeEvent. The list's own iterator is
    // used so that the traversal does not depend on implicit Java-to-Scala conversions.
    val buffer = new ArrayBuffer[SparkFlumeEvent](events.size())
    val iterator = events.iterator()
    while (iterator.hasNext) {
      val event = iterator.next()
      val sparkFlumeEvent = new SparkFlumeEvent()
      sparkFlumeEvent.event.setBody(event.getBody)
      sparkFlumeEvent.event.setHeaders(event.getHeaders)
      buffer += sparkFlumeEvent
    }
    buffer
  }
}
...@@ -18,10 +18,9 @@ package org.apache.spark.streaming.flume ...@@ -18,10 +18,9 @@ package org.apache.spark.streaming.flume
import java.net.InetSocketAddress import java.net.InetSocketAddress
import java.util.concurrent.{LinkedBlockingQueue, TimeUnit, Executors} import java.util.concurrent.{LinkedBlockingQueue, Executors}
import scala.collection.JavaConversions._ import scala.collection.JavaConversions._
import scala.collection.mutable.ArrayBuffer
import scala.reflect.ClassTag import scala.reflect.ClassTag
import com.google.common.util.concurrent.ThreadFactoryBuilder import com.google.common.util.concurrent.ThreadFactoryBuilder
...@@ -86,61 +85,9 @@ private[streaming] class FlumePollingReceiver( ...@@ -86,61 +85,9 @@ private[streaming] class FlumePollingReceiver(
connections.add(new FlumeConnection(transceiver, client)) connections.add(new FlumeConnection(transceiver, client))
}) })
for (i <- 0 until parallelism) { for (i <- 0 until parallelism) {
logInfo("Starting Flume Polling Receiver worker threads starting..") logInfo("Starting Flume Polling Receiver worker threads..")
// Threads that pull data from Flume. // Threads that pull data from Flume.
receiverExecutor.submit(new Runnable { receiverExecutor.submit(new FlumeBatchFetcher(this))
override def run(): Unit = {
while (true) {
val connection = connections.poll()
val client = connection.client
try {
val eventBatch = client.getEventBatch(maxBatchSize)
if (!SparkSinkUtils.isErrorBatch(eventBatch)) {
// No error, proceed with processing data
val seq = eventBatch.getSequenceNumber
val events: java.util.List[SparkSinkEvent] = eventBatch.getEvents
logDebug(
"Received batch of " + events.size() + " events with sequence number: " + seq)
try {
// Convert each Flume event to a serializable SparkFlumeEvent
val buffer = new ArrayBuffer[SparkFlumeEvent](events.size())
var j = 0
while (j < events.size()) {
buffer += toSparkFlumeEvent(events(j))
j += 1
}
store(buffer)
logDebug("Sending ack for sequence number: " + seq)
// Send an ack to Flume so that Flume discards the events from its channels.
client.ack(seq)
logDebug("Ack sent for sequence number: " + seq)
} catch {
case e: Exception =>
try {
// Let Flume know that the events need to be pushed back into the channel.
logDebug("Sending nack for sequence number: " + seq)
client.nack(seq) // If the agent is down, even this could fail and throw
logDebug("Nack sent for sequence number: " + seq)
} catch {
case e: Exception => logError(
"Sending Nack also failed. A Flume agent is down.")
}
TimeUnit.SECONDS.sleep(2L) // for now just leave this as a fixed 2 seconds.
logWarning("Error while attempting to store events", e)
}
} else {
logWarning("Did not receive events from Flume agent due to error on the Flume " +
"agent: " + eventBatch.getErrorMsg)
}
} catch {
case e: Exception =>
logWarning("Error while reading data from Flume", e)
} finally {
connections.add(connection)
}
}
}
})
} }
} }
...@@ -153,16 +100,12 @@ private[streaming] class FlumePollingReceiver( ...@@ -153,16 +100,12 @@ private[streaming] class FlumePollingReceiver(
channelFactory.releaseExternalResources() channelFactory.releaseExternalResources()
} }
/** private[flume] def getConnections: LinkedBlockingQueue[FlumeConnection] = {
* Utility method to convert [[SparkSinkEvent]] to [[SparkFlumeEvent]] this.connections
* @param event - Event to convert to SparkFlumeEvent }
* @return - The SparkFlumeEvent generated from SparkSinkEvent
*/ private[flume] def getMaxBatchSize: Int = {
private def toSparkFlumeEvent(event: SparkSinkEvent): SparkFlumeEvent = { this.maxBatchSize
val sparkFlumeEvent = new SparkFlumeEvent()
sparkFlumeEvent.event.setBody(event.getBody)
sparkFlumeEvent.event.setHeaders(event.getHeaders)
sparkFlumeEvent
} }
} }
...@@ -171,7 +114,7 @@ private[streaming] class FlumePollingReceiver( ...@@ -171,7 +114,7 @@ private[streaming] class FlumePollingReceiver(
* @param transceiver The transceiver to use for communication with Flume * @param transceiver The transceiver to use for communication with Flume
* @param client The client that the callbacks are received on. * @param client The client that the callbacks are received on.
*/ */
private class FlumeConnection(val transceiver: NettyTransceiver, private[flume] class FlumeConnection(val transceiver: NettyTransceiver,
val client: SparkFlumeProtocol.Callback) val client: SparkFlumeProtocol.Callback)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment