Commit 1c938413 authored by Prashant Sharma, committed by Patrick Wendell

SPARK-3962 Marked scope as provided for external projects.

Somehow the Maven shade plugin gets stuck in an infinite loop while creating the effective POM.

Author: Prashant Sharma <prashant.s@imaginea.com>
Author: Prashant Sharma <scrapcodes@gmail.com>

Closes #2959 from ScrapCodes/SPARK-3962/scope-provided and squashes the following commits:

994d1d3 [Prashant Sharma] Fixed failing flume tests
270b4fb [Prashant Sharma] Removed most of the unused code.
bb3bbfd [Prashant Sharma] SPARK-3962 Marked scope as provided for external.
parent 0df02ca4
Showing 264 additions and 48 deletions
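
For context on the change: in Maven, a provided-scope dependency stays on the compile classpath but is not resolved transitively and is not bundled by packaging steps such as shading. Applications that depend on these external modules therefore have to declare spark-streaming themselves. A hedged sbt sketch of that downstream setup follows; the version string is a placeholder and not part of this commit.

// Illustrative sbt fragment: spark-streaming is no longer pulled in
// transitively by the external modules, so an application declares it
// explicitly ("1.2.0" is a placeholder version).
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-streaming" % "1.2.0" % "provided",
  "org.apache.spark" %% "spark-streaming-flume" % "1.2.0"
)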
@@ -39,19 +39,13 @@
       <groupId>org.apache.spark</groupId>
       <artifactId>spark-streaming_${scala.binary.version}</artifactId>
       <version>${project.version}</version>
+      <scope>provided</scope>
     </dependency>
     <dependency>
       <groupId>org.apache.spark</groupId>
       <artifactId>spark-streaming-flume-sink_${scala.binary.version}</artifactId>
       <version>${project.version}</version>
     </dependency>
-    <dependency>
-      <groupId>org.apache.spark</groupId>
-      <artifactId>spark-streaming_${scala.binary.version}</artifactId>
-      <version>${project.version}</version>
-      <type>test-jar</type>
-      <scope>test</scope>
-    </dependency>
     <dependency>
       <groupId>org.apache.flume</groupId>
       <artifactId>flume-ng-sdk</artifactId>
...
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.streaming;

import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.junit.After;
import org.junit.Before;

public abstract class LocalJavaStreamingContext {

    protected transient JavaStreamingContext ssc;

    @Before
    public void setUp() {
        System.setProperty("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock");
        ssc = new JavaStreamingContext("local[2]", "test", new Duration(1000));
        ssc.checkpoint("checkpoint");
    }

    @After
    public void tearDown() {
        ssc.stop();
        ssc = null;
    }
}
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.streaming

import java.io.{IOException, ObjectInputStream}

import scala.collection.mutable.ArrayBuffer
import scala.reflect.ClassTag

import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.{DStream, ForEachDStream}
import org.apache.spark.util.Utils

/**
 * This is an output stream just for the test suites. All the output is collected into an
 * ArrayBuffer. This buffer is wiped clean on being restored from a checkpoint.
 *
 * The buffer contains a sequence of RDDs, each containing a sequence of items.
 */
class TestOutputStream[T: ClassTag](parent: DStream[T],
    val output: ArrayBuffer[Seq[T]] = ArrayBuffer[Seq[T]]())
  extends ForEachDStream[T](parent, (rdd: RDD[T], t: Time) => {
    val collected = rdd.collect()
    output += collected
  }) {

  // This is to clear the output buffer every time it is read from a checkpoint
  @throws(classOf[IOException])
  private def readObject(ois: ObjectInputStream): Unit = Utils.tryOrIOException {
    ois.defaultReadObject()
    output.clear()
  }
}
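
A minimal usage sketch for TestOutputStream, not part of the commit: it assumes the code runs on this module's test classpath where TestOutputStream is visible, and the batch interval, sleep, and data are illustrative.

// Sketch: collect the output of a queue-backed stream into a buffer.
import scala.collection.mutable

import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext, TestOutputStream}

object TestOutputStreamSketch {
  def main(args: Array[String]): Unit = {
    val ssc = new StreamingContext("local[2]", "TestOutputStreamSketch", Seconds(1))
    val queue = mutable.Queue[RDD[Int]](ssc.sparkContext.parallelize(1 to 4))
    val input = ssc.queueStream(queue)            // replays one queued RDD per batch
    val output = mutable.ArrayBuffer[Seq[Int]]()  // filled by TestOutputStream
    new TestOutputStream(input, output).register()
    ssc.start()
    Thread.sleep(2000)                            // let at least one batch complete
    ssc.stop()
    assert(output.flatten.sum == 10)
  }
}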
@@ -20,9 +20,6 @@ package org.apache.spark.streaming.flume

 import java.net.InetSocketAddress
 import java.util.concurrent.{Callable, ExecutorCompletionService, Executors}
-import java.util.Random
-
-import org.apache.spark.TestUtils

 import scala.collection.JavaConversions._
 import scala.collection.mutable.{SynchronizedBuffer, ArrayBuffer}
@@ -32,20 +29,35 @@ import org.apache.flume.channel.MemoryChannel
 import org.apache.flume.conf.Configurables
 import org.apache.flume.event.EventBuilder
+import org.scalatest.{BeforeAndAfter, FunSuite}

+import org.apache.spark.{SparkConf, Logging}
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.dstream.ReceiverInputDStream
 import org.apache.spark.streaming.util.ManualClock
-import org.apache.spark.streaming.{TestSuiteBase, TestOutputStream, StreamingContext}
+import org.apache.spark.streaming.{Seconds, TestOutputStream, StreamingContext}
 import org.apache.spark.streaming.flume.sink._
 import org.apache.spark.util.Utils

-class FlumePollingStreamSuite extends TestSuiteBase {
+class FlumePollingStreamSuite extends FunSuite with BeforeAndAfter with Logging {

   val batchCount = 5
   val eventsPerBatch = 100
   val totalEventsPerChannel = batchCount * eventsPerBatch
   val channelCapacity = 5000
   val maxAttempts = 5
+  val batchDuration = Seconds(1)
+
+  val conf = new SparkConf()
+    .setMaster("local[2]")
+    .setAppName(this.getClass.getSimpleName)
+
+  def beforeFunction() {
+    logInfo("Using manual clock")
+    conf.set("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock")
+  }
+
+  before(beforeFunction())

   test("flume polling test") {
     testMultipleTimes(testFlumePolling)
@@ -229,4 +241,5 @@ class FlumePollingStreamSuite extends TestSuiteBase {
       null
     }
   }
+}
 }
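
The polling test is flaky, so the suite funnels it through a retry wrapper, testMultipleTimes, bounded by maxAttempts; the wrapper's body is outside this hunk. A hypothetical sketch of such a helper, not the suite's actual implementation:

// Hypothetical retry helper in the spirit of testMultipleTimes: run a flaky
// body up to maxAttempts times, stopping at the first clean run and
// rethrowing only if the final attempt fails.
object RetrySketch {
  def testMultipleTimes(body: () => Unit, maxAttempts: Int = 5): Unit = {
    var attempt = 0
    var passed = false
    while (!passed && attempt < maxAttempts) {
      attempt += 1
      try {
        body()
        passed = true
      } catch {
        case _: Exception if attempt < maxAttempts => () // swallow and retry
      }
    }
  }

  def main(args: Array[String]): Unit = {
    var calls = 0
    testMultipleTimes(() => { calls += 1; require(calls >= 2, "flaky") })
    println(s"passed after $calls attempts")
  }
}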
@@ -39,13 +39,7 @@
       <groupId>org.apache.spark</groupId>
       <artifactId>spark-streaming_${scala.binary.version}</artifactId>
       <version>${project.version}</version>
+      <scope>provided</scope>
     </dependency>
-    <dependency>
-      <groupId>org.apache.spark</groupId>
-      <artifactId>spark-streaming_${scala.binary.version}</artifactId>
-      <version>${project.version}</version>
-      <type>test-jar</type>
-      <scope>test</scope>
-    </dependency>
     <dependency>
       <groupId>org.apache.kafka</groupId>
...
@@ -39,13 +39,7 @@
       <groupId>org.apache.spark</groupId>
       <artifactId>spark-streaming_${scala.binary.version}</artifactId>
       <version>${project.version}</version>
+      <scope>provided</scope>
     </dependency>
-    <dependency>
-      <groupId>org.apache.spark</groupId>
-      <artifactId>spark-streaming_${scala.binary.version}</artifactId>
-      <version>${project.version}</version>
-      <type>test-jar</type>
-      <scope>test</scope>
-    </dependency>
     <dependency>
       <groupId>org.eclipse.paho</groupId>
...
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.streaming;

import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.junit.After;
import org.junit.Before;

public abstract class LocalJavaStreamingContext {

    protected transient JavaStreamingContext ssc;

    @Before
    public void setUp() {
        System.setProperty("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock");
        ssc = new JavaStreamingContext("local[2]", "test", new Duration(1000));
        ssc.checkpoint("checkpoint");
    }

    @After
    public void tearDown() {
        ssc.stop();
        ssc = null;
    }
}
@@ -17,11 +17,19 @@
 package org.apache.spark.streaming.mqtt

-import org.apache.spark.streaming.{StreamingContext, TestSuiteBase}
+import org.scalatest.FunSuite
+
+import org.apache.spark.streaming.{Seconds, StreamingContext}
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.dstream.ReceiverInputDStream

-class MQTTStreamSuite extends TestSuiteBase {
+class MQTTStreamSuite extends FunSuite {
+
+  val batchDuration = Seconds(1)
+
+  private val master: String = "local[2]"
+
+  private val framework: String = this.getClass.getSimpleName

   test("mqtt input stream") {
     val ssc = new StreamingContext(master, framework, batchDuration)
...
@@ -39,13 +39,7 @@
       <groupId>org.apache.spark</groupId>
       <artifactId>spark-streaming_${scala.binary.version}</artifactId>
       <version>${project.version}</version>
+      <scope>provided</scope>
     </dependency>
-    <dependency>
-      <groupId>org.apache.spark</groupId>
-      <artifactId>spark-streaming_${scala.binary.version}</artifactId>
-      <version>${project.version}</version>
-      <type>test-jar</type>
-      <scope>test</scope>
-    </dependency>
     <dependency>
       <groupId>org.twitter4j</groupId>
...
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.streaming;

import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.junit.After;
import org.junit.Before;

public abstract class LocalJavaStreamingContext {

    protected transient JavaStreamingContext ssc;

    @Before
    public void setUp() {
        System.setProperty("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock");
        ssc = new JavaStreamingContext("local[2]", "test", new Duration(1000));
        ssc.checkpoint("checkpoint");
    }

    @After
    public void tearDown() {
        ssc.stop();
        ssc = null;
    }
}
@@ -17,13 +17,23 @@
 package org.apache.spark.streaming.twitter

-import org.apache.spark.streaming.{StreamingContext, TestSuiteBase}
-import org.apache.spark.storage.StorageLevel
+import org.scalatest.{BeforeAndAfter, FunSuite}
+
+import twitter4j.Status
 import twitter4j.auth.{NullAuthorization, Authorization}
+
+import org.apache.spark.Logging
+import org.apache.spark.streaming.{Seconds, StreamingContext}
+import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.dstream.ReceiverInputDStream
-import twitter4j.Status

-class TwitterStreamSuite extends TestSuiteBase {
+class TwitterStreamSuite extends FunSuite with BeforeAndAfter with Logging {
+
+  val batchDuration = Seconds(1)
+
+  private val master: String = "local[2]"
+
+  private val framework: String = this.getClass.getSimpleName

   test("twitter input stream") {
     val ssc = new StreamingContext(master, framework, batchDuration)
...
@@ -39,13 +39,7 @@
       <groupId>org.apache.spark</groupId>
       <artifactId>spark-streaming_${scala.binary.version}</artifactId>
       <version>${project.version}</version>
+      <scope>provided</scope>
     </dependency>
-    <dependency>
-      <groupId>org.apache.spark</groupId>
-      <artifactId>spark-streaming_${scala.binary.version}</artifactId>
-      <version>${project.version}</version>
-      <type>test-jar</type>
-      <scope>test</scope>
-    </dependency>
     <dependency>
       <groupId>${akka.group}</groupId>
...
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.streaming;

import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.junit.After;
import org.junit.Before;

public abstract class LocalJavaStreamingContext {

    protected transient JavaStreamingContext ssc;

    @Before
    public void setUp() {
        System.setProperty("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock");
        ssc = new JavaStreamingContext("local[2]", "test", new Duration(1000));
        ssc.checkpoint("checkpoint");
    }

    @After
    public void tearDown() {
        ssc.stop();
        ssc = null;
    }
}
@@ -20,12 +20,19 @@ package org.apache.spark.streaming.zeromq

 import akka.actor.SupervisorStrategy
 import akka.util.ByteString
 import akka.zeromq.Subscribe
+import org.scalatest.FunSuite

 import org.apache.spark.storage.StorageLevel
-import org.apache.spark.streaming.{StreamingContext, TestSuiteBase}
+import org.apache.spark.streaming.{Seconds, StreamingContext}
 import org.apache.spark.streaming.dstream.ReceiverInputDStream

-class ZeroMQStreamSuite extends TestSuiteBase {
+class ZeroMQStreamSuite extends FunSuite {
+
+  val batchDuration = Seconds(1)
+
+  private val master: String = "local[2]"
+
+  private val framework: String = this.getClass.getSimpleName

   test("zeromq input stream") {
     val ssc = new StreamingContext(master, framework, batchDuration)
...