Skip to content
Snippets Groups Projects
Commit b362d50f authored by Jacek Lewandowski's avatar Jacek Lewandowski Committed by Andrew Or
Browse files

[SPARK-11726] Throw exception on timeout when waiting for REST server response

Author: Jacek Lewandowski <lewandowski.jacek@gmail.com>

Closes #9692 from jacek-lewandowski/SPARK-11726.
parent 52c734b5
No related branches found
No related tags found
No related merge requests found
...@@ -19,16 +19,19 @@ package org.apache.spark.deploy.rest ...@@ -19,16 +19,19 @@ package org.apache.spark.deploy.rest
import java.io.{DataOutputStream, FileNotFoundException} import java.io.{DataOutputStream, FileNotFoundException}
import java.net.{ConnectException, HttpURLConnection, SocketException, URL} import java.net.{ConnectException, HttpURLConnection, SocketException, URL}
import java.util.concurrent.TimeoutException
import javax.servlet.http.HttpServletResponse import javax.servlet.http.HttpServletResponse
import scala.collection.mutable import scala.collection.mutable
import scala.concurrent.duration._
import scala.concurrent.{Await, Future}
import scala.io.Source import scala.io.Source
import com.fasterxml.jackson.core.JsonProcessingException import com.fasterxml.jackson.core.JsonProcessingException
import com.google.common.base.Charsets import com.google.common.base.Charsets
import org.apache.spark.{Logging, SparkConf, SPARK_VERSION => sparkVersion}
import org.apache.spark.util.Utils import org.apache.spark.util.Utils
import org.apache.spark.{Logging, SPARK_VERSION => sparkVersion, SparkConf}
/** /**
* A client that submits applications to a [[RestSubmissionServer]]. * A client that submits applications to a [[RestSubmissionServer]].
...@@ -225,7 +228,8 @@ private[spark] class RestSubmissionClient(master: String) extends Logging { ...@@ -225,7 +228,8 @@ private[spark] class RestSubmissionClient(master: String) extends Logging {
* Exposed for testing. * Exposed for testing.
*/ */
private[rest] def readResponse(connection: HttpURLConnection): SubmitRestProtocolResponse = { private[rest] def readResponse(connection: HttpURLConnection): SubmitRestProtocolResponse = {
try { import scala.concurrent.ExecutionContext.Implicits.global
val responseFuture = Future {
val dataStream = val dataStream =
if (connection.getResponseCode == HttpServletResponse.SC_OK) { if (connection.getResponseCode == HttpServletResponse.SC_OK) {
connection.getInputStream connection.getInputStream
...@@ -251,11 +255,15 @@ private[spark] class RestSubmissionClient(master: String) extends Logging { ...@@ -251,11 +255,15 @@ private[spark] class RestSubmissionClient(master: String) extends Logging {
throw new SubmitRestProtocolException( throw new SubmitRestProtocolException(
s"Message received from server was not a response:\n${unexpected.toJson}") s"Message received from server was not a response:\n${unexpected.toJson}")
} }
} catch { }
try { Await.result(responseFuture, 10.seconds) } catch {
case unreachable @ (_: FileNotFoundException | _: SocketException) => case unreachable @ (_: FileNotFoundException | _: SocketException) =>
throw new SubmitRestConnectionException("Unable to connect to server", unreachable) throw new SubmitRestConnectionException("Unable to connect to server", unreachable)
case malformed @ (_: JsonProcessingException | _: SubmitRestProtocolException) => case malformed @ (_: JsonProcessingException | _: SubmitRestProtocolException) =>
throw new SubmitRestProtocolException("Malformed response received from server", malformed) throw new SubmitRestProtocolException("Malformed response received from server", malformed)
case timeout: TimeoutException =>
throw new SubmitRestConnectionException("No response from server", timeout)
} }
} }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment