From 1faa1135a3fc0acd89f934f01a4a2edefcb93d33 Mon Sep 17 00:00:00 2001
From: Patrick Wendell <pwendell@gmail.com>
Date: Thu, 9 Oct 2014 14:50:36 -0700
Subject: [PATCH] Revert "[SPARK-2805] Upgrade to akka 2.3.4"

This reverts commit b9df8af62e8d7b263a668dfb6e9668ab4294ea37.
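
Going back to akka 2.2.3-shaded-protobuf means adapting a few call sites
to the older APIs: AssociationErrorEvent carries one field fewer in
2.2.x, TestActorRef can be constructed from an actor factory instead of
Props, and the old akka.actor.IO/IOManager layer (removed in akka 2.3)
is usable again, which lets the previously deleted "actor input stream"
test come back.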
---
 .../org/apache/spark/deploy/Client.scala      |  2 +-
 .../spark/deploy/client/AppClient.scala       |  2 +-
 .../spark/deploy/worker/WorkerWatcher.scala   |  2 +-
 .../apache/spark/MapOutputTrackerSuite.scala  |  4 +-
 pom.xml                                       |  2 +-
 .../spark/streaming/InputStreamsSuite.scala   | 71 +++++++++++++++++++
 6 files changed, 77 insertions(+), 6 deletions(-)
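
A minimal sketch of the signature change behind the pattern-match hunks
below, assuming akka's remoting lifecycle events (in 2.3.x
AssociationErrorEvent gained an extra log-level field, so reverting to
2.2.3 drops one wildcard):

    // akka 2.3.x: five constructor fields
    case AssociationErrorEvent(cause, localAddress, remoteAddress, inbound, _) => ...
    // akka 2.2.x: four constructor fields
    case AssociationErrorEvent(cause, localAddress, remoteAddress, inbound) => ...

Likewise, the 2.2.x TestKit still provides a TestActorRef apply that
takes an actor factory by name, so the Props wrapper used on 2.3.x can
be dropped in MapOutputTrackerSuite.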

diff --git a/core/src/main/scala/org/apache/spark/deploy/Client.scala b/core/src/main/scala/org/apache/spark/deploy/Client.scala
index f2687ce6b4..065ddda50e 100644
--- a/core/src/main/scala/org/apache/spark/deploy/Client.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/Client.scala
@@ -130,7 +130,7 @@ private class ClientActor(driverArgs: ClientArguments, conf: SparkConf)
       println(s"Error connecting to master ${driverArgs.master} ($remoteAddress), exiting.")
       System.exit(-1)
 
-    case AssociationErrorEvent(cause, _, remoteAddress, _, _) =>
+    case AssociationErrorEvent(cause, _, remoteAddress, _) =>
       println(s"Error connecting to master ${driverArgs.master} ($remoteAddress), exiting.")
       println(s"Cause was: $cause")
       System.exit(-1)
diff --git a/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala b/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
index 98a93d1fcb..32790053a6 100644
--- a/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
@@ -154,7 +154,7 @@ private[spark] class AppClient(
         logWarning(s"Connection to $address failed; waiting for master to reconnect...")
         markDisconnected()
 
-      case AssociationErrorEvent(cause, _, address, _, _) if isPossibleMaster(address) =>
+      case AssociationErrorEvent(cause, _, address, _) if isPossibleMaster(address) =>
         logWarning(s"Could not connect to $address: $cause")
 
       case StopAppClient =>
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/WorkerWatcher.scala b/core/src/main/scala/org/apache/spark/deploy/worker/WorkerWatcher.scala
index 63a8ac817b..6d0d0bbe5e 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/WorkerWatcher.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/WorkerWatcher.scala
@@ -54,7 +54,7 @@ private[spark] class WorkerWatcher(workerUrl: String)
     case AssociatedEvent(localAddress, remoteAddress, inbound) if isWorker(remoteAddress) =>
       logInfo(s"Successfully connected to $workerUrl")
 
-    case AssociationErrorEvent(cause, localAddress, remoteAddress, inbound, _)
+    case AssociationErrorEvent(cause, localAddress, remoteAddress, inbound)
         if isWorker(remoteAddress) =>
       // These logs may not be seen if the worker (and associated pipe) has died
       logError(s"Could not initialize connection to worker $workerUrl. Exiting.")
diff --git a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
index cbc0bd178d..1fef79ad10 100644
--- a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
@@ -146,7 +146,7 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
     val masterTracker = new MapOutputTrackerMaster(conf)
     val actorSystem = ActorSystem("test")
     val actorRef = TestActorRef[MapOutputTrackerMasterActor](
-      Props(new MapOutputTrackerMasterActor(masterTracker, newConf)))(actorSystem)
+      new MapOutputTrackerMasterActor(masterTracker, newConf))(actorSystem)
     val masterActor = actorRef.underlyingActor
 
     // Frame size should be ~123B, and no exception should be thrown
@@ -164,7 +164,7 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
     val masterTracker = new MapOutputTrackerMaster(conf)
     val actorSystem = ActorSystem("test")
     val actorRef = TestActorRef[MapOutputTrackerMasterActor](
-      Props(new MapOutputTrackerMasterActor(masterTracker, newConf)))(actorSystem)
+      new MapOutputTrackerMasterActor(masterTracker, newConf))(actorSystem)
     val masterActor = actorRef.underlyingActor
 
     // Frame size should be ~1.1MB, and MapOutputTrackerMasterActor should throw exception.
diff --git a/pom.xml b/pom.xml
index 3b6d4ecbae..7756c89b00 100644
--- a/pom.xml
+++ b/pom.xml
@@ -118,7 +118,7 @@
     <mesos.version>0.18.1</mesos.version>
     <mesos.classifier>shaded-protobuf</mesos.classifier>
     <akka.group>org.spark-project.akka</akka.group>
-    <akka.version>2.3.4-spark</akka.version>
+    <akka.version>2.2.3-shaded-protobuf</akka.version>
     <slf4j.version>1.7.5</slf4j.version>
     <log4j.version>1.2.17</log4j.version>
     <hadoop.version>1.0.4</hadoop.version>
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
index 6107fcdc44..952a74fd5f 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
@@ -18,6 +18,8 @@
 package org.apache.spark.streaming
 
 import akka.actor.Actor
+import akka.actor.IO
+import akka.actor.IOManager
 import akka.actor.Props
 import akka.util.ByteString
 
@@ -142,6 +144,59 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
     conf.set("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock")
   }
 
+  // TODO: This test works in IntelliJ but not through SBT
+  ignore("actor input stream") {
+    // Start the server
+    val testServer = new TestServer()
+    val port = testServer.port
+    testServer.start()
+
+    // Set up the streaming context and input streams
+    val ssc = new StreamingContext(conf, batchDuration)
+    val networkStream = ssc.actorStream[String](Props(new TestActor(port)), "TestActor",
+      // Pass the local value of port to avoid closing over the entire enclosing scope
+      StorageLevel.MEMORY_AND_DISK)
+    val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]]
+    val outputStream = new TestOutputStream(networkStream, outputBuffer)
+    def output = outputBuffer.flatMap(x => x)
+    outputStream.register()
+    ssc.start()
+
+    // Feed data to the server to send to the network receiver
+    val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
+    val input = 1 to 9
+    val expectedOutput = input.map(x => x.toString)
+    Thread.sleep(1000)
+    for (i <- 0 until input.size) {
+      testServer.send(input(i).toString)
+      Thread.sleep(500)
+      clock.addToTime(batchDuration.milliseconds)
+    }
+    Thread.sleep(1000)
+    logInfo("Stopping server")
+    testServer.stop()
+    logInfo("Stopping context")
+    ssc.stop()
+
+    // Verify whether data received was as expected
+    logInfo("--------------------------------")
+    logInfo("output.size = " + outputBuffer.size)
+    logInfo("output")
+    outputBuffer.foreach(x => logInfo("[" + x.mkString(",") + "]"))
+    logInfo("expected output.size = " + expectedOutput.size)
+    logInfo("expected output")
+    expectedOutput.foreach(x => logInfo("[" + x.mkString(",") + "]"))
+    logInfo("--------------------------------")
+
+    // Verify whether all the elements received are as expected
+    // (whether the elements were received one in each interval is not verified)
+    assert(output.size === expectedOutput.size)
+    for (i <- 0 until output.size) {
+      assert(output(i) === expectedOutput(i))
+    }
+  }
+
+
   test("multi-thread receiver") {
     // set up the test receiver
     val numThreads = 10
@@ -323,6 +378,22 @@ class TestServer(portToBind: Int = 0) extends Logging {
   def port = serverSocket.getLocalPort
 }
 
+/** This is an actor for testing the actor input stream */
+class TestActor(port: Int) extends Actor with ActorHelper {
+
+  def bytesToString(byteString: ByteString) = byteString.utf8String
+
+  override def preStart(): Unit = {
+    @deprecated("suppress compile time deprecation warning", "1.0.0")
+    val unit = IOManager(context.system).connect(new InetSocketAddress(port)) // old akka.actor IO layer, removed in akka 2.3
+  }
+
+  def receive = {
+    case IO.Read(socket, bytes) => // a chunk of bytes arrived on the connected socket
+      store(bytesToString(bytes))
+  }
+}
+
 /** This is a receiver to test multiple threads inserting data using block generator */
 class MultiThreadTestReceiver(numThreads: Int, numRecordsPerThread: Int)
   extends Receiver[Int](StorageLevel.MEMORY_ONLY_SER) with Logging {
-- 
GitLab