-
- Downloads
[SPARK-19529] TransportClientFactory.createClient() shouldn't call awaitUninterruptibly()
This patch replaces a single `awaitUninterruptibly()` call with a plain `await()` call in Spark's `network-common` library in order to fix a bug which may cause tasks to be uncancellable. In Spark's Netty RPC layer, `TransportClientFactory.createClient()` calls `awaitUninterruptibly()` on a Netty future while waiting for a connection to be established. This creates problem when a Spark task is interrupted while blocking in this call (which can happen in the event of a slow connection which will eventually time out). This has bad impacts on task cancellation when `interruptOnCancel = true`. As an example of the impact of this problem, I experienced significant numbers of uncancellable "zombie tasks" on a production cluster where several tasks were blocked trying to connect to a dead shuffle server and then continued running as zombies after I cancelled the associated Spark stage. The zombie tasks ran for several minutes with the following stack: ``` java.lang.Object.wait(Native Method) java.lang.Object.wait(Object.java:460) io.netty.util.concurrent.DefaultPromise.await0(DefaultPromise.java:607) io.netty.util.concurrent.DefaultPromise.awaitUninterruptibly(DefaultPromise.java:301) org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:224) org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:179) => holding Monitor(java.lang.Object1849476028}) org.apache.spark.network.shuffle.ExternalShuffleClient$1.createAndStart(ExternalShuffleClient.java:105) org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140) org.apache.spark.network.shuffle.RetryingBlockFetcher.start(RetryingBlockFetcher.java:120) org.apache.spark.network.shuffle.ExternalShuffleClient.fetchBlocks(ExternalShuffleClient.java:114) org.apache.spark.storage.ShuffleBlockFetcherIterator.sendRequest(ShuffleBlockFetcherIterator.scala:169) org.apache.spark.storage.ShuffleBlockFetcherIterator.fetchUpToMaxBytes(ShuffleBlockFetcherIterator.scala: 350) org.apache.spark.storage.ShuffleBlockFetcherIterator.initialize(ShuffleBlockFetcherIterator.scala:286) org.apache.spark.storage.ShuffleBlockFetcherIterator.<init>(ShuffleBlockFetcherIterator.scala:120) org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:45) org.apache.spark.sql.execution.ShuffledRowRDD.compute(ShuffledRowRDD.scala:169) org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) org.apache.spark.rdd.RDD.iterator(RDD.scala:287) [...] ``` As far as I can tell, `awaitUninterruptibly()` might have been used in order to avoid having to declare that methods throw `InterruptedException` (this code is written in Java, hence the need to use checked exceptions). This patch simply replaces this with a regular, interruptible `await()` call,. This required several interface changes to declare a new checked exception (these are internal interfaces, though, and this change doesn't significantly impact binary compatibility). An alternative approach would be to wrap `InterruptedException` into `IOException` in order to avoid having to change interfaces. The problem with this approach is that the `network-shuffle` project's `RetryingBlockFetcher` code treats `IOExceptions` as transitive failures when deciding whether to retry fetches, so throwing a wrapped `IOException` might cause an interrupted shuffle fetch to be retried, further prolonging the lifetime of a cancelled zombie task. Note that there are three other `awaitUninterruptibly()` in the codebase, but those calls have a hard 10 second timeout and are waiting on a `close()` operation which is expected to complete near instantaneously, so the impact of uninterruptibility there is much smaller. Manually. Author: Josh Rosen <joshrosen@databricks.com> Closes #16866 from JoshRosen/SPARK-19529. (cherry picked from commit 1c4d10b1) Signed-off-by:Cheng Lian <lian@databricks.com>
Showing
- common/network-common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java 6 additions, 4 deletions...g/apache/spark/network/client/TransportClientFactory.java
- common/network-common/src/test/java/org/apache/spark/network/TransportClientFactorySuite.java 4 additions, 2 deletions...org/apache/spark/network/TransportClientFactorySuite.java
- common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java 2 additions, 2 deletions...g/apache/spark/network/shuffle/ExternalShuffleClient.java
- common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RetryingBlockFetcher.java 2 additions, 1 deletion...rg/apache/spark/network/shuffle/RetryingBlockFetcher.java
- common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/mesos/MesosExternalShuffleClient.java 1 addition, 1 deletion...ark/network/shuffle/mesos/MesosExternalShuffleClient.java
- common/network-shuffle/src/test/java/org/apache/spark/network/sasl/SaslIntegrationSuite.java 2 additions, 2 deletions...a/org/apache/spark/network/sasl/SaslIntegrationSuite.java
- common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java 1 addition, 1 deletion...park/network/shuffle/ExternalShuffleIntegrationSuite.java
- common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleSecuritySuite.java 4 additions, 3 deletions...e/spark/network/shuffle/ExternalShuffleSecuritySuite.java
- common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RetryingBlockFetcherSuite.java 8 additions, 8 deletions...ache/spark/network/shuffle/RetryingBlockFetcherSuite.java
Please register or sign in to comment