diff --git a/core/src/main/scala/org/apache/spark/FutureAction.scala b/core/src/main/scala/org/apache/spark/FutureAction.scala
index a50600f1488c9edd1c1737c9aa09e04f64e9e525..0899693988016d6924d9751343bd81ba9534a5c4 100644
--- a/core/src/main/scala/org/apache/spark/FutureAction.scala
+++ b/core/src/main/scala/org/apache/spark/FutureAction.scala
@@ -261,7 +261,7 @@ class JavaFutureActionWrapper[S, T](futureAction: FutureAction[S], converter: S
 
   private def getImpl(timeout: Duration): T = {
     // This will throw TimeoutException on timeout:
-    Await.ready(futureAction, timeout)
+    ThreadUtils.awaitReady(futureAction, timeout)
     futureAction.value.get match {
       case scala.util.Success(value) => converter(value)
       case scala.util.Failure(exception) =>
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 68178c7fb3bb1cbb8cd7fd14dbe01b7e4cb55c90..875acc37e90f36419e836be9cb2610cb40c1fd50 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -618,12 +618,7 @@ class DAGScheduler(
       properties: Properties): Unit = {
     val start = System.nanoTime
     val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties)
-    // Note: Do not call Await.ready(future) because that calls `scala.concurrent.blocking`,
-    // which causes concurrent SQL executions to fail if a fork-join pool is used. Note that
-    // due to idiosyncrasies in Scala, `awaitPermission` is not actually used anywhere so it's
-    // safe to pass in null here. For more detail, see SPARK-13747.
-    val awaitPermission = null.asInstanceOf[scala.concurrent.CanAwait]
-    waiter.completionFuture.ready(Duration.Inf)(awaitPermission)
+    ThreadUtils.awaitReady(waiter.completionFuture, Duration.Inf)
     waiter.completionFuture.value.get match {
       case scala.util.Success(_) =>
         logInfo("Job %d finished: %s, took %f s".format
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index b3e458448974fb584e129335a014ba44ae2548cb..137d24b525155cb64413ee8812c5a258381aa884 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -23,7 +23,7 @@ import java.nio.channels.Channels
 
 import scala.collection.mutable
 import scala.collection.mutable.HashMap
-import scala.concurrent.{Await, ExecutionContext, Future}
+import scala.concurrent.{ExecutionContext, Future}
 import scala.concurrent.duration._
 import scala.reflect.ClassTag
 import scala.util.Random
@@ -334,7 +334,7 @@ private[spark] class BlockManager(
     val task = asyncReregisterTask
     if (task != null) {
       try {
-        Await.ready(task, Duration.Inf)
+        ThreadUtils.awaitReady(task, Duration.Inf)
       } catch {
         case NonFatal(t) =>
           throw new Exception("Error occurred while waiting for async. reregistration", t)
reregistration", t) @@ -916,7 +916,7 @@ private[spark] class BlockManager( if (level.replication > 1) { // Wait for asynchronous replication to finish try { - Await.ready(replicationFuture, Duration.Inf) + ThreadUtils.awaitReady(replicationFuture, Duration.Inf) } catch { case NonFatal(t) => throw new Exception("Error occurred while waiting for replication to finish", t) diff --git a/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala b/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala index 1aa4456ed01b4d135d4959c0ceaa875ddc42079b..81aaf79db0c138a80516e8e3b276831afec3e9d2 100644 --- a/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala +++ b/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala @@ -206,4 +206,25 @@ private[spark] object ThreadUtils { } } // scalastyle:on awaitresult + + // scalastyle:off awaitready + /** + * Preferred alternative to `Await.ready()`. + * + * @see [[awaitResult]] + */ + @throws(classOf[SparkException]) + def awaitReady[T](awaitable: Awaitable[T], atMost: Duration): awaitable.type = { + try { + // `awaitPermission` is not actually used anywhere so it's safe to pass in null here. + // See SPARK-13747. + val awaitPermission = null.asInstanceOf[scala.concurrent.CanAwait] + awaitable.ready(atMost)(awaitPermission) + } catch { + // TimeoutException is thrown in the current thread, so not need to warp the exception. + case NonFatal(t) if !t.isInstanceOf[TimeoutException] => + throw new SparkException("Exception thrown in awaitResult: ", t) + } + } + // scalastyle:on awaitready } diff --git a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala index 7e26139a2beadb93deb8c6a5643f3f6ece1bfde9..27945a9a5ede8782fabf8e145ae4cb78515b0db9 100644 --- a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala +++ b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala @@ -23,7 +23,6 @@ import java.nio.charset.StandardCharsets import java.util.concurrent.TimeUnit import scala.concurrent.duration._ -import scala.concurrent.Await import com.google.common.io.Files import org.apache.hadoop.conf.Configuration @@ -35,7 +34,7 @@ import org.scalatest.concurrent.Eventually import org.scalatest.Matchers._ import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart, SparkListenerTaskEnd, SparkListenerTaskStart} -import org.apache.spark.util.Utils +import org.apache.spark.util.{ThreadUtils, Utils} class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventually { @@ -315,7 +314,7 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local")) val future = sc.parallelize(Seq(0)).foreachAsync(_ => {Thread.sleep(1000L)}) sc.cancelJobGroup("nonExistGroupId") - Await.ready(future, Duration(2, TimeUnit.SECONDS)) + ThreadUtils.awaitReady(future, Duration(2, TimeUnit.SECONDS)) // In SPARK-6414, sc.cancelJobGroup will cause NullPointerException and cause // SparkContext to shutdown, so the following assertion will fail. 
diff --git a/core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferSecuritySuite.scala b/core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferSecuritySuite.scala
index fe8955840d72f2c4192bd3234c0409a7ced5b231..792a1d7f57e2d91cc9d2795300f8c9e90e364252 100644
--- a/core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferSecuritySuite.scala
+++ b/core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferSecuritySuite.scala
@@ -22,7 +22,7 @@ import java.nio._
 import java.nio.charset.StandardCharsets
 import java.util.concurrent.TimeUnit
 
-import scala.concurrent.{Await, Promise}
+import scala.concurrent.Promise
 import scala.concurrent.duration._
 import scala.util.{Failure, Success, Try}
 
@@ -36,6 +36,7 @@ import org.apache.spark.network.{BlockDataManager, BlockTransferService}
 import org.apache.spark.network.buffer.{ManagedBuffer, NioManagedBuffer}
 import org.apache.spark.network.shuffle.BlockFetchingListener
 import org.apache.spark.storage.{BlockId, ShuffleBlockId}
+import org.apache.spark.util.ThreadUtils
 
 class NettyBlockTransferSecuritySuite extends SparkFunSuite with MockitoSugar with ShouldMatchers {
   test("security default off") {
@@ -166,7 +167,7 @@ class NettyBlockTransferSecuritySuite extends SparkFunSuite with MockitoSugar wi
         }
       })
 
-    Await.ready(promise.future, FiniteDuration(10, TimeUnit.SECONDS))
+    ThreadUtils.awaitReady(promise.future, FiniteDuration(10, TimeUnit.SECONDS))
     promise.future.value.get
   }
 }
diff --git a/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala
index 8300607ea888b36e4d5655c1fd1000e96d138d50..37b08980db87741f555b6c0869afbe76974dfb23 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala
@@ -21,7 +21,7 @@ import java.util.concurrent.{TimeoutException, TimeUnit}
 import java.util.concurrent.atomic.{AtomicBoolean, AtomicReference}
 
 import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
-import scala.concurrent.{Await, Future}
+import scala.concurrent.Future
 import scala.concurrent.duration.{Duration, SECONDS}
 import scala.language.existentials
 import scala.reflect.ClassTag
@@ -260,7 +260,7 @@ abstract class SchedulerIntegrationSuite[T <: MockBackend: ClassTag] extends Spa
    */
   def awaitJobTermination(jobFuture: Future[_], duration: Duration): Unit = {
     try {
-      Await.ready(jobFuture, duration)
+      ThreadUtils.awaitReady(jobFuture, duration)
     } catch {
       case te: TimeoutException if backendException.get() != null =>
         val msg = raw"""
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockInfoManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockInfoManagerSuite.scala
index 1b325801e27fce16fd81e829a5fb8da0775d172c..917db766f7f11cec4e72d613f468f8196046066b 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockInfoManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockInfoManagerSuite.scala
@@ -152,7 +152,7 @@ class BlockInfoManagerSuite extends SparkFunSuite with BeforeAndAfterEach {
     // one should acquire the write lock. The second thread should block until the winner of the
     // write race releases its lock.
     val winningFuture: Future[Boolean] =
-      Await.ready(Future.firstCompletedOf(Seq(lock1Future, lock2Future)), 1.seconds)
+      ThreadUtils.awaitReady(Future.firstCompletedOf(Seq(lock1Future, lock2Future)), 1.seconds)
     assert(winningFuture.value.get.get)
     val winningTID = blockInfoManager.get("block").get.writerTask
     assert(winningTID === 1 || winningTID === 2)
diff --git a/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointerSuite.scala b/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointerSuite.scala
index fef24ed4c5dd0911c7b91dedca365a2fe7464b84..8d56d4be9c42a9fe27d2f55c9709708a4d02978c 100644
--- a/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointerSuite.scala
+++ b/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointerSuite.scala
@@ -140,7 +140,9 @@ class KinesisCheckpointerSuite extends TestSuiteBase
       ExecutionContext.global)
 
     intercept[TimeoutException] {
+      // scalastyle:off awaitready
       Await.ready(f, 50 millis)
+      // scalastyle:on awaitready
     }
 
     clock.advance(checkpointInterval.milliseconds / 2)
diff --git a/scalastyle-config.xml b/scalastyle-config.xml
index 1f48d71cc7a2b93bedab47e3f407066426f85a10..0a4073b03957c9a0784715cd61c01486d0732c93 100644
--- a/scalastyle-config.xml
+++ b/scalastyle-config.xml
@@ -203,6 +203,17 @@ This file is divided into 3 sections:
     ]]></customMessage>
   </check>
 
+  <check customId="awaitready" level="error" class="org.scalastyle.file.RegexChecker" enabled="true">
+    <parameters><parameter name="regex">Await\.ready</parameter></parameters>
+    <customMessage><![CDATA[
+      Are you sure that you want to use Await.ready? In most cases, you should use ThreadUtils.awaitReady instead.
+      If you must use Await.ready, wrap the code block with
+      // scalastyle:off awaitready
+      Await.ready(...)
+      // scalastyle:on awaitready
+    ]]></customMessage>
+  </check>
+
   <!-- As of SPARK-9613 JavaConversions should be replaced with JavaConverters -->
   <check customId="javaconversions" level="error" class="org.scalastyle.scalariform.TokenChecker" enabled="true">
     <parameters><parameter name="regex">JavaConversions</parameter></parameters>
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
index 845f554308c431a2e2db0fb2b5c4719ed1be7121..1e5f18797e152afadf3be8aad33a31e57ab0d247 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
@@ -189,7 +189,9 @@ private[streaming] class FileBasedWriteAheadLog(
       val f = Future { deleteFile(logInfo) }(executionContext)
       if (waitForCompletion) {
         import scala.concurrent.duration._
+        // scalastyle:off awaitready
        Await.ready(f, 1 second)
+        // scalastyle:on awaitready
       }
     } catch {
       case e: RejectedExecutionException =>
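
For readers outside the Spark codebase: `ThreadUtils` is `private[spark]`, so the helper added above cannot be called directly. The following standalone sketch mirrors the same pattern (the `AwaitReadyDemo` object and its error message are illustrative, not part of this patch). The point of the technique is that `Awaitable.ready` is invoked directly instead of through `Await.ready`, so `scala.concurrent.blocking` is never entered, while `TimeoutException` still surfaces unchanged to the caller.

```scala
import java.util.concurrent.TimeoutException

import scala.concurrent.{Awaitable, CanAwait, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.control.NonFatal

object AwaitReadyDemo {
  // Same idea as ThreadUtils.awaitReady: call `ready` directly rather than via
  // Await.ready, so `scala.concurrent.blocking` is skipped and a fork-join pool
  // does not spawn compensation threads (see SPARK-13747).
  def awaitReady[T](awaitable: Awaitable[T], atMost: Duration): awaitable.type = {
    try {
      // `ready` never inspects the CanAwait token, so null is safe here.
      val awaitPermission = null.asInstanceOf[CanAwait]
      awaitable.ready(atMost)(awaitPermission)
    } catch {
      // A timeout is raised in the calling thread: rethrow it as-is, and only
      // wrap other non-fatal failures.
      case NonFatal(t) if !t.isInstanceOf[TimeoutException] =>
        throw new RuntimeException("Exception thrown in awaitReady: ", t)
    }
  }

  def main(args: Array[String]): Unit = {
    val f = Future { Thread.sleep(100); 42 }
    awaitReady(f, 1.second)
    println(f.value) // Some(Success(42))
  }
}
```

Because `TimeoutException` propagates directly, callers such as `JavaFutureActionWrapper.getImpl` in `FutureAction.scala` can keep relying on it for their timeout contract after switching from `Await.ready` to `ThreadUtils.awaitReady`.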