diff --git a/common/network-common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java b/common/network-common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java
index cb10edff659f183981fa9009592a312532b88c80..b50e043d5c9ce556b5cf39d59c5b51bd6ebe77a3 100644
--- a/common/network-common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java
+++ b/common/network-common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java
@@ -122,7 +122,8 @@ public class TransportClientFactory implements Closeable {
    *
    * Concurrency: This method is safe to call from multiple threads.
    */
-  public TransportClient createClient(String remoteHost, int remotePort) throws IOException {
+  public TransportClient createClient(String remoteHost, int remotePort)
+      throws IOException, InterruptedException {
     // Get connection from the connection pool first.
     // If it is not found or not active, create a new one.
     // Use unresolved address here to avoid DNS resolution each time we creates a client.
@@ -190,13 +191,14 @@ public class TransportClientFactory implements Closeable {
    * As with {@link #createClient(String, int)}, this method is blocking.
    */
   public TransportClient createUnmanagedClient(String remoteHost, int remotePort)
-      throws IOException {
+      throws IOException, InterruptedException {
     final InetSocketAddress address = new InetSocketAddress(remoteHost, remotePort);
     return createClient(address);
   }
 
   /** Create a completely new {@link TransportClient} to the remote address. */
-  private TransportClient createClient(InetSocketAddress address) throws IOException {
+  private TransportClient createClient(InetSocketAddress address)
+      throws IOException, InterruptedException {
     logger.debug("Creating new connection to {}", address);
 
     Bootstrap bootstrap = new Bootstrap();
@@ -223,7 +225,7 @@ public class TransportClientFactory implements Closeable {
     // Connect to the remote server
     long preConnect = System.nanoTime();
     ChannelFuture cf = bootstrap.connect(address);
-    if (!cf.awaitUninterruptibly(conf.connectionTimeoutMs())) {
+    if (!cf.await(conf.connectionTimeoutMs())) {
       throw new IOException(
         String.format("Connecting to %s timed out (%s ms)", address, conf.connectionTimeoutMs()));
     } else if (cf.cause() != null) {
diff --git a/common/network-common/src/test/java/org/apache/spark/network/TransportClientFactorySuite.java b/common/network-common/src/test/java/org/apache/spark/network/TransportClientFactorySuite.java
index f54a64cb0f849f33af241ba62e045838b0eca1a2..205ab88c843132c5fe6c7f994aab8aeef418121f 100644
--- a/common/network-common/src/test/java/org/apache/spark/network/TransportClientFactorySuite.java
+++ b/common/network-common/src/test/java/org/apache/spark/network/TransportClientFactorySuite.java
@@ -99,6 +99,8 @@ public class TransportClientFactorySuite {
             clients.add(client);
           } catch (IOException e) {
             failed.incrementAndGet();
+          } catch (InterruptedException e) {
+            throw new RuntimeException(e);
           }
         }
       };
@@ -142,7 +144,7 @@ public class TransportClientFactorySuite {
   }
 
   @Test
-  public void returnDifferentClientsForDifferentServers() throws IOException {
+  public void returnDifferentClientsForDifferentServers() throws IOException, InterruptedException {
     TransportClientFactory factory = context.createClientFactory();
     TransportClient c1 = factory.createClient(TestUtils.getLocalHost(), server1.getPort());
     TransportClient c2 = factory.createClient(TestUtils.getLocalHost(), server2.getPort());
@@ -171,7 +173,7 @@ public class TransportClientFactorySuite {
   }
 
   @Test
-  public void closeBlockClientsWithFactory() throws IOException {
+  public void closeBlockClientsWithFactory() throws IOException, InterruptedException {
     TransportClientFactory factory = context.createClientFactory();
     TransportClient c1 = factory.createClient(TestUtils.getLocalHost(), server1.getPort());
     TransportClient c2 = factory.createClient(TestUtils.getLocalHost(), server2.getPort());
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java
index 616505d9796d05b172c8bdb6150c0abc843b358c..8c0c400966ed57c1c6b1717dca23cb6690217a9d 100644
--- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java
@@ -94,7 +94,7 @@ public class ExternalShuffleClient extends ShuffleClient {
         new RetryingBlockFetcher.BlockFetchStarter() {
           @Override
           public void createAndStart(String[] blockIds, BlockFetchingListener listener)
-              throws IOException {
+              throws IOException, InterruptedException {
             TransportClient client = clientFactory.createClient(host, port);
             new OneForOneBlockFetcher(client, appId, execId, blockIds, listener).start();
           }
@@ -129,7 +129,7 @@ public class ExternalShuffleClient extends ShuffleClient {
       String host,
       int port,
       String execId,
-      ExecutorShuffleInfo executorInfo) throws IOException {
+      ExecutorShuffleInfo executorInfo) throws IOException, InterruptedException {
     checkInit();
     TransportClient client = clientFactory.createUnmanagedClient(host, port);
     try {
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RetryingBlockFetcher.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RetryingBlockFetcher.java
index 72bd0f803da33ea5d09c47a35f0c4ed30c9e7fa9..5be855048e4d619d75be0b4efa421d73ea2804d0 100644
--- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RetryingBlockFetcher.java
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RetryingBlockFetcher.java
@@ -57,7 +57,8 @@ public class RetryingBlockFetcher {
      * {@link org.apache.spark.network.client.TransportClientFactory} in order to fix connection
      * issues.
      */
-    void createAndStart(String[] blockIds, BlockFetchingListener listener) throws IOException;
+    void createAndStart(String[] blockIds, BlockFetchingListener listener)
+        throws IOException, InterruptedException;
   }
 
   /** Shared executor service used for waiting and retrying. */
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/mesos/MesosExternalShuffleClient.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/mesos/MesosExternalShuffleClient.java
index ab49b1c1d789694338c2ee3b02a9fe389e7dc146..dbc1010847fb16cba4f31595d57fec05bdf636cb 100644
--- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/mesos/MesosExternalShuffleClient.java
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/mesos/MesosExternalShuffleClient.java
@@ -68,7 +68,7 @@ public class MesosExternalShuffleClient extends ExternalShuffleClient {
       String host,
       int port,
       long heartbeatTimeoutMs,
-      long heartbeatIntervalMs) throws IOException {
+      long heartbeatIntervalMs) throws IOException, InterruptedException {
 
     checkInit();
     ByteBuffer registerDriver = new RegisterDriver(appId, heartbeatTimeoutMs).toByteBuffer();
diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/sasl/SaslIntegrationSuite.java b/common/network-shuffle/src/test/java/org/apache/spark/network/sasl/SaslIntegrationSuite.java
index 298a487ebb7522d40e20f40ac6bb57b94120c276..52f50a3409b984ddb06325c450a0fbafa2e2c8fa 100644
--- a/common/network-shuffle/src/test/java/org/apache/spark/network/sasl/SaslIntegrationSuite.java
+++ b/common/network-shuffle/src/test/java/org/apache/spark/network/sasl/SaslIntegrationSuite.java
@@ -103,7 +103,7 @@ public class SaslIntegrationSuite {
   }
 
   @Test
-  public void testGoodClient() throws IOException {
+  public void testGoodClient() throws IOException, InterruptedException {
     clientFactory = context.createClientFactory(
       Lists.<TransportClientBootstrap>newArrayList(
         new SaslClientBootstrap(conf, "app-1", secretKeyHolder)));
@@ -133,7 +133,7 @@ public class SaslIntegrationSuite {
   }
 
   @Test
-  public void testNoSaslClient() throws IOException {
+  public void testNoSaslClient() throws IOException, InterruptedException {
     clientFactory = context.createClientFactory(
       Lists.<TransportClientBootstrap>newArrayList());
 
diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java
index 9248ef3c467dfd38704a23ce797fd637b3e962d7..88de6fb83c6371847f1de218f325979e413b756e 100644
--- a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java
+++ b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java
@@ -242,7 +242,7 @@ public class ExternalShuffleIntegrationSuite {
   }
 
   private void registerExecutor(String executorId, ExecutorShuffleInfo executorInfo)
-      throws IOException {
+      throws IOException, InterruptedException {
     ExternalShuffleClient client = new ExternalShuffleClient(conf, null, false);
     client.init(APP_ID);
     client.registerWithShuffleServer(TestUtils.getLocalHost(), server.getPort(),
diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleSecuritySuite.java b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleSecuritySuite.java
index 4ae75a1b1762a979333d713feb1faa3f7358bdf4..bf20c577ed420800441df11704a62e75d07e0984 100644
--- a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleSecuritySuite.java
+++ b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleSecuritySuite.java
@@ -60,7 +60,7 @@ public class ExternalShuffleSecuritySuite {
   }
 
   @Test
-  public void testValid() throws IOException {
+  public void testValid() throws IOException, InterruptedException {
     validate("my-app-id", "secret", false);
   }
 
@@ -83,12 +83,13 @@ public class ExternalShuffleSecuritySuite {
   }
 
   @Test
-  public void testEncryption() throws IOException {
+  public void testEncryption() throws IOException, InterruptedException {
     validate("my-app-id", "secret", true);
   }
 
   /** Creates an ExternalShuffleClient and attempts to register with the server. */
-  private void validate(String appId, String secretKey, boolean encrypt) throws IOException {
+  private void validate(String appId, String secretKey, boolean encrypt)
+      throws IOException, InterruptedException {
     TransportConf testConf = conf;
     if (encrypt) {
       testConf = new TransportConf("shuffle", new MapConfigProvider(
diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RetryingBlockFetcherSuite.java b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RetryingBlockFetcherSuite.java
index a2509f5f34c880e9d79a949914dcac60b4a7578c..6db71eea6e8b53ec04474bf2cbb9bfd6d63676f4 100644
--- a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RetryingBlockFetcherSuite.java
+++ b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RetryingBlockFetcherSuite.java
@@ -52,7 +52,7 @@ public class RetryingBlockFetcherSuite {
   ManagedBuffer block2 = new NioManagedBuffer(ByteBuffer.wrap(new byte[19]));
 
   @Test
-  public void testNoFailures() throws IOException {
+  public void testNoFailures() throws IOException, InterruptedException {
     BlockFetchingListener listener = mock(BlockFetchingListener.class);
 
     List<? extends Map<String, Object>> interactions = Arrays.asList(
@@ -71,7 +71,7 @@ public class RetryingBlockFetcherSuite {
   }
 
   @Test
-  public void testUnrecoverableFailure() throws IOException {
+  public void testUnrecoverableFailure() throws IOException, InterruptedException {
     BlockFetchingListener listener = mock(BlockFetchingListener.class);
 
     List<? extends Map<String, Object>> interactions = Arrays.asList(
@@ -90,7 +90,7 @@ public class RetryingBlockFetcherSuite {
   }
 
   @Test
-  public void testSingleIOExceptionOnFirst() throws IOException {
+  public void testSingleIOExceptionOnFirst() throws IOException, InterruptedException {
     BlockFetchingListener listener = mock(BlockFetchingListener.class);
 
     List<? extends Map<String, Object>> interactions = Arrays.asList(
@@ -113,7 +113,7 @@ public class RetryingBlockFetcherSuite {
   }
 
   @Test
-  public void testSingleIOExceptionOnSecond() throws IOException {
+  public void testSingleIOExceptionOnSecond() throws IOException, InterruptedException {
     BlockFetchingListener listener = mock(BlockFetchingListener.class);
 
     List<? extends Map<String, Object>> interactions = Arrays.asList(
@@ -135,7 +135,7 @@ public class RetryingBlockFetcherSuite {
   }
 
   @Test
-  public void testTwoIOExceptions() throws IOException {
+  public void testTwoIOExceptions() throws IOException, InterruptedException {
     BlockFetchingListener listener = mock(BlockFetchingListener.class);
 
     List<? extends Map<String, Object>> interactions = Arrays.asList(
@@ -163,7 +163,7 @@ public class RetryingBlockFetcherSuite {
  }
 
   @Test
-  public void testThreeIOExceptions() throws IOException {
+  public void testThreeIOExceptions() throws IOException, InterruptedException {
     BlockFetchingListener listener = mock(BlockFetchingListener.class);
 
     List<? extends Map<String, Object>> interactions = Arrays.asList(
@@ -195,7 +195,7 @@ public class RetryingBlockFetcherSuite {
   }
 
   @Test
-  public void testRetryAndUnrecoverable() throws IOException {
+  public void testRetryAndUnrecoverable() throws IOException, InterruptedException {
     BlockFetchingListener listener = mock(BlockFetchingListener.class);
 
     List<? extends Map<String, Object>> interactions = Arrays.asList(
@@ -238,7 +238,7 @@ public class RetryingBlockFetcherSuite {
   @SuppressWarnings("unchecked")
   private static void performInteractions(List<? extends Map<String, Object>> interactions,
                                           BlockFetchingListener listener)
-    throws IOException {
+    throws IOException, InterruptedException {
     MapConfigProvider provider = new MapConfigProvider(ImmutableMap.of(
       "spark.shuffle.io.maxRetries", "2",
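
Note on the behavioral change: the connect wait in `TransportClientFactory.createClient` now uses `cf.await(...)` instead of `cf.awaitUninterruptibly(...)`, so an interrupt during connection setup surfaces as an `InterruptedException` rather than being silently swallowed, and the new checked exception is propagated through the shuffle-client call chain and the tests. The sketch below is not part of the patch; `ClientHelper` and `connectOrFail` are hypothetical names, shown only to illustrate how a caller that cannot add `InterruptedException` to its own signature might handle it.

```java
import java.io.IOException;

import org.apache.spark.network.client.TransportClient;
import org.apache.spark.network.client.TransportClientFactory;

// Hypothetical caller-side helper (not from this patch): translates an interrupt
// during connection setup into an IOException while preserving the interrupt flag.
public class ClientHelper {
  private final TransportClientFactory clientFactory;

  public ClientHelper(TransportClientFactory clientFactory) {
    this.clientFactory = clientFactory;
  }

  /** Connect to the given host/port, converting an interrupt into an IOException. */
  public TransportClient connectOrFail(String host, int port) throws IOException {
    try {
      // With this patch, createClient declares InterruptedException.
      return clientFactory.createClient(host, port);
    } catch (InterruptedException e) {
      // Restore the thread's interrupt status so upstream code can still observe it.
      Thread.currentThread().interrupt();
      throw new IOException("Interrupted while connecting to " + host + ":" + port, e);
    }
  }
}
```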