Skip to content
Snippets Groups Projects
Commit bcb5cdad authored by zsxwing's avatar zsxwing Committed by Aaron Davidson
Browse files

[SPARK-3154][STREAMING] Replace ConcurrentHashMap with mutable.HashMap and...

[SPARK-3154][STREAMING] Replace ConcurrentHashMap with mutable.HashMap and remove @volatile from 'stopped'

Since `sequenceNumberToProcessor` and `stopped` are both protected by the lock `sequenceNumberToProcessor`, `ConcurrentHashMap` and `volatile` is unnecessary. So this PR updated them accordingly.

Author: zsxwing <zsxwing@gmail.com>

Closes #3634 from zsxwing/SPARK-3154 and squashes the following commits:

0d087ac [zsxwing] Replace ConcurrentHashMap with mutable.HashMap and remove @volatile from 'stopped'
parent 51b1fe14
No related branches found
No related tags found
No related merge requests found
......@@ -16,10 +16,10 @@
*/
package org.apache.spark.streaming.flume.sink
import java.util.concurrent.{CountDownLatch, ConcurrentHashMap, Executors}
import java.util.concurrent.{CountDownLatch, Executors}
import java.util.concurrent.atomic.AtomicLong
import scala.collection.JavaConversions._
import scala.collection.mutable
import org.apache.flume.Channel
import org.apache.commons.lang.RandomStringUtils
......@@ -47,8 +47,8 @@ private[flume] class SparkAvroCallbackHandler(val threads: Int, val channel: Cha
val transactionExecutorOpt = Option(Executors.newFixedThreadPool(threads,
new ThreadFactoryBuilder().setDaemon(true)
.setNameFormat("Spark Sink Processor Thread - %d").build()))
private val sequenceNumberToProcessor =
new ConcurrentHashMap[CharSequence, TransactionProcessor]()
// Protected by `sequenceNumberToProcessor`
private val sequenceNumberToProcessor = mutable.HashMap[CharSequence, TransactionProcessor]()
// This sink will not persist sequence numbers and reuses them if it gets restarted.
// So it is possible to commit a transaction which may have been meant for the sink before the
// restart.
......@@ -58,8 +58,8 @@ private[flume] class SparkAvroCallbackHandler(val threads: Int, val channel: Cha
private val seqBase = RandomStringUtils.randomAlphanumeric(8)
private val seqCounter = new AtomicLong(0)
@volatile private var stopped = false
// Protected by `sequenceNumberToProcessor`
private var stopped = false
@volatile private var isTest = false
private var testLatch: CountDownLatch = null
......@@ -131,7 +131,7 @@ private[flume] class SparkAvroCallbackHandler(val threads: Int, val channel: Cha
* @param success Whether the batch was successful or not.
*/
private def completeTransaction(sequenceNumber: CharSequence, success: Boolean) {
Option(removeAndGetProcessor(sequenceNumber)).foreach(processor => {
removeAndGetProcessor(sequenceNumber).foreach(processor => {
processor.batchProcessed(success)
})
}
......@@ -139,10 +139,11 @@ private[flume] class SparkAvroCallbackHandler(val threads: Int, val channel: Cha
/**
* Helper method to remove the TxnProcessor for a Sequence Number. Can be used to avoid a leak.
* @param sequenceNumber
* @return The transaction processor for the corresponding batch. Note that this instance is no
* longer tracked and the caller is responsible for that txn processor.
* @return An `Option` of the transaction processor for the corresponding batch. Note that this
* instance is no longer tracked and the caller is responsible for that txn processor.
*/
private[sink] def removeAndGetProcessor(sequenceNumber: CharSequence): TransactionProcessor = {
private[sink] def removeAndGetProcessor(sequenceNumber: CharSequence):
Option[TransactionProcessor] = {
sequenceNumberToProcessor.synchronized {
sequenceNumberToProcessor.remove(sequenceNumber.toString)
}
......@@ -160,7 +161,7 @@ private[flume] class SparkAvroCallbackHandler(val threads: Int, val channel: Cha
logInfo("Shutting down Spark Avro Callback Handler")
sequenceNumberToProcessor.synchronized {
stopped = true
sequenceNumberToProcessor.values().foreach(_.shutdown())
sequenceNumberToProcessor.values.foreach(_.shutdown())
}
transactionExecutorOpt.foreach(_.shutdownNow())
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment