diff --git a/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala b/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala
index c0ede4b7c8734971b9451e87a24fb7538cf73c43..15d220d01b461bdf50f76cbb203cd18ae2b3f9ba 100644
--- a/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala
@@ -22,7 +22,7 @@ import java.net.URL
 import java.util.concurrent.TimeoutException
 
 import scala.collection.mutable.ListBuffer
-import scala.concurrent.{future, promise, Await}
+import scala.concurrent.{Await, Future, Promise}
 import scala.concurrent.ExecutionContext.Implicits.global
 import scala.concurrent.duration._
 import scala.language.postfixOps
@@ -249,7 +249,7 @@ private object FaultToleranceTest extends App with Logging {
 
   /** This includes Client retry logic, so it may take a while if the cluster is recovering. */
   private def assertUsable() = {
-    val f = future {
+    val f = Future {
       try {
         val res = sc.parallelize(0 until 10).collect()
         assertTrue(res.toList == (0 until 10))
@@ -283,7 +283,7 @@ private object FaultToleranceTest extends App with Logging {
       numAlive == 1 && numStandby == masters.size - 1 && numLiveApps >= 1
     }
 
-    val f = future {
+    val f = Future {
       try {
         while (!stateValid()) {
           Thread.sleep(1000)
@@ -405,7 +405,7 @@ private object SparkDocker {
   }
 
   private def startNode(dockerCmd: ProcessBuilder) : (String, DockerId, File) = {
-    val ipPromise = promise[String]()
+    val ipPromise = Promise[String]()
     val outFile = File.createTempFile("fault-tolerance-test", "", Utils.createTempDir())
     val outStream: FileWriter = new FileWriter(outFile)
     def findIpAndLog(line: String): Unit = {
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index 179d3b9f20b1f7cc0a68680981f38b1ef6bf7048..df3c286a0a66f77c773e6f1c071f05cb90260e60 100755
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -394,7 +394,7 @@ private[deploy] class Worker(
           // rpcEndpoint.
           // Copy ids so that it can be used in the cleanup thread.
           val appIds = executors.values.map(_.appId).toSet
-          val cleanupFuture = concurrent.future {
+          val cleanupFuture = concurrent.Future {
             val appDirs = workDir.listFiles()
             if (appDirs == null) {
               throw new IOException("ERROR: Failed to list files in " + appDirs)
diff --git a/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala b/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala
index e13a442463e8d1be2d0ea45d8e2d080a32f9676f..c347ab8dc8020fd8e64be01a48b38b71613dd786 100644
--- a/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala
@@ -22,7 +22,7 @@ import java.util.concurrent.Semaphore
 import scala.concurrent.Await
 import scala.concurrent.ExecutionContext.Implicits.global
 import scala.concurrent.duration._
-import scala.concurrent.future
+import scala.concurrent.Future
 
 import org.scalatest.BeforeAndAfter
 import org.scalatest.Matchers
@@ -103,7 +103,7 @@ class JobCancellationSuite extends SparkFunSuite with Matchers with BeforeAndAft
     val rdd1 = rdd.map(x => x)
 
-    future {
+    Future {
       taskStartedSemaphore.acquire()
       sc.cancelAllJobs()
       taskCancelledSemaphore.release(100000)
     }
@@ -126,7 +126,7 @@ class JobCancellationSuite extends SparkFunSuite with Matchers with BeforeAndAft
     })
 
     // jobA is the one to be cancelled.
-    val jobA = future {
+    val jobA = Future {
       sc.setJobGroup("jobA", "this is a job to be cancelled")
       sc.parallelize(1 to 10000, 2).map { i => Thread.sleep(10); i }.count()
     }
@@ -191,7 +191,7 @@ class JobCancellationSuite extends SparkFunSuite with Matchers with BeforeAndAft
     })
 
     // jobA is the one to be cancelled.
-    val jobA = future {
+    val jobA = Future {
       sc.setJobGroup("jobA", "this is a job to be cancelled", interruptOnCancel = true)
       sc.parallelize(1 to 10000, 2).map { i => Thread.sleep(100000); i }.count()
     }
@@ -231,7 +231,7 @@ class JobCancellationSuite extends SparkFunSuite with Matchers with BeforeAndAft
     val f2 = rdd.countAsync()
 
     // Kill one of the action.
-    future {
+    Future {
       sem1.acquire()
       f1.cancel()
       JobCancellationSuite.twoJobsSharingStageSemaphore.release(10)
     }
@@ -247,7 +247,7 @@ class JobCancellationSuite extends SparkFunSuite with Matchers with BeforeAndAft
     // Cancel before launching any tasks
     {
       val f = sc.parallelize(1 to 10000, 2).map { i => Thread.sleep(10); i }.countAsync()
-      future { f.cancel() }
+      Future { f.cancel() }
       val e = intercept[SparkException] { f.get() }
       assert(e.getMessage.contains("cancelled") || e.getMessage.contains("killed"))
     }
@@ -263,7 +263,7 @@ class JobCancellationSuite extends SparkFunSuite with Matchers with BeforeAndAft
     })
 
     val f = sc.parallelize(1 to 10000, 2).map { i => Thread.sleep(10); i }.countAsync()
-    future {
+    Future {
       // Wait until some tasks were launched before we cancel the job.
       sem.acquire()
       f.cancel()
@@ -277,7 +277,7 @@ class JobCancellationSuite extends SparkFunSuite with Matchers with BeforeAndAft
     // Cancel before launching any tasks
     {
       val f = sc.parallelize(1 to 10000, 2).map { i => Thread.sleep(10); i }.takeAsync(5000)
-      future { f.cancel() }
+      Future { f.cancel() }
       val e = intercept[SparkException] { f.get() }
       assert(e.getMessage.contains("cancelled") || e.getMessage.contains("killed"))
     }
@@ -292,7 +292,7 @@ class JobCancellationSuite extends SparkFunSuite with Matchers with BeforeAndAft
      }
    })
    val f = sc.parallelize(1 to 10000, 2).map { i => Thread.sleep(10); i }.takeAsync(5000)
-    future {
+    Future {
      sem.acquire()
      f.cancel()
    }
diff --git a/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala b/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
index 828153bdbfc44cfd338c2f35fcb3e10187120049..c9c2fb2691d706b2bd5e305ea57145a3168be9c5 100644
--- a/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
@@ -21,7 +21,7 @@ import java.io.InputStream
 import java.util.concurrent.Semaphore
 
 import scala.concurrent.ExecutionContext.Implicits.global
-import scala.concurrent.future
+import scala.concurrent.Future
 
 import org.mockito.Matchers.{any, eq => meq}
 import org.mockito.Mockito._
@@ -149,7 +149,7 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with PrivateMethodT
     when(transfer.fetchBlocks(any(), any(), any(), any(), any())).thenAnswer(new Answer[Unit] {
       override def answer(invocation: InvocationOnMock): Unit = {
         val listener = invocation.getArguments()(4).asInstanceOf[BlockFetchingListener]
-        future {
+        Future {
           // Return the first two blocks, and wait till task completion before returning the 3rd one
           listener.onBlockFetchSuccess(
             ShuffleBlockId(0, 0, 0).toString, blocks(ShuffleBlockId(0, 0, 0)))
@@ -211,7 +211,7 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with PrivateMethodT
     when(transfer.fetchBlocks(any(), any(), any(), any(), any())).thenAnswer(new Answer[Unit] {
       override def answer(invocation: InvocationOnMock): Unit = {
         val listener = invocation.getArguments()(4).asInstanceOf[BlockFetchingListener]
-        future {
+        Future {
           // Return the first block, and then fail.
           listener.onBlockFetchSuccess(
             ShuffleBlockId(0, 0, 0).toString, blocks(ShuffleBlockId(0, 0, 0)))
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGenerationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGenerationSuite.scala
index 0c42e2fc7c5e5b0e81dc0b702c0eca0711e16515..b5413fbe2bbcc91ddf877e4d048c237a264d75c4 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGenerationSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGenerationSuite.scala
@@ -36,7 +36,7 @@ class CodeGenerationSuite extends SparkFunSuite with ExpressionEvalHelper {
     import scala.concurrent.duration._
 
     val futures = (1 to 20).map { _ =>
-      future {
+      Future {
        GeneratePredicate.generate(EqualTo(Literal(1), Literal(1)))
        GenerateMutableProjection.generate(EqualTo(Literal(1), Literal(1)) :: Nil)
        GenerateOrdering.generate(Add(Literal(1), Literal(1)).asc :: Nil)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala
index 8b275e886c46ceadb11c7b4eb1e889df38bafe99..943ad31c0cef52bd950081b0a21fbb30cebf938c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala
@@ -77,7 +77,7 @@ case class BroadcastHashJoin(
 
     // broadcastFuture is used in "doExecute". Therefore we can get the execution id correctly here.
     val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
-    future {
+    Future {
       // This will run in another thread. Set the execution id so that we can connect these jobs
       // with the correct execution.
       SQLExecution.withExecutionId(sparkContext, executionId) {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashOuterJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashOuterJoin.scala
index db8edd169dcfaccb32c1b0f4fd10799af72f01f8..f48fc3b84864d9872eca4b26d4a312357da99525 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashOuterJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashOuterJoin.scala
@@ -76,7 +76,7 @@ case class BroadcastHashOuterJoin(
 
     // broadcastFuture is used in "doExecute". Therefore we can get the execution id correctly here.
     val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
-    future {
+    Future {
       // This will run in another thread. Set the execution id so that we can connect these jobs
       // with the correct execution.
       SQLExecution.withExecutionId(sparkContext, executionId) {
diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
index ba3b26e1b7d49cf5199687c81ec361db3c29e885..865197e24caf8ca0029dd4756f891307d0de2617 100644
--- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
+++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
@@ -23,7 +23,7 @@ import java.sql.{Date, DriverManager, SQLException, Statement}
 
 import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
-import scala.concurrent.{future, Await, ExecutionContext, Promise}
+import scala.concurrent.{Await, ExecutionContext, Future, Promise}
 import scala.concurrent.duration._
 import scala.io.Source
 import scala.util.{Random, Try}
@@ -362,7 +362,7 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest {
       try {
         // Start a very-long-running query that will take hours to finish, then cancel it in order
         // to demonstrate that cancellation works.
-        val f = future {
+        val f = Future {
           statement.executeQuery(
             "SELECT COUNT(*) FROM test_map " +
               List.fill(10)("join test_map").mkString(" "))
@@ -380,7 +380,7 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest {
       // Cancellation is a no-op if spark.sql.hive.thriftServer.async=false
      statement.executeQuery("SET spark.sql.hive.thriftServer.async=false")
      try {
-        val sf = future {
+        val sf = Future {
          statement.executeQuery(
            "SELECT COUNT(*) FROM test_map " +
              List.fill(4)("join test_map").mkString(" ")
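
The change throughout this patch is mechanical: the lowercase scala.concurrent.future and scala.concurrent.promise package methods were deprecated in Scala 2.11 in favor of the Future.apply and Promise.apply companion-object factories, so every call site swaps the identifier and the imports. A minimal standalone sketch of the replacement API (not part of the patch; the object name is illustrative):

import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object CompanionFactoryExample extends App {
  // Future.apply schedules the block on the implicit ExecutionContext,
  // the same behavior as the deprecated scala.concurrent.future method.
  val f: Future[Int] = Future { 21 * 2 }

  // Promise.apply creates an unfulfilled promise; completing it
  // resolves the future it exposes, replacing scala.concurrent.promise[T]().
  val p: Promise[String] = Promise[String]()
  p.success("done")

  assert(Await.result(f, 10.seconds) == 42)
  assert(Await.result(p.future, 10.seconds) == "done")
}

Because Future.apply takes its ExecutionContext implicitly just as the deprecated method did, expressions like val f = future { ... } compile unchanged after the rename, which is why each hunk touches only the identifier (and, per file, the import line).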