Commit 43a26899 authored by Sean Owen

[SPARK-18400][STREAMING] NPE when resharding Kinesis Stream

## What changes were proposed in this pull request?

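Avoid a NullPointerException (NPE) in KinesisRecordProcessor when shutdown is called without a prior successful initialization, so that shardId is still null. In that case, log a warning and skip removing the checkpointer.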
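
The guard described above can be illustrated with a minimal, self-contained sketch. It does not use the real Spark or KCL classes: `SketchReceiver`, `SketchProcessor`, `Reason`, `Terminate`, and `Zombie` are hypothetical stand-ins introduced only for illustration, and the receiver here simply records calls rather than managing real checkpointers.

```scala
import scala.collection.mutable.ListBuffer

// Hypothetical stand-ins for ShutdownReason.TERMINATE / ShutdownReason.ZOMBIE.
sealed trait Reason
case object Terminate extends Reason
case object Zombie extends Reason

// Hypothetical receiver: records (shardId, wasCheckpointerPassed) for each call.
final class SketchReceiver {
  val removed: ListBuffer[(String, Boolean)] = ListBuffer.empty

  def removeCheckpointer(shardId: String, checkpointer: AnyRef): Unit =
    removed += ((shardId, checkpointer != null))
}

final class SketchProcessor(receiver: SketchReceiver) {
  // Stays null until initialize() has run, as in the scenario this patch addresses.
  @volatile private var shardId: String = null

  def initialize(id: String): Unit = {
    shardId = id
  }

  def shutdown(checkpointer: AnyRef, reason: Reason): Unit = {
    if (shardId == null) {
      // Never initialized: there is no shard to clean up, so only report it.
      println("No shardId; skipping checkpointer removal")
    } else {
      reason match {
        // Shard fully drained: hand over the checkpointer for a final checkpoint.
        case Terminate => receiver.removeCheckpointer(shardId, checkpointer)
        // Zombie or unknown reason: pass null so that no checkpoint is made.
        case _ => receiver.removeCheckpointer(shardId, null)
      }
    }
  }
}
```

In this sketch, calling `shutdown` on a processor that was never initialized leaves the receiver untouched, while the same call after `initialize("shard-0")` reaches `removeCheckpointer` as before.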

## How was this patch tested?

Existing tests

Author: Sean Owen <sowen@cloudera.com>

Closes #15882 from srowen/SPARK-18400.
parent 3e01f128
@@ -27,7 +27,6 @@ import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason
 import com.amazonaws.services.kinesis.model.Record
 
 import org.apache.spark.internal.Logging
-import org.apache.spark.streaming.Duration
 
 /**
  * Kinesis-specific implementation of the Kinesis Client Library (KCL) IRecordProcessor.
@@ -102,27 +101,32 @@ private[kinesis] class KinesisRecordProcessor[T](receiver: KinesisReceiver[T], w
    * @param checkpointer used to perform a Kinesis checkpoint for ShutdownReason.TERMINATE
    * @param reason for shutdown (ShutdownReason.TERMINATE or ShutdownReason.ZOMBIE)
    */
-  override def shutdown(checkpointer: IRecordProcessorCheckpointer, reason: ShutdownReason) {
+  override def shutdown(
+      checkpointer: IRecordProcessorCheckpointer,
+      reason: ShutdownReason): Unit = {
     logInfo(s"Shutdown: Shutting down workerId $workerId with reason $reason")
-    reason match {
-      /*
-       * TERMINATE Use Case. Checkpoint.
-       * Checkpoint to indicate that all records from the shard have been drained and processed.
-       * It's now OK to read from the new shards that resulted from a resharding event.
-       */
-      case ShutdownReason.TERMINATE =>
-        receiver.removeCheckpointer(shardId, checkpointer)
-
-      /*
-       * ZOMBIE Use Case or Unknown reason. NoOp.
-       * No checkpoint because other workers may have taken over and already started processing
-       * the same records.
-       * This may lead to records being processed more than once.
-       */
-      case _ =>
-        receiver.removeCheckpointer(shardId, null) // return null so that we don't checkpoint
-    }
-
+    // null if not initialized before shutdown:
+    if (shardId == null) {
+      logWarning(s"No shardId for workerId $workerId?")
+    } else {
+      reason match {
+        /*
+         * TERMINATE Use Case. Checkpoint.
+         * Checkpoint to indicate that all records from the shard have been drained and processed.
+         * It's now OK to read from the new shards that resulted from a resharding event.
+         */
+        case ShutdownReason.TERMINATE => receiver.removeCheckpointer(shardId, checkpointer)
+
+        /*
+         * ZOMBIE Use Case or Unknown reason. NoOp.
+         * No checkpoint because other workers may have taken over and already started processing
+         * the same records.
+         * This may lead to records being processed more than once.
+         * Return null so that we don't checkpoint
+         */
+        case _ => receiver.removeCheckpointer(shardId, null)
+      }
+    }
   }
 }
......