From 5db23473008a58fb9a7f77ad8b01bcdc2c5f2d9c Mon Sep 17 00:00:00 2001 From: Josh Rosen <joshrosen@databricks.com> Date: Mon, 13 Feb 2017 11:04:27 -0800 Subject: [PATCH] [SPARK-19529] TransportClientFactory.createClient() shouldn't call awaitUninterruptibly() This patch replaces a single `awaitUninterruptibly()` call with a plain `await()` call in Spark's `network-common` library in order to fix a bug which may cause tasks to be uncancellable. In Spark's Netty RPC layer, `TransportClientFactory.createClient()` calls `awaitUninterruptibly()` on a Netty future while waiting for a connection to be established. This creates problem when a Spark task is interrupted while blocking in this call (which can happen in the event of a slow connection which will eventually time out). This has bad impacts on task cancellation when `interruptOnCancel = true`. As an example of the impact of this problem, I experienced significant numbers of uncancellable "zombie tasks" on a production cluster where several tasks were blocked trying to connect to a dead shuffle server and then continued running as zombies after I cancelled the associated Spark stage. The zombie tasks ran for several minutes with the following stack: ``` java.lang.Object.wait(Native Method) java.lang.Object.wait(Object.java:460) io.netty.util.concurrent.DefaultPromise.await0(DefaultPromise.java:607) io.netty.util.concurrent.DefaultPromise.awaitUninterruptibly(DefaultPromise.java:301) org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:224) org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:179) => holding Monitor(java.lang.Object1849476028}) org.apache.spark.network.shuffle.ExternalShuffleClient$1.createAndStart(ExternalShuffleClient.java:105) org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140) org.apache.spark.network.shuffle.RetryingBlockFetcher.start(RetryingBlockFetcher.java:120) org.apache.spark.network.shuffle.ExternalShuffleClient.fetchBlocks(ExternalShuffleClient.java:114) org.apache.spark.storage.ShuffleBlockFetcherIterator.sendRequest(ShuffleBlockFetcherIterator.scala:169) org.apache.spark.storage.ShuffleBlockFetcherIterator.fetchUpToMaxBytes(ShuffleBlockFetcherIterator.scala: 350) org.apache.spark.storage.ShuffleBlockFetcherIterator.initialize(ShuffleBlockFetcherIterator.scala:286) org.apache.spark.storage.ShuffleBlockFetcherIterator.<init>(ShuffleBlockFetcherIterator.scala:120) org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:45) org.apache.spark.sql.execution.ShuffledRowRDD.compute(ShuffledRowRDD.scala:169) org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) org.apache.spark.rdd.RDD.iterator(RDD.scala:287) [...] ``` As far as I can tell, `awaitUninterruptibly()` might have been used in order to avoid having to declare that methods throw `InterruptedException` (this code is written in Java, hence the need to use checked exceptions). This patch simply replaces this with a regular, interruptible `await()` call,. This required several interface changes to declare a new checked exception (these are internal interfaces, though, and this change doesn't significantly impact binary compatibility). An alternative approach would be to wrap `InterruptedException` into `IOException` in order to avoid having to change interfaces. The problem with this approach is that the `network-shuffle` project's `RetryingBlockFetcher` code treats `IOExceptions` as transitive failures when deciding whether to retry fetches, so throwing a wrapped `IOException` might cause an interrupted shuffle fetch to be retried, further prolonging the lifetime of a cancelled zombie task. Note that there are three other `awaitUninterruptibly()` in the codebase, but those calls have a hard 10 second timeout and are waiting on a `close()` operation which is expected to complete near instantaneously, so the impact of uninterruptibility there is much smaller. Manually. Author: Josh Rosen <joshrosen@databricks.com> Closes #16866 from JoshRosen/SPARK-19529. (cherry picked from commit 1c4d10b10c78d138b55e381ec6828e04fef70d6f) Signed-off-by: Cheng Lian <lian@databricks.com> --- .../network/client/TransportClientFactory.java | 10 ++++++---- .../network/TransportClientFactorySuite.java | 6 ++++-- .../network/shuffle/ExternalShuffleClient.java | 4 ++-- .../network/shuffle/RetryingBlockFetcher.java | 3 ++- .../mesos/MesosExternalShuffleClient.java | 2 +- .../spark/network/sasl/SaslIntegrationSuite.java | 4 ++-- .../shuffle/ExternalShuffleIntegrationSuite.java | 2 +- .../shuffle/ExternalShuffleSecuritySuite.java | 7 ++++--- .../shuffle/RetryingBlockFetcherSuite.java | 16 ++++++++-------- 9 files changed, 30 insertions(+), 24 deletions(-) diff --git a/common/network-common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java b/common/network-common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java index cb10edff65..b50e043d5c 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java +++ b/common/network-common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java @@ -122,7 +122,8 @@ public class TransportClientFactory implements Closeable { * * Concurrency: This method is safe to call from multiple threads. */ - public TransportClient createClient(String remoteHost, int remotePort) throws IOException { + public TransportClient createClient(String remoteHost, int remotePort) + throws IOException, InterruptedException { // Get connection from the connection pool first. // If it is not found or not active, create a new one. // Use unresolved address here to avoid DNS resolution each time we creates a client. @@ -190,13 +191,14 @@ public class TransportClientFactory implements Closeable { * As with {@link #createClient(String, int)}, this method is blocking. */ public TransportClient createUnmanagedClient(String remoteHost, int remotePort) - throws IOException { + throws IOException, InterruptedException { final InetSocketAddress address = new InetSocketAddress(remoteHost, remotePort); return createClient(address); } /** Create a completely new {@link TransportClient} to the remote address. */ - private TransportClient createClient(InetSocketAddress address) throws IOException { + private TransportClient createClient(InetSocketAddress address) + throws IOException, InterruptedException { logger.debug("Creating new connection to {}", address); Bootstrap bootstrap = new Bootstrap(); @@ -223,7 +225,7 @@ public class TransportClientFactory implements Closeable { // Connect to the remote server long preConnect = System.nanoTime(); ChannelFuture cf = bootstrap.connect(address); - if (!cf.awaitUninterruptibly(conf.connectionTimeoutMs())) { + if (!cf.await(conf.connectionTimeoutMs())) { throw new IOException( String.format("Connecting to %s timed out (%s ms)", address, conf.connectionTimeoutMs())); } else if (cf.cause() != null) { diff --git a/common/network-common/src/test/java/org/apache/spark/network/TransportClientFactorySuite.java b/common/network-common/src/test/java/org/apache/spark/network/TransportClientFactorySuite.java index 44d16d5422..43e63efed4 100644 --- a/common/network-common/src/test/java/org/apache/spark/network/TransportClientFactorySuite.java +++ b/common/network-common/src/test/java/org/apache/spark/network/TransportClientFactorySuite.java @@ -100,6 +100,8 @@ public class TransportClientFactorySuite { clients.add(client); } catch (IOException e) { failed.incrementAndGet(); + } catch (InterruptedException e) { + throw new RuntimeException(e); } } }; @@ -143,7 +145,7 @@ public class TransportClientFactorySuite { } @Test - public void returnDifferentClientsForDifferentServers() throws IOException { + public void returnDifferentClientsForDifferentServers() throws IOException, InterruptedException { TransportClientFactory factory = context.createClientFactory(); TransportClient c1 = factory.createClient(TestUtils.getLocalHost(), server1.getPort()); TransportClient c2 = factory.createClient(TestUtils.getLocalHost(), server2.getPort()); @@ -172,7 +174,7 @@ public class TransportClientFactorySuite { } @Test - public void closeBlockClientsWithFactory() throws IOException { + public void closeBlockClientsWithFactory() throws IOException, InterruptedException { TransportClientFactory factory = context.createClientFactory(); TransportClient c1 = factory.createClient(TestUtils.getLocalHost(), server1.getPort()); TransportClient c2 = factory.createClient(TestUtils.getLocalHost(), server2.getPort()); diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java index 772fb88325..eea5cf7bec 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java @@ -101,7 +101,7 @@ public class ExternalShuffleClient extends ShuffleClient { new RetryingBlockFetcher.BlockFetchStarter() { @Override public void createAndStart(String[] blockIds, BlockFetchingListener listener) - throws IOException { + throws IOException, InterruptedException { TransportClient client = clientFactory.createClient(host, port); new OneForOneBlockFetcher(client, appId, execId, blockIds, listener).start(); } @@ -136,7 +136,7 @@ public class ExternalShuffleClient extends ShuffleClient { String host, int port, String execId, - ExecutorShuffleInfo executorInfo) throws IOException { + ExecutorShuffleInfo executorInfo) throws IOException, InterruptedException { checkInit(); TransportClient client = clientFactory.createUnmanagedClient(host, port); try { diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RetryingBlockFetcher.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RetryingBlockFetcher.java index 72bd0f803d..5be855048e 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RetryingBlockFetcher.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RetryingBlockFetcher.java @@ -57,7 +57,8 @@ public class RetryingBlockFetcher { * {@link org.apache.spark.network.client.TransportClientFactory} in order to fix connection * issues. */ - void createAndStart(String[] blockIds, BlockFetchingListener listener) throws IOException; + void createAndStart(String[] blockIds, BlockFetchingListener listener) + throws IOException, InterruptedException; } /** Shared executor service used for waiting and retrying. */ diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/mesos/MesosExternalShuffleClient.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/mesos/MesosExternalShuffleClient.java index 42cedd9943..c6d6029d5a 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/mesos/MesosExternalShuffleClient.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/mesos/MesosExternalShuffleClient.java @@ -69,7 +69,7 @@ public class MesosExternalShuffleClient extends ExternalShuffleClient { String host, int port, long heartbeatTimeoutMs, - long heartbeatIntervalMs) throws IOException { + long heartbeatIntervalMs) throws IOException, InterruptedException { checkInit(); ByteBuffer registerDriver = new RegisterDriver(appId, heartbeatTimeoutMs).toByteBuffer(); diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/sasl/SaslIntegrationSuite.java b/common/network-shuffle/src/test/java/org/apache/spark/network/sasl/SaslIntegrationSuite.java index 6ba937dddb..81a3f065d6 100644 --- a/common/network-shuffle/src/test/java/org/apache/spark/network/sasl/SaslIntegrationSuite.java +++ b/common/network-shuffle/src/test/java/org/apache/spark/network/sasl/SaslIntegrationSuite.java @@ -103,7 +103,7 @@ public class SaslIntegrationSuite { } @Test - public void testGoodClient() throws IOException { + public void testGoodClient() throws IOException, InterruptedException { clientFactory = context.createClientFactory( Lists.<TransportClientBootstrap>newArrayList( new SaslClientBootstrap(conf, "app-1", secretKeyHolder))); @@ -133,7 +133,7 @@ public class SaslIntegrationSuite { } @Test - public void testNoSaslClient() throws IOException { + public void testNoSaslClient() throws IOException, InterruptedException { clientFactory = context.createClientFactory( Lists.<TransportClientBootstrap>newArrayList()); diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java index 552b5366c5..a04b6825d1 100644 --- a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java +++ b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java @@ -240,7 +240,7 @@ public class ExternalShuffleIntegrationSuite { } private void registerExecutor(String executorId, ExecutorShuffleInfo executorInfo) - throws IOException { + throws IOException, InterruptedException { ExternalShuffleClient client = new ExternalShuffleClient(conf, null, false, false); client.init(APP_ID); client.registerWithShuffleServer(TestUtils.getLocalHost(), server.getPort(), diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleSecuritySuite.java b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleSecuritySuite.java index a0f69ca29a..6ef1a2d2c8 100644 --- a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleSecuritySuite.java +++ b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleSecuritySuite.java @@ -59,7 +59,7 @@ public class ExternalShuffleSecuritySuite { } @Test - public void testValid() throws IOException { + public void testValid() throws IOException, InterruptedException { validate("my-app-id", "secret", false); } @@ -82,12 +82,13 @@ public class ExternalShuffleSecuritySuite { } @Test - public void testEncryption() throws IOException { + public void testEncryption() throws IOException, InterruptedException { validate("my-app-id", "secret", true); } /** Creates an ExternalShuffleClient and attempts to register with the server. */ - private void validate(String appId, String secretKey, boolean encrypt) throws IOException { + private void validate(String appId, String secretKey, boolean encrypt) + throws IOException, InterruptedException { ExternalShuffleClient client = new ExternalShuffleClient(conf, new TestSecretKeyHolder(appId, secretKey), true, encrypt); client.init(appId); diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RetryingBlockFetcherSuite.java b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RetryingBlockFetcherSuite.java index 91882e3b3b..f221544e9e 100644 --- a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RetryingBlockFetcherSuite.java +++ b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RetryingBlockFetcherSuite.java @@ -66,7 +66,7 @@ public class RetryingBlockFetcherSuite { } @Test - public void testNoFailures() throws IOException { + public void testNoFailures() throws IOException, InterruptedException { BlockFetchingListener listener = mock(BlockFetchingListener.class); List<? extends Map<String, Object>> interactions = Arrays.asList( @@ -85,7 +85,7 @@ public class RetryingBlockFetcherSuite { } @Test - public void testUnrecoverableFailure() throws IOException { + public void testUnrecoverableFailure() throws IOException, InterruptedException { BlockFetchingListener listener = mock(BlockFetchingListener.class); List<? extends Map<String, Object>> interactions = Arrays.asList( @@ -104,7 +104,7 @@ public class RetryingBlockFetcherSuite { } @Test - public void testSingleIOExceptionOnFirst() throws IOException { + public void testSingleIOExceptionOnFirst() throws IOException, InterruptedException { BlockFetchingListener listener = mock(BlockFetchingListener.class); List<? extends Map<String, Object>> interactions = Arrays.asList( @@ -127,7 +127,7 @@ public class RetryingBlockFetcherSuite { } @Test - public void testSingleIOExceptionOnSecond() throws IOException { + public void testSingleIOExceptionOnSecond() throws IOException, InterruptedException { BlockFetchingListener listener = mock(BlockFetchingListener.class); List<? extends Map<String, Object>> interactions = Arrays.asList( @@ -149,7 +149,7 @@ public class RetryingBlockFetcherSuite { } @Test - public void testTwoIOExceptions() throws IOException { + public void testTwoIOExceptions() throws IOException, InterruptedException { BlockFetchingListener listener = mock(BlockFetchingListener.class); List<? extends Map<String, Object>> interactions = Arrays.asList( @@ -177,7 +177,7 @@ public class RetryingBlockFetcherSuite { } @Test - public void testThreeIOExceptions() throws IOException { + public void testThreeIOExceptions() throws IOException, InterruptedException { BlockFetchingListener listener = mock(BlockFetchingListener.class); List<? extends Map<String, Object>> interactions = Arrays.asList( @@ -209,7 +209,7 @@ public class RetryingBlockFetcherSuite { } @Test - public void testRetryAndUnrecoverable() throws IOException { + public void testRetryAndUnrecoverable() throws IOException, InterruptedException { BlockFetchingListener listener = mock(BlockFetchingListener.class); List<? extends Map<String, Object>> interactions = Arrays.asList( @@ -252,7 +252,7 @@ public class RetryingBlockFetcherSuite { @SuppressWarnings("unchecked") private static void performInteractions(List<? extends Map<String, Object>> interactions, BlockFetchingListener listener) - throws IOException { + throws IOException, InterruptedException { TransportConf conf = new TransportConf("shuffle", new SystemPropertyConfigProvider()); BlockFetchStarter fetchStarter = mock(BlockFetchStarter.class); -- GitLab