diff --git a/extras/kinesis-asl/pom.xml b/extras/kinesis-asl/pom.xml index 5289073eb457a0632736b0a5b3d5c3cd44303973..c242e7a57b9ab6bf7258ec1dfa7290f4b05fed33 100644 --- a/extras/kinesis-asl/pom.xml +++ b/extras/kinesis-asl/pom.xml @@ -40,6 +40,13 @@ <artifactId>spark-streaming_${scala.binary.version}</artifactId> <version>${project.version}</version> </dependency> + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-core_${scala.binary.version}</artifactId> + <version>${project.version}</version> + <type>test-jar</type> + <scope>test</scope> + </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming_${scala.binary.version}</artifactId> diff --git a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisReceiverSuite.scala b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisReceiverSuite.scala index 6c262624833cd8830707a8d2c795700888f503df..2103dca6b766f2d99e7af39d78b2a3d8db123242 100644 --- a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisReceiverSuite.scala +++ b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisReceiverSuite.scala @@ -26,23 +26,18 @@ import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionIn import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason import com.amazonaws.services.kinesis.model.Record import org.mockito.Mockito._ -// scalastyle:off -// To avoid introducing a dependency on Spark core tests, simply use scalatest's FunSuite -// here instead of our own SparkFunSuite. Introducing the dependency has caused problems -// in the past (SPARK-8781) that are complicated by bugs in the maven shade plugin (MSHADE-148). -import org.scalatest.{BeforeAndAfter, FunSuite, Matchers} +import org.scalatest.{BeforeAndAfter, Matchers} import org.scalatest.mock.MockitoSugar import org.apache.spark.storage.StorageLevel -import org.apache.spark.streaming.{Milliseconds, Seconds, StreamingContext} +import org.apache.spark.streaming.{Milliseconds, Seconds, StreamingContext, TestSuiteBase} import org.apache.spark.util.{Clock, ManualClock, Utils} /** * Suite of Kinesis streaming receiver tests focusing mostly on the KinesisRecordProcessor */ -class KinesisReceiverSuite extends FunSuite with Matchers with BeforeAndAfter - with MockitoSugar { -// scalastyle:on +class KinesisReceiverSuite extends TestSuiteBase with Matchers with BeforeAndAfter + with MockitoSugar { val app = "TestKinesisReceiver" val stream = "mySparkStream" @@ -62,7 +57,7 @@ class KinesisReceiverSuite extends FunSuite with Matchers with BeforeAndAfter var checkpointStateMock: KinesisCheckpointState = _ var currentClockMock: Clock = _ - before { + override def beforeFunction(): Unit = { receiverMock = mock[KinesisReceiver] checkpointerMock = mock[IRecordProcessorCheckpointer] checkpointClockMock = mock[ManualClock] @@ -70,7 +65,8 @@ class KinesisReceiverSuite extends FunSuite with Matchers with BeforeAndAfter currentClockMock = mock[Clock] } - after { + override def afterFunction(): Unit = { + super.afterFunction() // Since this suite was originally written using EasyMock, add this to preserve the old // mocking semantics (see SPARK-5735 for more details) verifyNoMoreInteractions(receiverMock, checkpointerMock, checkpointClockMock, @@ -78,7 +74,7 @@ class KinesisReceiverSuite extends FunSuite with Matchers with BeforeAndAfter } test("KinesisUtils API") { - val ssc = new StreamingContext("local[2]", getClass.getSimpleName, Seconds(1)) + val ssc = new StreamingContext(master, framework, batchDuration) // Tests the API, does not actually test data receiving val kinesisStream1 = KinesisUtils.createStream(ssc, "mySparkStream", "https://kinesis.us-west-2.amazonaws.com", Seconds(2),