diff --git a/external/flume-sink/pom.xml b/external/flume-sink/pom.xml index 0c68defa5e101092dae3cb39bdc47bc9fa982273..19192e40a7dc31506414c58b8ae4ec8c1aff34c8 100644 --- a/external/flume-sink/pom.xml +++ b/external/flume-sink/pom.xml @@ -62,7 +62,7 @@ <groupId>org.apache.thrift</groupId> <artifactId>libthrift</artifactId> </exclusion> - </exclusions> + </exclusions> </dependency> <dependency> <groupId>org.scala-lang</groupId> @@ -71,13 +71,19 @@ <dependency> <groupId>org.scalatest</groupId> <artifactId>scalatest_${scala.binary.version}</artifactId> + <scope>test</scope> </dependency> <dependency> - <groupId>org.apache.spark</groupId> - <artifactId>spark-streaming_${scala.binary.version}</artifactId> - <version>${project.version}</version> - <type>test-jar</type> - <scope>test</scope> <!-- Need it only for tests, don't package it --> + <!-- + Netty explicitly added in test as it has been excluded from + Flume dependency (to avoid runtime problems when running with + Spark) but unit tests need it. Version of Netty on which + Flume 1.4.0 depends on is "3.4.0.Final" . + --> + <groupId>io.netty</groupId> + <artifactId>netty</artifactId> + <version>3.4.0.Final</version> + <scope>test</scope> </dependency> </dependencies> <build> diff --git a/external/flume-sink/src/test/scala/org/apache/spark/streaming/flume/sink/SparkSinkSuite.scala b/external/flume-sink/src/test/scala/org/apache/spark/streaming/flume/sink/SparkSinkSuite.scala index 44b27edf85ce87d94ceb34e9a44b28726005fdd0..75a6668c6210bd1bdd26fe6dfacca9a620bdc392 100644 --- a/external/flume-sink/src/test/scala/org/apache/spark/streaming/flume/sink/SparkSinkSuite.scala +++ b/external/flume-sink/src/test/scala/org/apache/spark/streaming/flume/sink/SparkSinkSuite.scala @@ -30,14 +30,14 @@ import org.apache.avro.ipc.specific.SpecificRequestor import org.apache.flume.Context import org.apache.flume.channel.MemoryChannel import org.apache.flume.event.EventBuilder -import org.apache.spark.streaming.TestSuiteBase import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory +import org.scalatest.FunSuite -class SparkSinkSuite extends TestSuiteBase { +class SparkSinkSuite extends FunSuite { val eventsPerBatch = 1000 val channelCapacity = 5000 - test("Success") { + test("Success with ack") { val (channel, sink) = initializeChannelAndSink() channel.start() sink.start() @@ -57,7 +57,7 @@ class SparkSinkSuite extends TestSuiteBase { transceiver.close() } - test("Nack") { + test("Failure with nack") { val (channel, sink) = initializeChannelAndSink() channel.start() sink.start() @@ -76,7 +76,7 @@ class SparkSinkSuite extends TestSuiteBase { transceiver.close() } - test("Timeout") { + test("Failure with timeout") { val (channel, sink) = initializeChannelAndSink(Map(SparkSinkConfig .CONF_TRANSACTION_TIMEOUT -> 1.toString)) channel.start()