Skip to content
Snippets Groups Projects
Commit 31f31598 authored by Marcelo Vanzin's avatar Marcelo Vanzin
Browse files

[SPARK-11040] [NETWORK] Make sure SASL handler delegates all events.

Author: Marcelo Vanzin <vanzin@cloudera.com>

Closes #9053 from vanzin/SPARK-11040.
parent 135a2ce5
No related branches found
No related tags found
No related merge requests found
......@@ -115,9 +115,18 @@ class SaslRpcHandler extends RpcHandler {
@Override
public void connectionTerminated(TransportClient client) {
if (saslServer != null) {
saslServer.dispose();
try {
delegate.connectionTerminated(client);
} finally {
if (saslServer != null) {
saslServer.dispose();
}
}
}
@Override
public void exceptionCaught(Throwable cause, TransportClient client) {
delegate.exceptionCaught(cause, client);
}
}
......@@ -76,7 +76,13 @@ public class TransportRequestHandler extends MessageHandler<RequestMessage> {
@Override
public void channelUnregistered() {
streamManager.connectionTerminated(channel);
if (streamManager != null) {
try {
streamManager.connectionTerminated(channel);
} catch (RuntimeException e) {
logger.error("StreamManager connectionTerminated() callback failed.", e);
}
}
rpcHandler.connectionTerminated(reverseClient);
}
......
......@@ -153,6 +153,8 @@ public class SparkSaslSuite {
assertEquals("Pong", new String(response, StandardCharsets.UTF_8));
} finally {
ctx.close();
// There should be 2 terminated events; one for the client, one for the server.
verify(rpcHandler, times(2)).connectionTerminated(any(TransportClient.class));
}
}
......@@ -334,6 +336,23 @@ public class SparkSaslSuite {
}
}
@Test
public void testRpcHandlerDelegate() throws Exception {
// Tests all delegates exception for receive(), which is more complicated and already handled
// by all other tests.
RpcHandler handler = mock(RpcHandler.class);
RpcHandler saslHandler = new SaslRpcHandler(null, null, handler, null);
saslHandler.getStreamManager();
verify(handler).getStreamManager();
saslHandler.connectionTerminated(null);
verify(handler).connectionTerminated(any(TransportClient.class));
saslHandler.exceptionCaught(null, null);
verify(handler).exceptionCaught(any(Throwable.class), any(TransportClient.class));
}
private static class SaslTestCtx {
final TransportClient client;
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment