From 6883a5120c6b3a806024dfad03b8bf7371c6c0eb Mon Sep 17 00:00:00 2001
From: Jakob Odersky <jakob@odersky.com>
Date: Fri, 5 Feb 2016 19:00:12 -0800
Subject: [PATCH] [SPARK-13171][CORE] Replace future calls with Future

Trivial search-and-replace to eliminate deprecation warnings in Scala 2.11.
Also works with 2.10

Author: Jakob Odersky <jakob@odersky.com>

Closes #11085 from jodersky/SPARK-13171.
---
 .../spark/deploy/FaultToleranceTest.scala      |  8 ++++----
 .../apache/spark/deploy/worker/Worker.scala    |  2 +-
 .../apache/spark/JobCancellationSuite.scala    | 18 +++++++++---------
 .../ShuffleBlockFetcherIteratorSuite.scala     |  6 +++---
 .../expressions/CodeGenerationSuite.scala      |  2 +-
 .../execution/joins/BroadcastHashJoin.scala    |  2 +-
 .../joins/BroadcastHashOuterJoin.scala         |  2 +-
 .../thriftserver/HiveThriftServer2Suites.scala |  6 +++---
 8 files changed, 23 insertions(+), 23 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala b/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala
index c0ede4b7c8..15d220d01b 100644
--- a/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala
@@ -22,7 +22,7 @@ import java.net.URL
 import java.util.concurrent.TimeoutException
 
 import scala.collection.mutable.ListBuffer
-import scala.concurrent.{future, promise, Await}
+import scala.concurrent.{Await, Future, Promise}
 import scala.concurrent.ExecutionContext.Implicits.global
 import scala.concurrent.duration._
 import scala.language.postfixOps
@@ -249,7 +249,7 @@ private object FaultToleranceTest extends App with Logging {
 
   /** This includes Client retry logic, so it may take a while if the cluster is recovering. */
   private def assertUsable() = {
-    val f = future {
+    val f = Future {
       try {
         val res = sc.parallelize(0 until 10).collect()
         assertTrue(res.toList == (0 until 10))
@@ -283,7 +283,7 @@ private object FaultToleranceTest extends App with Logging {
         numAlive == 1 && numStandby == masters.size - 1 && numLiveApps >= 1
     }
 
-    val f = future {
+    val f = Future {
       try {
         while (!stateValid()) {
           Thread.sleep(1000)
@@ -405,7 +405,7 @@ private object SparkDocker {
   }
 
   private def startNode(dockerCmd: ProcessBuilder) : (String, DockerId, File) = {
-    val ipPromise = promise[String]()
+    val ipPromise = Promise[String]()
     val outFile = File.createTempFile("fault-tolerance-test", "", Utils.createTempDir())
     val outStream: FileWriter = new FileWriter(outFile)
     def findIpAndLog(line: String): Unit = {
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index 179d3b9f20..df3c286a0a 100755
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -394,7 +394,7 @@ private[deploy] class Worker(
       // rpcEndpoint.
       // Copy ids so that it can be used in the cleanup thread.
       val appIds = executors.values.map(_.appId).toSet
-      val cleanupFuture = concurrent.future {
+      val cleanupFuture = concurrent.Future {
         val appDirs = workDir.listFiles()
         if (appDirs == null) {
           throw new IOException("ERROR: Failed to list files in " + appDirs)
diff --git a/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala b/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala
index e13a442463..c347ab8dc8 100644
--- a/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala
@@ -22,7 +22,7 @@ import java.util.concurrent.Semaphore
 import scala.concurrent.Await
 import scala.concurrent.ExecutionContext.Implicits.global
 import scala.concurrent.duration._
-import scala.concurrent.future
+import scala.concurrent.Future
 
 import org.scalatest.BeforeAndAfter
 import org.scalatest.Matchers
@@ -103,7 +103,7 @@ class JobCancellationSuite extends SparkFunSuite with Matchers with BeforeAndAft
 
     val rdd1 = rdd.map(x => x)
 
-    future {
+    Future {
       taskStartedSemaphore.acquire()
       sc.cancelAllJobs()
       taskCancelledSemaphore.release(100000)
@@ -126,7 +126,7 @@ class JobCancellationSuite extends SparkFunSuite with Matchers with BeforeAndAft
     })
 
     // jobA is the one to be cancelled.
-    val jobA = future {
+    val jobA = Future {
       sc.setJobGroup("jobA", "this is a job to be cancelled")
       sc.parallelize(1 to 10000, 2).map { i => Thread.sleep(10); i }.count()
     }
@@ -191,7 +191,7 @@ class JobCancellationSuite extends SparkFunSuite with Matchers with BeforeAndAft
     })
 
     // jobA is the one to be cancelled.
-    val jobA = future {
+    val jobA = Future {
       sc.setJobGroup("jobA", "this is a job to be cancelled", interruptOnCancel = true)
       sc.parallelize(1 to 10000, 2).map { i => Thread.sleep(100000); i }.count()
     }
@@ -231,7 +231,7 @@ class JobCancellationSuite extends SparkFunSuite with Matchers with BeforeAndAft
     val f2 = rdd.countAsync()
 
     // Kill one of the action.
-    future {
+    Future {
       sem1.acquire()
       f1.cancel()
       JobCancellationSuite.twoJobsSharingStageSemaphore.release(10)
@@ -247,7 +247,7 @@ class JobCancellationSuite extends SparkFunSuite with Matchers with BeforeAndAft
     // Cancel before launching any tasks
     {
       val f = sc.parallelize(1 to 10000, 2).map { i => Thread.sleep(10); i }.countAsync()
-      future { f.cancel() }
+      Future { f.cancel() }
       val e = intercept[SparkException] { f.get() }
       assert(e.getMessage.contains("cancelled") || e.getMessage.contains("killed"))
     }
@@ -263,7 +263,7 @@ class JobCancellationSuite extends SparkFunSuite with Matchers with BeforeAndAft
       })
 
       val f = sc.parallelize(1 to 10000, 2).map { i => Thread.sleep(10); i }.countAsync()
-      future {
+      Future {
         // Wait until some tasks were launched before we cancel the job.
         sem.acquire()
         f.cancel()
@@ -277,7 +277,7 @@ class JobCancellationSuite extends SparkFunSuite with Matchers with BeforeAndAft
     // Cancel before launching any tasks
     {
       val f = sc.parallelize(1 to 10000, 2).map { i => Thread.sleep(10); i }.takeAsync(5000)
-      future { f.cancel() }
+      Future { f.cancel() }
       val e = intercept[SparkException] { f.get() }
       assert(e.getMessage.contains("cancelled") || e.getMessage.contains("killed"))
     }
@@ -292,7 +292,7 @@ class JobCancellationSuite extends SparkFunSuite with Matchers with BeforeAndAft
         }
       })
       val f = sc.parallelize(1 to 10000, 2).map { i => Thread.sleep(10); i }.takeAsync(5000)
-      future {
+      Future {
         sem.acquire()
         f.cancel()
       }
diff --git a/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala b/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
index 828153bdbf..c9c2fb2691 100644
--- a/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
@@ -21,7 +21,7 @@ import java.io.InputStream
 import java.util.concurrent.Semaphore
 
 import scala.concurrent.ExecutionContext.Implicits.global
-import scala.concurrent.future
+import scala.concurrent.Future
 
 import org.mockito.Matchers.{any, eq => meq}
 import org.mockito.Mockito._
@@ -149,7 +149,7 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with PrivateMethodT
     when(transfer.fetchBlocks(any(), any(), any(), any(), any())).thenAnswer(new Answer[Unit] {
       override def answer(invocation: InvocationOnMock): Unit = {
         val listener = invocation.getArguments()(4).asInstanceOf[BlockFetchingListener]
-        future {
+        Future {
           // Return the first two blocks, and wait till task completion before returning the 3rd one
           listener.onBlockFetchSuccess(
             ShuffleBlockId(0, 0, 0).toString, blocks(ShuffleBlockId(0, 0, 0)))
@@ -211,7 +211,7 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with PrivateMethodT
     when(transfer.fetchBlocks(any(), any(), any(), any(), any())).thenAnswer(new Answer[Unit] {
       override def answer(invocation: InvocationOnMock): Unit = {
         val listener = invocation.getArguments()(4).asInstanceOf[BlockFetchingListener]
-        future {
+        Future {
           // Return the first block, and then fail.
           listener.onBlockFetchSuccess(
             ShuffleBlockId(0, 0, 0).toString, blocks(ShuffleBlockId(0, 0, 0)))
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGenerationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGenerationSuite.scala
index 0c42e2fc7c..b5413fbe2b 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGenerationSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGenerationSuite.scala
@@ -36,7 +36,7 @@ class CodeGenerationSuite extends SparkFunSuite with ExpressionEvalHelper {
     import scala.concurrent.duration._
 
     val futures = (1 to 20).map { _ =>
-      future {
+      Future {
         GeneratePredicate.generate(EqualTo(Literal(1), Literal(1)))
         GenerateMutableProjection.generate(EqualTo(Literal(1), Literal(1)) :: Nil)
         GenerateOrdering.generate(Add(Literal(1), Literal(1)).asc :: Nil)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala
index 8b275e886c..943ad31c0c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala
@@ -77,7 +77,7 @@ case class BroadcastHashJoin(
 
     // broadcastFuture is used in "doExecute". Therefore we can get the execution id correctly here.
     val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
-    future {
+    Future {
       // This will run in another thread. Set the execution id so that we can connect these jobs
       // with the correct execution.
       SQLExecution.withExecutionId(sparkContext, executionId) {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashOuterJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashOuterJoin.scala
index db8edd169d..f48fc3b848 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashOuterJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashOuterJoin.scala
@@ -76,7 +76,7 @@ case class BroadcastHashOuterJoin(
 
     // broadcastFuture is used in "doExecute". Therefore we can get the execution id correctly here.
     val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
-    future {
+    Future {
       // This will run in another thread. Set the execution id so that we can connect these jobs
       // with the correct execution.
       SQLExecution.withExecutionId(sparkContext, executionId) {
diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
index ba3b26e1b7..865197e24c 100644
--- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
+++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
@@ -23,7 +23,7 @@ import java.sql.{Date, DriverManager, SQLException, Statement}
 
 import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
-import scala.concurrent.{future, Await, ExecutionContext, Promise}
+import scala.concurrent.{Await, ExecutionContext, Future, Promise}
 import scala.concurrent.duration._
 import scala.io.Source
 import scala.util.{Random, Try}
@@ -362,7 +362,7 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest {
       try {
         // Start a very-long-running query that will take hours to finish, then cancel it in order
         // to demonstrate that cancellation works.
-        val f = future {
+        val f = Future {
           statement.executeQuery(
             "SELECT COUNT(*) FROM test_map " +
             List.fill(10)("join test_map").mkString(" "))
@@ -380,7 +380,7 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest {
         // Cancellation is a no-op if spark.sql.hive.thriftServer.async=false
         statement.executeQuery("SET spark.sql.hive.thriftServer.async=false")
         try {
-          val sf = future {
+          val sf = Future {
             statement.executeQuery(
               "SELECT COUNT(*) FROM test_map " +
                 List.fill(4)("join test_map").mkString(" ")
-- 
GitLab