From bae3c9a4eb0c320999e5dbafd62692c12823e07d Mon Sep 17 00:00:00 2001
From: Nishkam Ravi <nishkamravi@gmail.com>
Date: Tue, 26 Jan 2016 21:14:39 -0800
Subject: [PATCH] [SPARK-12967][NETTY] Avoid NettyRpc error message during
 sparkContext shutdown

If there's an RPC issue while sparkContext is alive but stopped (which would happen only when executing SparkContext.stop), log a warning instead. This is a common occurrence.

vanzin

Author: Nishkam Ravi <nishkamravi@gmail.com>
Author: nishkamravi2 <nishkamravi@gmail.com>

Closes #10881 from nishkamravi2/master_netty.
---
 .../spark/rpc/RpcEnvStoppedException.scala    | 20 +++++++++++++++++++
 .../apache/spark/rpc/netty/Dispatcher.scala   |  4 ++--
 .../apache/spark/rpc/netty/NettyRpcEnv.scala  |  6 +++++-
 .../org/apache/spark/rpc/netty/Outbox.scala   |  7 +++++--
 4 files changed, 32 insertions(+), 5 deletions(-)
 create mode 100644 core/src/main/scala/org/apache/spark/rpc/RpcEnvStoppedException.scala

diff --git a/core/src/main/scala/org/apache/spark/rpc/RpcEnvStoppedException.scala b/core/src/main/scala/org/apache/spark/rpc/RpcEnvStoppedException.scala
new file mode 100644
index 0000000000..c296cc23f1
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/rpc/RpcEnvStoppedException.scala
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.rpc
+
+private[rpc] class RpcEnvStoppedException()
+  extends IllegalStateException("RpcEnv already stopped.")
diff --git a/core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala b/core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala
index 19259e0e80..6ceff2c073 100644
--- a/core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala
+++ b/core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala
@@ -106,7 +106,7 @@ private[netty] class Dispatcher(nettyEnv: NettyRpcEnv) extends Logging {
     val iter = endpoints.keySet().iterator()
     while (iter.hasNext) {
       val name = iter.next
-      postMessage(name, message, (e) => logWarning(s"Message $message dropped.", e))
+      postMessage(name, message, (e) => logWarning(s"Message $message dropped. ${e.getMessage}"))
     }
   }
 
@@ -156,7 +156,7 @@ private[netty] class Dispatcher(nettyEnv: NettyRpcEnv) extends Logging {
     if (shouldCallOnStop) {
       // We don't need to call `onStop` in the `synchronized` block
       val error = if (stopped) {
-          new IllegalStateException("RpcEnv already stopped.")
+          new RpcEnvStoppedException()
         } else {
           new SparkException(s"Could not find $endpointName or it has been stopped.")
         }
diff --git a/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala b/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala
index 9ae74d9d7b..89eda857e6 100644
--- a/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala
+++ b/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala
@@ -182,7 +182,11 @@ private[netty] class NettyRpcEnv(
     val remoteAddr = message.receiver.address
     if (remoteAddr == address) {
       // Message to a local RPC endpoint.
-      dispatcher.postOneWayMessage(message)
+      try {
+        dispatcher.postOneWayMessage(message)
+      } catch {
+        case e: RpcEnvStoppedException => logWarning(e.getMessage)
+      }
     } else {
       // Message to a remote RPC endpoint.
       postToOutbox(message.receiver, OneWayOutboxMessage(serialize(message)))
diff --git a/core/src/main/scala/org/apache/spark/rpc/netty/Outbox.scala b/core/src/main/scala/org/apache/spark/rpc/netty/Outbox.scala
index 2316ebe347..9fd64e8535 100644
--- a/core/src/main/scala/org/apache/spark/rpc/netty/Outbox.scala
+++ b/core/src/main/scala/org/apache/spark/rpc/netty/Outbox.scala
@@ -25,7 +25,7 @@ import scala.util.control.NonFatal
 
 import org.apache.spark.{Logging, SparkException}
 import org.apache.spark.network.client.{RpcResponseCallback, TransportClient}
-import org.apache.spark.rpc.RpcAddress
+import org.apache.spark.rpc.{RpcAddress, RpcEnvStoppedException}
 
 private[netty] sealed trait OutboxMessage {
 
@@ -43,7 +43,10 @@ private[netty] case class OneWayOutboxMessage(content: ByteBuffer) extends Outbo
   }
 
   override def onFailure(e: Throwable): Unit = {
-    logWarning(s"Failed to send one-way RPC.", e)
+    e match {
+      case e1: RpcEnvStoppedException => logWarning(e1.getMessage)
+      case e1: Throwable => logWarning(s"Failed to send one-way RPC.", e1)
+    }
   }
 
 }
-- 
GitLab