Skip to content
Snippets Groups Projects
Commit 18bd67c2 authored by Josh Rosen's avatar Josh Rosen Committed by Reynold Xin
Browse files

[SPARK-3887] Send stacktrace in ConnectionManager error replies

When reporting that a remote error occurred, the ConnectionManager should also log the stacktrace of the remote exception. This PR accomplishes this by sending the remote exception's stacktrace as the payload in the "negative ACK / error message."

Author: Josh Rosen <joshrosen@apache.org>

Closes #2741 from JoshRosen/propagate-cm-exceptions-to-sender and squashes the following commits:

b5366cc [Josh Rosen] Explicitly encode error messages using UTF-8.
cef18b3 [Josh Rosen] [SPARK-3887] Send stacktrace in ConnectionManager error messages.
parent 69c67aba
No related branches found
No related tags found
No related merge requests found
......@@ -748,9 +748,7 @@ private[nio] class ConnectionManager(
} catch {
case e: Exception => {
logError(s"Exception was thrown while processing message", e)
val m = Message.createBufferMessage(bufferMessage.id)
m.hasError = true
ackMessage = Some(m)
ackMessage = Some(Message.createErrorMessage(e, bufferMessage.id))
}
} finally {
sendMessage(connectionManagerId, ackMessage.getOrElse {
......@@ -913,8 +911,12 @@ private[nio] class ConnectionManager(
}
case scala.util.Success(ackMessage) =>
if (ackMessage.hasError) {
val errorMsgByteBuf = ackMessage.asInstanceOf[BufferMessage].buffers.head
val errorMsgBytes = new Array[Byte](errorMsgByteBuf.limit())
errorMsgByteBuf.get(errorMsgBytes)
val errorMsg = new String(errorMsgBytes, "utf-8")
val e = new IOException(
"sendMessageReliably failed with ACK that signalled a remote error")
s"sendMessageReliably failed with ACK that signalled a remote error: $errorMsg")
if (!promise.tryFailure(e)) {
logWarning("Ignore error because promise is completed", e)
}
......
......@@ -22,6 +22,7 @@ import java.nio.ByteBuffer
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.util.Utils
private[nio] abstract class Message(val typ: Long, val id: Int) {
var senderAddress: InetSocketAddress = null
......@@ -84,6 +85,19 @@ private[nio] object Message {
createBufferMessage(new Array[ByteBuffer](0), ackId)
}
/**
 * Build a "negative acknowledgment" that tells a sender its message could not be
 * processed.
 *
 * The payload is the exception rendered as a string by `Utils.exceptionString`,
 * encoded as UTF-8 bytes and wrapped in a single ByteBuffer. The returned message
 * has `hasError` set to true.
 *
 * @param exception the failure raised while processing the sender's message
 * @param ackId id forwarded to `createBufferMessage` for the message being answered
 * @return a BufferMessage flagged as an error and carrying the encoded exception text
 */
def createErrorMessage(exception: Exception, ackId: Int): BufferMessage = {
  // Encode explicitly as UTF-8 rather than relying on the platform default charset.
  val stackTraceBytes = Utils.exceptionString(exception).getBytes("utf-8")
  val nack = createBufferMessage(ByteBuffer.wrap(stackTraceBytes), ackId)
  nack.hasError = true
  nack
}
def create(header: MessageChunkHeader): Message = {
val newMessage: Message = header.typ match {
case BUFFER_MESSAGE => new BufferMessage(header.id,
......
......@@ -151,17 +151,14 @@ final class NioBlockTransferService(conf: SparkConf, securityManager: SecurityMa
} catch {
case e: Exception => {
logError("Exception handling buffer message", e)
val errorMessage = Message.createBufferMessage(msg.id)
errorMessage.hasError = true
Some(errorMessage)
Some(Message.createErrorMessage(e, msg.id))
}
}
case otherMessage: Any =>
logError("Unknown type message received: " + otherMessage)
val errorMessage = Message.createBufferMessage(msg.id)
errorMessage.hasError = true
Some(errorMessage)
val errorMsg = s"Received unknown message type: ${otherMessage.getClass.getName}"
logError(errorMsg)
Some(Message.createErrorMessage(new UnsupportedOperationException(errorMsg), msg.id))
}
}
......
......@@ -27,6 +27,7 @@ import scala.language.postfixOps
import org.scalatest.FunSuite
import org.apache.spark.{SecurityManager, SparkConf}
import org.apache.spark.util.Utils
/**
* Test the ConnectionManager with various security settings.
......@@ -236,7 +237,7 @@ class ConnectionManagerSuite extends FunSuite {
val manager = new ConnectionManager(0, conf, securityManager)
val managerServer = new ConnectionManager(0, conf, securityManager)
managerServer.onReceiveMessage((msg: Message, id: ConnectionManagerId) => {
throw new Exception
throw new Exception("Custom exception text")
})
val size = 10 * 1024 * 1024
......@@ -246,9 +247,10 @@ class ConnectionManagerSuite extends FunSuite {
val future = manager.sendMessageReliably(managerServer.id, bufferMessage)
intercept[IOException] {
val exception = intercept[IOException] {
Await.result(future, 1 second)
}
assert(Utils.exceptionString(exception).contains("Custom exception text"))
manager.stop()
managerServer.stop()
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment