From 56fd34af52a18230bf3ea7b041f2a184eddc1103 Mon Sep 17 00:00:00 2001
From: zsxwing <zsxwing@gmail.com>
Date: Thu, 16 Oct 2014 19:22:02 -0700
Subject: [PATCH] [SPARK-3741] Add afterExecute for handleConnectExecutor

Sorry. I found that I forgot to add `afterExecute` for `handleConnectExecutor` in #2593.

Author: zsxwing <zsxwing@gmail.com>

Closes #2794 from zsxwing/SPARK-3741 and squashes the following commits:

a0bc4dd [zsxwing] Add afterExecute for handleConnectExecutor
---
 .../apache/spark/network/nio/ConnectionManager.scala  | 11 ++++++++++-
 1 file changed, 10 insertions(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala b/core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala
index 9396b6ba84..bda4bf5093 100644
--- a/core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala
+++ b/core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala
@@ -117,7 +117,16 @@ private[nio] class ConnectionManager(
     conf.getInt("spark.core.connection.connect.threads.max", 8),
     conf.getInt("spark.core.connection.connect.threads.keepalive", 60), TimeUnit.SECONDS,
     new LinkedBlockingDeque[Runnable](),
-    Utils.namedThreadFactory("handle-connect-executor"))
+    Utils.namedThreadFactory("handle-connect-executor")) {
+
+    override def afterExecute(r: Runnable, t: Throwable): Unit = {
+      super.afterExecute(r, t)
+      if (t != null && NonFatal(t)) {
+        logError("Error in handleConnectExecutor is not handled properly", t)
+      }
+    }
+
+  }
 
   private val serverChannel = ServerSocketChannel.open()
   // used to track the SendingConnections waiting to do SASL negotiation
-- 
GitLab