Skip to content
Snippets Groups Projects
Commit 4b7ff309 authored by Tathagata Das's avatar Tathagata Das
Browse files

[SPARK-7787] [STREAMING] Fix serialization issue of SerializableAWSCredentials

Lack of default constructor causes deserialization to fail. This occurs only when the AWS credentials are explicitly specified through KinesisUtils.

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #6316 from tdas/SPARK-7787 and squashes the following commits:

248ca5c [Tathagata Das] Fixed serializability
parent 8730fbb4
No related branches found
No related tags found
No related merge requests found
......@@ -31,7 +31,10 @@ import org.apache.spark.util.Utils
private[kinesis]
case class SerializableAWSCredentials(accessKeyId: String, secretKey: String)
extends BasicAWSCredentials(accessKeyId, secretKey) with Serializable
extends AWSCredentials {
override def getAWSAccessKeyId: String = accessKeyId
override def getAWSSecretKey: String = secretKey
}
/**
* Custom AWS Kinesis-specific implementation of Spark Streaming's Receiver.
......
......@@ -20,27 +20,18 @@ import java.nio.ByteBuffer
import scala.collection.JavaConversions.seqAsJavaList
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.Milliseconds
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.TestSuiteBase
import org.apache.spark.util.{ManualClock, Clock}
import org.mockito.Mockito._
import org.scalatest.BeforeAndAfter
import org.scalatest.Matchers
import org.scalatest.mock.MockitoSugar
import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException
import com.amazonaws.services.kinesis.clientlibrary.exceptions.KinesisClientLibDependencyException
import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException
import com.amazonaws.services.kinesis.clientlibrary.exceptions.ThrottlingException
import com.amazonaws.services.kinesis.clientlibrary.exceptions.{InvalidStateException, KinesisClientLibDependencyException, ShutdownException, ThrottlingException}
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason
import com.amazonaws.services.kinesis.model.Record
import com.amazonaws.auth.DefaultAWSCredentialsProviderChain
import org.mockito.Mockito._
import org.scalatest.{BeforeAndAfter, Matchers}
import org.scalatest.mock.MockitoSugar
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Milliseconds, Seconds, StreamingContext, TestSuiteBase}
import org.apache.spark.util.{Clock, ManualClock, Utils}
/**
* Suite of Kinesis streaming receiver tests focusing mostly on the KinesisRecordProcessor
......@@ -99,6 +90,11 @@ class KinesisReceiverSuite extends TestSuiteBase with Matchers with BeforeAndAft
ssc.stop()
}
test("check serializability of SerializableAWSCredentials") {
Utils.deserialize[SerializableAWSCredentials](
Utils.serialize(new SerializableAWSCredentials("x", "y")))
}
test("process records including store and checkpoint") {
when(receiverMock.isStopped()).thenReturn(false)
when(checkpointStateMock.shouldCheckpoint()).thenReturn(true)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment