Skip to content
Snippets Groups Projects
Unverified Commit a94659ce authored by Sean Owen's avatar Sean Owen
Browse files

[SPARK-18400][STREAMING] NPE when resharding Kinesis Stream


## What changes were proposed in this pull request?

Avoid NPE in KinesisRecordProcessor when shutdown happens without successful init

## How was this patch tested?

Existing tests

Author: Sean Owen <sowen@cloudera.com>

Closes #15882 from srowen/SPARK-18400.

(cherry picked from commit 43a26899)
Signed-off-by: default avatarSean Owen <sowen@cloudera.com>
parent 4567db9d
No related branches found
No related tags found
No related merge requests found
......@@ -27,7 +27,6 @@ import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason
import com.amazonaws.services.kinesis.model.Record
import org.apache.spark.internal.Logging
import org.apache.spark.streaming.Duration
/**
* Kinesis-specific implementation of the Kinesis Client Library (KCL) IRecordProcessor.
......@@ -102,27 +101,32 @@ private[kinesis] class KinesisRecordProcessor[T](receiver: KinesisReceiver[T], w
* @param checkpointer used to perform a Kinesis checkpoint for ShutdownReason.TERMINATE
* @param reason for shutdown (ShutdownReason.TERMINATE or ShutdownReason.ZOMBIE)
*/
override def shutdown(checkpointer: IRecordProcessorCheckpointer, reason: ShutdownReason) {
override def shutdown(
checkpointer: IRecordProcessorCheckpointer,
reason: ShutdownReason): Unit = {
logInfo(s"Shutdown: Shutting down workerId $workerId with reason $reason")
reason match {
/*
* TERMINATE Use Case. Checkpoint.
* Checkpoint to indicate that all records from the shard have been drained and processed.
* It's now OK to read from the new shards that resulted from a resharding event.
*/
case ShutdownReason.TERMINATE =>
receiver.removeCheckpointer(shardId, checkpointer)
// null if not initialized before shutdown:
if (shardId == null) {
logWarning(s"No shardId for workerId $workerId?")
} else {
reason match {
/*
* TERMINATE Use Case. Checkpoint.
* Checkpoint to indicate that all records from the shard have been drained and processed.
* It's now OK to read from the new shards that resulted from a resharding event.
*/
case ShutdownReason.TERMINATE => receiver.removeCheckpointer(shardId, checkpointer)
/*
* ZOMBIE Use Case or Unknown reason. NoOp.
* No checkpoint because other workers may have taken over and already started processing
* the same records.
* This may lead to records being processed more than once.
*/
case _ =>
receiver.removeCheckpointer(shardId, null) // return null so that we don't checkpoint
/*
* ZOMBIE Use Case or Unknown reason. NoOp.
* No checkpoint because other workers may have taken over and already started processing
* the same records.
* This may lead to records being processed more than once.
* Return null so that we don't checkpoint
*/
case _ => receiver.removeCheckpointer(shardId, null)
}
}
}
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment