diff --git a/examples/src/main/java/org/apache/spark/streaming/examples/JavaCustomReceiver.java b/examples/src/main/java/org/apache/spark/streaming/examples/JavaCustomReceiver.java
new file mode 100644
index 0000000000000000000000000000000000000000..a94fa621dc328f5f4d396ad287e621943161876a
--- /dev/null
+++ b/examples/src/main/java/org/apache/spark/streaming/examples/JavaCustomReceiver.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.examples;
+
+import com.google.common.collect.Lists;
+
+import org.apache.spark.api.java.function.FlatMapFunction;
+import org.apache.spark.api.java.function.Function2;
+import org.apache.spark.api.java.function.PairFunction;
+import org.apache.spark.storage.StorageLevel;
+import org.apache.spark.streaming.Duration;
+import org.apache.spark.streaming.api.java.JavaDStream;
+import org.apache.spark.streaming.api.java.JavaPairDStream;
+import org.apache.spark.streaming.api.java.JavaStreamingContext;
+import org.apache.spark.streaming.receiver.Receiver;
+import scala.Tuple2;
+
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
+import java.net.ConnectException;
+import java.net.Socket;
+import java.util.regex.Pattern;
+
+/**
+ * Custom Receiver that receives data over a socket. Received bytes are interpreted as
+ * text, and \n delimited lines are considered as records. They are then counted and printed.
+ *
+ * Usage: JavaCustomReceiver <master> <hostname> <port>
+ *   <master> is the Spark master URL. In local mode, <master> should be 'local[n]' with n > 1.
+ *   <hostname> and <port> of the TCP server that Spark Streaming would connect to receive data.
+ *
+ * To run this on your local machine, you need to first run a Netcat server
+ *    `$ nc -lk 9999`
+ * and then run the example
+ *    `$ ./run org.apache.spark.streaming.examples.JavaCustomReceiver local[2] localhost 9999`
+ */
+
+public class JavaCustomReceiver extends Receiver<String> {
+  private static final Pattern SPACE = Pattern.compile(" ");
+
+  public static void main(String[] args) {
+    if (args.length < 3) {
+      System.err.println("Usage: JavaCustomReceiver <master> <hostname> <port>\n" +
+          "In local mode, <master> should be 'local[n]' with n > 1");
+      System.exit(1);
+    }
+
+    StreamingExamples.setStreamingLogLevels();
+
+    // Create the context with a 1 second batch size
+    JavaStreamingContext ssc = new JavaStreamingContext(args[0], "JavaCustomReceiver",
+      new Duration(1000), System.getenv("SPARK_HOME"),
+      JavaStreamingContext.jarOfClass(JavaCustomReceiver.class));
+
+    // Create an input stream with the custom receiver on target ip:port and count the
+    // words in the input stream of \n delimited text (e.g. generated by 'nc')
+    JavaDStream<String> lines = ssc.receiverStream(
+      new JavaCustomReceiver(args[1], Integer.parseInt(args[2])));
+    JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
+      @Override
+      public Iterable<String> call(String x) {
+        return Lists.newArrayList(SPACE.split(x));
+      }
+    });
+    JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
+      new PairFunction<String, String, Integer>() {
+        @Override public Tuple2<String, Integer> call(String s) {
+          return new Tuple2<String, Integer>(s, 1);
+        }
+      }).reduceByKey(new Function2<Integer, Integer, Integer>() {
+        @Override
+        public Integer call(Integer i1, Integer i2) {
+          return i1 + i2;
+        }
+      });
+
+    wordCounts.print();
+    ssc.start();
+    ssc.awaitTermination();
+  }
+
+  // ============= Receiver code that receives data over a socket ==============
+
+  String host = null;
+  int port = -1;
+
+  public JavaCustomReceiver(String host_ , int port_) {
+    super(StorageLevel.MEMORY_AND_DISK_2());
+    host = host_;
+    port = port_;
+  }
+
+  public void onStart() {
+    // Start the thread that receives data over a connection
+    new Thread() {
+      @Override public void run() {
+        receive();
+      }
+    }.start();
+  }
+
+  public void onStop() {
+    // There is nothing much to do, as the thread calling receive()
+    // is designed to stop itself when isStopped() returns true
+  }
+
+  /** Create a socket connection and receive data until receiver is stopped */
+  private void receive() {
+    Socket socket = null;
+    String userInput = null;
+
+    try {
+      // connect to the server
+      socket = new Socket(host, port);
+
+      BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
+
+      // Until stopped or connection broken continue reading
+      while (!isStopped() && (userInput = reader.readLine()) != null) {
+        System.out.println("Received data '" + userInput + "'");
+        store(userInput);
+      }
+      reader.close();
+      socket.close();
+
+      // Restart in an attempt to connect again when server is active again
+      restart("Trying to connect again");
+    } catch(ConnectException ce) {
+      // restart if could not connect to server
+      restart("Could not connect", ce);
+    } catch(Throwable t) {
+      restart("Error receiving data", t);
+    }
+  }
+}
+
diff --git a/examples/src/main/java/org/apache/spark/streaming/examples/JavaNetworkWordCount.java b/examples/src/main/java/org/apache/spark/streaming/examples/JavaNetworkWordCount.java
index 7f68d451e9b3197dfc05ab6a108e6a6b79fe6972..0cc9d0ae1a08e3587c4a62d252d40eb2d3eec467 100644
--- a/examples/src/main/java/org/apache/spark/streaming/examples/JavaNetworkWordCount.java
+++ b/examples/src/main/java/org/apache/spark/streaming/examples/JavaNetworkWordCount.java
@@ -31,7 +31,7 @@ import java.util.regex.Pattern;
 
 /**
  * Counts words in UTF8 encoded, '\n' delimited text received from the network every second.
- * Usage: NetworkWordCount <master> <hostname> <port>
+ * Usage: JavaNetworkWordCount <master> <hostname> <port>
  *   <master> is the Spark master URL. In local mode, <master> should be 'local[n]' with n > 1.
  *   <hostname> and <port> describe the TCP server that Spark Streaming would connect to receive data.
 *
@@ -43,9 +43,6 @@ import java.util.regex.Pattern;
 public final class JavaNetworkWordCount {
   private static final Pattern SPACE = Pattern.compile(" ");
 
-  private JavaNetworkWordCount() {
-  }
-
   public static void main(String[] args) {
     if (args.length < 3) {
       System.err.println("Usage: JavaNetworkWordCount <master> <hostname> <port>\n" +
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/CustomReceiver.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/CustomReceiver.scala
new file mode 100644
index 0000000000000000000000000000000000000000..eebffd824983ffc03c491154b80fb3fa1549be3b
--- /dev/null
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/CustomReceiver.scala
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.examples
+
+import java.io.{InputStreamReader, BufferedReader, InputStream}
+import java.net.Socket
+
+import org.apache.spark.Logging
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.{Seconds, StreamingContext}
+import org.apache.spark.streaming.StreamingContext._
+import org.apache.spark.streaming.receiver.Receiver
+
+/**
+ * Custom Receiver that receives data over a socket. Received bytes are interpreted as
+ * text, and \n delimited lines are considered as records. They are then counted and printed.
+ *
+ * Usage: CustomReceiver <master> <hostname> <port>
+ *   <master> is the Spark master URL. In local mode, <master> should be 'local[n]' with n > 1.
+ *   <hostname> and <port> of the TCP server that Spark Streaming would connect to receive data.
+ *
+ * To run this on your local machine, you need to first run a Netcat server
+ *    `$ nc -lk 9999`
+ * and then run the example
+ *    `$ ./run org.apache.spark.streaming.examples.CustomReceiver local[2] localhost 9999`
+ */
+object CustomReceiver {
+  def main(args: Array[String]) {
+    if (args.length < 3) {
+      System.err.println("Usage: CustomReceiver <master> <hostname> <port>\n" +
+        "In local mode, <master> should be 'local[n]' with n > 1")
+      System.exit(1)
+    }
+
+    StreamingExamples.setStreamingLogLevels()
+
+    // Create the context with a 1 second batch size
+    val ssc = new StreamingContext(args(0), "CustomReceiver", Seconds(1),
+      System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass).toSeq)
+
+    // Create an input stream with the custom receiver on target ip:port and count the
+    // words in the input stream of \n delimited text (e.g. generated by 'nc')
+    val lines = ssc.receiverStream(new CustomReceiver(args(1), args(2).toInt))
+    val words = lines.flatMap(_.split(" "))
+    val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
+    wordCounts.print()
+    ssc.start()
+    ssc.awaitTermination()
+  }
+}
+
+
+class CustomReceiver(host: String, port: Int)
+  extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) with Logging {
+
+  def onStart() {
+    // Start the thread that receives data over a connection
+    new Thread("Socket Receiver") {
+      override def run() { receive() }
+    }.start()
+  }
+
+  def onStop() {
+    // There is nothing much to do, as the thread calling receive()
+    // is designed to stop itself when isStopped() returns true
+  }
+
+  /** Create a socket connection and receive data until receiver is stopped */
+  private def receive() {
+    var socket: Socket = null
+    var userInput: String = null
+    try {
+      logInfo("Connecting to " + host + ":" + port)
+      socket = new Socket(host, port)
+      logInfo("Connected to " + host + ":" + port)
+      val reader = new BufferedReader(new InputStreamReader(socket.getInputStream(), "UTF-8"))
+      userInput = reader.readLine()
+      while(!isStopped && userInput != null) {
+        store(userInput)
+        userInput = reader.readLine()
+      }
+      reader.close()
+      socket.close()
+      logInfo("Stopped receiving")
+      restart("Trying to connect again")
+    } catch {
+      case e: java.net.ConnectException =>
+        restart("Error connecting to " + host + ":" + port, e)
+      case t: Throwable =>
+        restart("Error receiving data", t)
+    }
+  }
+}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
index fbb2e9f85dd12c49ead472cfb8a3d7535ce9672f..75a3e9334e6d59fd1d2aaceff84a820b037c9115 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
@@ -390,7 +390,7 @@ class JavaStreamingContext(val ssc: StreamingContext) {
    * Find more details at: http://spark.apache.org/docs/latest/streaming-custom-receivers.html
    * @param receiver Custom implementation of Receiver
    */
-  def receiverStream[T](receiver: Receiver[T]): ReceiverInputDStream[T] = {
+  def receiverStream[T](receiver: Receiver[T]): JavaReceiverInputDStream[T] = {
     implicit val cm: ClassTag[T] =
       implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
     ssc.receiverStream(receiver)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/Receiver.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/Receiver.scala
index 524c1b8d8ce46efa6c8a49b8907568d2b4cbe9e3..b310c22b3ab782ca2793a66ad2134bc6876782e8 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/Receiver.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/Receiver.scala
@@ -30,28 +30,55 @@ import org.apache.spark.annotation.DeveloperApi
  * Abstract class of a receiver that can be run on worker nodes to receive external data. A
  * custom receiver can be defined by defining the functions onStart() and onStop(). onStart()
  * should define the setup steps necessary to start receiving data,
- * and onStop() should define the cleanup steps necessary to stop receiving data. A custom
- * receiver would look something like this.
+ * and onStop() should define the cleanup steps necessary to stop receiving data.
  *
- * @example {{{
+ * A custom receiver in Scala would look like this.
+ *
+ * {{{
  * class MyReceiver(storageLevel: StorageLevel) extends NetworkReceiver[String](storageLevel) {
- *   def onStart() {
- *     // Setup stuff (start threads, open sockets, etc.) to start receiving data.
- *     // Must start new thread to receive data, as onStart() must be non-blocking.
+ *     def onStart() {
+ *         // Setup stuff (start threads, open sockets, etc.) to start receiving data.
+ *         // Must start new thread to receive data, as onStart() must be non-blocking.
  *
- *     // Call store(...) in those threads to store received data into Spark's memory.
+ *         // Call store(...) in those threads to store received data into Spark's memory.
  *
- *     // Call stop(...), restart() or reportError(...) on any thread based on how
- *     // different errors should be handled.
+ *         // Call stop(...), restart(...) or reportError(...) on any thread based on how
+ *         // different errors need to be handled.
  *
- *     // See corresponding method documentation for more details
- *   }
+ *         // See corresponding method documentation for more details
+ *     }
  *
- *   def onStop() {
- *     // Cleanup stuff (stop threads, close sockets, etc.) to stop receiving data.
- *   }
+ *     def onStop() {
+ *         // Cleanup stuff (stop threads, close sockets, etc.) to stop receiving data.
+ *     }
  * }
  * }}}
+ *
+ * A custom receiver in Java would look like this.
+ *
+ * {{{
+ * class MyReceiver extends Receiver<String> {
+ *     public MyReceiver(StorageLevel storageLevel) {
+ *         super(storageLevel);
+ *     }
+ *
+ *     public void onStart() {
+ *         // Setup stuff (start threads, open sockets, etc.) to start receiving data.
+ *         // Must start new thread to receive data, as onStart() must be non-blocking.
+ *
+ *         // Call store(...) in those threads to store received data into Spark's memory.
+ *
+ *         // Call stop(...), restart(...) or reportError(...) on any thread based on how
+ *         // different errors need to be handled.
+ *
+ *         // See corresponding method documentation for more details
+ *     }
+ *
+ *     public void onStop() {
+ *         // Cleanup stuff (stop threads, close sockets, etc.) to stop receiving data.
+ *     }
+ * }
+ * }}}
  */
 @DeveloperApi
 abstract class Receiver[T](val storageLevel: StorageLevel) extends Serializable {
@@ -156,30 +183,34 @@ abstract class Receiver[T](val storageLevel: StorageLevel) extends Serializable
   }
 
   /**
-   * Restart the receiver. This will call `onStop()` immediately and return.
-   * Asynchronously, after a delay, `onStart()` will be called.
+   * Restart the receiver. This method schedules the restart and returns
+   * immediately. The stopping and subsequent starting of the receiver
+   * (by calling `onStop()` and `onStart()`) is performed asynchronously
+   * in a background thread. The delay between the stopping and the starting
+   * is defined by the Spark configuration `spark.streaming.receiverRestartDelay`.
    * The `message` will be reported to the driver.
-   * The delay is defined by the Spark configuration
-   * `spark.streaming.receiverRestartDelay`.
    */
   def restart(message: String) {
     executor.restartReceiver(message)
   }
 
   /**
-   * Restart the receiver. This will call `onStop()` immediately and return.
-   * Asynchronously, after a delay, `onStart()` will be called.
+   * Restart the receiver. This method schedules the restart and returns
+   * immediately. The stopping and subsequent starting of the receiver
+   * (by calling `onStop()` and `onStart()`) is performed asynchronously
+   * in a background thread. The delay between the stopping and the starting
+   * is defined by the Spark configuration `spark.streaming.receiverRestartDelay`.
   * The `message` and `exception` will be reported to the driver.
-   * The delay is defined by the Spark configuration
-   * `spark.streaming.receiverRestartDelay`.
    */
   def restart(message: String, error: Throwable) {
     executor.restartReceiver(message, Some(error))
   }
 
   /**
-   * Restart the receiver. This will call `onStop()` immediately and return.
-   * Asynchronously, after the given delay, `onStart()` will be called.
+   * Restart the receiver. This method schedules the restart and returns
+   * immediately. The stopping and subsequent starting of the receiver
+   * (by calling `onStop()` and `onStart()`) is performed asynchronously
+   * in a background thread, after the given delay.
    */
   def restart(message: String, error: Throwable, millisecond: Int) {
     executor.restartReceiver(message, Some(error), millisecond)
   }
@@ -195,16 +226,23 @@ abstract class Receiver[T](val storageLevel: StorageLevel) extends Serializable
     executor.stop(message, Some(error))
   }
 
+  /** Check if the receiver has started or not. */
   def isStarted(): Boolean = {
     executor.isReceiverStarted()
   }
 
-  /** Check if receiver has been marked for stopping. */
+  /**
+   * Check if receiver has been marked for stopping. Use this to identify when
+   * the receiving of data should be stopped.
+   */
   def isStopped(): Boolean = {
     executor.isReceiverStopped()
   }
 
-  /** Get unique identifier of this receiver. */
+  /**
+   * Get the unique identifier of the receiver input stream that this
+   * receiver is associated with.
+   */
   def streamId = id
 
   /*
diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaReceiverAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaReceiverAPISuite.java
new file mode 100644
index 0000000000000000000000000000000000000000..1b0787fe69dec3314a2fe643b7d530fd5e4a976b
--- /dev/null
+++ b/streaming/src/test/java/org/apache/spark/streaming/JavaReceiverAPISuite.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming;
+
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.streaming.api.java.JavaDStream;
+import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
+import org.apache.spark.streaming.api.java.JavaStreamingContext;
+import static org.junit.Assert.*;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.spark.storage.StorageLevel;
+import org.apache.spark.streaming.receiver.Receiver;
+import org.apache.spark.api.java.function.Function;
+
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
+import java.io.Serializable;
+import java.net.ConnectException;
+import java.net.Socket;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+public class JavaReceiverAPISuite implements Serializable {
+
+  @Before
+  public void setUp() {
+    System.clearProperty("spark.streaming.clock");
+  }
+
+  @After
+  public void tearDown() {
+    System.clearProperty("spark.streaming.clock");
+  }
+
+  @Test
+  public void testReceiver() throws InterruptedException {
+    TestServer server = new TestServer(0);
+    server.start();
+
+    final AtomicLong dataCounter = new AtomicLong(0);
+
+    try {
+      JavaStreamingContext ssc = new JavaStreamingContext("local[2]", "test", new Duration(200));
+      JavaReceiverInputDStream<String> input =
+        ssc.receiverStream(new JavaSocketReceiver("localhost", server.port()));
+      JavaDStream<String> mapped = input.map(new Function<String, String>() {
+        @Override
+        public String call(String v1) throws Exception {
+          return v1 + ".";
+        }
+      });
+      mapped.foreachRDD(new Function<JavaRDD<String>, Void>() {
+        @Override
+        public Void call(JavaRDD<String> rdd) throws Exception {
+          long count = rdd.count();
+          dataCounter.addAndGet(count);
+          return null;
+        }
+      });
+
+      ssc.start();
+      long startTime = System.currentTimeMillis();
+      long timeout = 10000;
+
+      Thread.sleep(200);
+      for (int i = 0; i < 6; i++) {
+        server.send("" + i + "\n");   // \n to make sure these are separate lines
+        Thread.sleep(100);
+      }
+      while (dataCounter.get() == 0 && System.currentTimeMillis() - startTime < timeout) {
+        Thread.sleep(100);
+      }
+      ssc.stop();
+      assertTrue(dataCounter.get() > 0);
+    } finally {
+      server.stop();
+    }
+  }
+}
+
+class JavaSocketReceiver extends Receiver<String> {
+
+  String host = null;
+  int port = -1;
+
+  public JavaSocketReceiver(String host_ , int port_) {
+    super(StorageLevel.MEMORY_AND_DISK());
+    host = host_;
+    port = port_;
+  }
+
+  @Override
+  public void onStart() {
+    new Thread() {
+      @Override public void run() {
+        receive();
+      }
+    }.start();
+  }
+
+  @Override
+  public void onStop() {
+  }
+
+  private void receive() {
+    Socket socket = null;
+    try {
+      socket = new Socket(host, port);
+      BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
+      String userInput;
+      while ((userInput = in.readLine()) != null) {
+        store(userInput);
+      }
+      in.close();
+      socket.close();
+    } catch(ConnectException ce) {
+      ce.printStackTrace();
+      restart("Could not connect", ce);
+    } catch(Throwable t) {
+      t.printStackTrace();
+      restart("Error receiving data", t);
+    }
+  }
+}
+
diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala b/streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala
index 33f6df8f881776665af300753e8da3714cf0ee79..c0ea0491c313dc681bcafe35a2ac41d0eeea5a6a 100644
--- a/streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala
+++ b/streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala
@@ -26,6 +26,7 @@ import org.apache.spark.streaming._
 import java.util.ArrayList
 import collection.JavaConversions._
 import org.apache.spark.api.java.JavaRDDLike
+import org.apache.spark.streaming.dstream.DStream
 
 /** Exposes streaming test functionality in a Java-friendly way. */
 trait JavaTestBase extends TestSuiteBase {
@@ -51,8 +52,7 @@ trait JavaTestBase extends TestSuiteBase {
   * [[org.apache.spark.streaming.TestOutputStream]].
   **/
  def attachTestOutputStream[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T, R]](
-    dstream: JavaDStreamLike[T, This, R]) =
-  {
+    dstream: JavaDStreamLike[T, This, R]) = {
    implicit val cm: ClassTag[T] =
      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
    val ostream = new TestOutputStreamWithPartitions(dstream.dstream)
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
index b55b7834c90c10ca254d1d1e6c6c2604950a58ec..3fa254065cc447bab9f3ba4b411d5a3cdcc39975 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
@@ -49,7 +49,8 @@
 
     // Set up the streaming context and input streams
     val ssc = new StreamingContext(conf, batchDuration)
-    val networkStream = ssc.socketTextStream("localhost", testServer.port, StorageLevel.MEMORY_AND_DISK)
+    val networkStream = ssc.socketTextStream(
+      "localhost", testServer.port, StorageLevel.MEMORY_AND_DISK)
     val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]]
     val outputStream = new TestOutputStream(networkStream, outputBuffer)
     def output = outputBuffer.flatMap(x => x)
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
index 3e2b25af84098e949b522cac6cc9026a6dea87be..ee0bc8b7d6a71d4ecede13e5f0fe1297103c173b 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
@@ -165,7 +165,7 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
     ssc = new StreamingContext(sc, Milliseconds(100))
     var runningCount = 0
     TestReceiver.counter.set(1)
-    val input = ssc.networkStream(new TestReceiver)
+    val input = ssc.receiverStream(new TestReceiver)
     input.count.foreachRDD(rdd => {
       val count = rdd.first()
       runningCount += count.toInt
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
index 4f63fd37822cb1e0ecb8fb2580e97305e2217e01..8036f77c973ae81fadc9ae8edf355b6c60a34e40 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
@@ -155,6 +155,7 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging {
   def afterFunction() {
     // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
     System.clearProperty("spark.driver.port")
+    System.clearProperty("spark.streaming.clock")
   }
 
   before(beforeFunction)
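
For context, the pieces above all revolve around the receiver lifecycle: onStart() must return immediately after spawning the receiving thread, store(...) hands records to Spark, and restart(...) schedules an asynchronous stop/start cycle whose default delay is governed by spark.streaming.receiverRestartDelay. Below is a minimal sketch of a driver that wires the CustomReceiver added in this patch into a streaming job and tunes that restart delay explicitly. The object name CustomReceiverUsage, the host, and the port are illustrative only, not part of the patch.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.examples.CustomReceiver

object CustomReceiverUsage {
  def main(args: Array[String]) {
    val conf = new SparkConf()
      .setMaster("local[2]")   // at least 2 threads: one runs the receiver, one processes batches
      .setAppName("CustomReceiverUsage")
      // restart(message) waits this many milliseconds between onStop() and onStart()
      .set("spark.streaming.receiverRestartDelay", "2000")

    val ssc = new StreamingContext(conf, Seconds(1))

    // receiverStream(...) is the entry point exercised by this patch; it returns a
    // ReceiverInputDStream in Scala (JavaReceiverInputDStream via the Java API).
    val lines = ssc.receiverStream(new CustomReceiver("localhost", 9999))
    lines.count().print()

    ssc.start()
    ssc.awaitTermination()
  }
}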