Commit a3989058 authored by Bryan Cutler, committed by Reynold Xin

[SPARK-10827][CORE] AppClient should not use `askWithReply` in `receiveAndReply`

Changed AppClient to be non-blocking in `receiveAndReply` by using a separate thread to wait for the response and reply to the context. The threads are managed by a thread pool. Also added unit tests for the AppClient interface.

Author: Bryan Cutler <bjcutler@us.ibm.com>

Closes #9317 from BryanCutler/appClient-receiveAndReply-SPARK-10827.
parent 21c562fa
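Why this matters: `receiveAndReply` runs on the endpoint's single event-loop thread, so a blocking `askWithRetry` there stalls every other message the AppClient handles until the Master responds or the retries time out. Below is a minimal, self-contained sketch of the before/after pattern; `ReplyContext` and the handler names are simplified stand-ins for Spark's `RpcCallContext` and the real endpoint code, not the actual API.

import java.util.concurrent.Executors
import scala.util.control.NonFatal

// Hypothetical stand-in for org.apache.spark.rpc.RpcCallContext, simplified.
trait ReplyContext {
  def reply(response: Any): Unit
  def sendFailure(e: Throwable): Unit
}

object NonBlockingReplySketch {
  private val pool = Executors.newCachedThreadPool()

  // Before: reply(ask) evaluates the blocking ask on the event-loop thread,
  // freezing the endpoint until the remote side answers or retries time out.
  def handleBlocking(context: ReplyContext)(ask: => Boolean): Unit = {
    context.reply(ask)
  }

  // After: hand the blocking ask to a pool thread and return immediately;
  // that thread replies with the result or reports the failure.
  def handleAsync(context: ReplyContext)(ask: => Boolean): Unit = {
    pool.execute(new Runnable {
      override def run(): Unit = {
        try {
          context.reply(ask)
        } catch {
          case _: InterruptedException => // pool shut down; drop the reply
          case NonFatal(t) => context.sendFailure(t)
        }
      }
    })
  }
}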
core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
@@ -49,8 +49,8 @@ private[spark] class AppClient(
   private val REGISTRATION_TIMEOUT_SECONDS = 20
   private val REGISTRATION_RETRIES = 3
 
-  private var endpoint: RpcEndpointRef = null
-  private var appId: String = null
+  @volatile private var endpoint: RpcEndpointRef = null
+  @volatile private var appId: String = null
   @volatile private var registered = false
 
   private class ClientEndpoint(override val rpcEnv: RpcEnv) extends ThreadSafeRpcEndpoint
@@ -77,6 +77,11 @@ private[spark] class AppClient(
     private val registrationRetryThread =
       ThreadUtils.newDaemonSingleThreadScheduledExecutor("appclient-registration-retry-thread")
 
+    // A thread pool to perform receive then reply actions in a thread so as not to block the
+    // event loop.
+    private val askAndReplyThreadPool =
+      ThreadUtils.newDaemonCachedThreadPool("appclient-receive-and-reply-threadpool")
+
     override def onStart(): Unit = {
       try {
         registerWithMaster(1)
@@ -200,7 +205,7 @@ private[spark] class AppClient(
 
       case r: RequestExecutors =>
         master match {
-          case Some(m) => context.reply(m.askWithRetry[Boolean](r))
+          case Some(m) => askAndReplyAsync(m, context, r)
           case None =>
             logWarning("Attempted to request executors before registering with Master.")
             context.reply(false)
@@ -208,13 +213,32 @@ private[spark] class AppClient(
 
      case k: KillExecutors =>
        master match {
-          case Some(m) => context.reply(m.askWithRetry[Boolean](k))
+          case Some(m) => askAndReplyAsync(m, context, k)
          case None =>
            logWarning("Attempted to kill executors before registering with Master.")
            context.reply(false)
        }
    }
 
+    private def askAndReplyAsync[T](
+        endpointRef: RpcEndpointRef,
+        context: RpcCallContext,
+        msg: T): Unit = {
+      // Create a thread to ask a message and reply with the result. Allow thread to be
+      // interrupted during shutdown, otherwise context must be notified of NonFatal errors.
+      askAndReplyThreadPool.execute(new Runnable {
+        override def run(): Unit = {
+          try {
+            context.reply(endpointRef.askWithRetry[Boolean](msg))
+          } catch {
+            case ie: InterruptedException => // Cancelled
+            case NonFatal(t) =>
+              context.sendFailure(t)
+          }
+        }
+      })
+    }
+
     override def onDisconnected(address: RpcAddress): Unit = {
       if (master.exists(_.address == address)) {
         logWarning(s"Connection to $address failed; waiting for master to reconnect...")
@@ -252,6 +276,7 @@ private[spark] class AppClient(
       registrationRetryThread.shutdownNow()
       registerMasterFutures.foreach(_.cancel(true))
       registerMasterThreadPool.shutdownNow()
+      askAndReplyThreadPool.shutdownNow()
     }
   }
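The new pool comes from Spark's `ThreadUtils.newDaemonCachedThreadPool`, which returns a cached pool whose threads are daemons sharing a common name prefix. A rough standalone equivalent, for illustration only (the real helper builds its factory with Guava's ThreadFactoryBuilder):

import java.util.concurrent.{ExecutorService, Executors, ThreadFactory}
import java.util.concurrent.atomic.AtomicInteger

object DaemonPoolSketch {
  // Rough equivalent of ThreadUtils.newDaemonCachedThreadPool, sketched
  // with plain JDK types rather than Spark's actual implementation.
  def newDaemonCachedThreadPool(prefix: String): ExecutorService = {
    val counter = new AtomicInteger(0)
    val factory = new ThreadFactory {
      override def newThread(r: Runnable): Thread = {
        val t = new Thread(r, s"$prefix-${counter.incrementAndGet()}")
        t.setDaemon(true) // daemon threads never keep the JVM alive at shutdown
        t
      }
    }
    Executors.newCachedThreadPool(factory)
  }
}

A cached pool suits this workload: ask traffic is bursty, the threads spend nearly all their time blocked waiting on the Master's reply, and idle threads are reclaimed rather than staying pinned.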
core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala (new file)
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.client
import scala.collection.mutable.{ArrayBuffer, SynchronizedBuffer}
import scala.concurrent.duration._
import org.scalatest.BeforeAndAfterAll
import org.scalatest.concurrent.Eventually._
import org.apache.spark._
import org.apache.spark.deploy.{ApplicationDescription, Command}
import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, RequestMasterState}
import org.apache.spark.deploy.master.{ApplicationInfo, Master}
import org.apache.spark.deploy.worker.Worker
import org.apache.spark.rpc.RpcEnv
import org.apache.spark.util.Utils
/**
* End-to-end tests for application client in standalone mode.
*/
class AppClientSuite extends SparkFunSuite with LocalSparkContext with BeforeAndAfterAll {
private val numWorkers = 2
private val conf = new SparkConf()
private val securityManager = new SecurityManager(conf)
private var masterRpcEnv: RpcEnv = null
private var workerRpcEnvs: Seq[RpcEnv] = null
private var master: Master = null
private var workers: Seq[Worker] = null
/**
* Start the local cluster.
* Note: local-cluster mode is insufficient because we want a reference to the Master.
*/
override def beforeAll(): Unit = {
super.beforeAll()
masterRpcEnv = RpcEnv.create(Master.SYSTEM_NAME, "localhost", 0, conf, securityManager)
workerRpcEnvs = (0 until numWorkers).map { i =>
RpcEnv.create(Worker.SYSTEM_NAME + i, "localhost", 0, conf, securityManager)
}
master = makeMaster()
workers = makeWorkers(10, 2048)
// Wait until all workers register with master successfully
eventually(timeout(60.seconds), interval(10.millis)) {
assert(getMasterState.workers.size === numWorkers)
}
}
override def afterAll(): Unit = {
workerRpcEnvs.foreach(_.shutdown())
masterRpcEnv.shutdown()
workers.foreach(_.stop())
master.stop()
workerRpcEnvs = null
masterRpcEnv = null
workers = null
master = null
super.afterAll()
}
test("interface methods of AppClient using local Master") {
val ci = new AppClientInst(masterRpcEnv.address.toSparkURL)
ci.client.start()
// Client should connect with one Master which registers the application
eventually(timeout(10.seconds), interval(10.millis)) {
val apps = getApplications()
assert(ci.listener.connectedIdList.size === 1, "client listener should have one connection")
assert(apps.size === 1, "master should have 1 registered app")
}
// Send message to Master to request Executors, verify request by change in executor limit
val numExecutorsRequested = 1
assert(ci.client.requestTotalExecutors(numExecutorsRequested))
eventually(timeout(10.seconds), interval(10.millis)) {
val apps = getApplications()
assert(apps.head.getExecutorLimit === numExecutorsRequested, s"executor request failed")
}
// Send request to kill executor, verify request was made
assert {
val apps = getApplications()
val executorId: String = apps.head.executors.head._2.fullId
ci.client.killExecutors(Seq(executorId))
}
// Issue stop command for Client to disconnect from Master
ci.client.stop()
// Verify Client is marked dead and unregistered from Master
eventually(timeout(10.seconds), interval(10.millis)) {
val apps = getApplications()
assert(ci.listener.deadReasonList.size === 1, "client should have been marked dead")
assert(apps.isEmpty, "master should have 0 registered apps")
}
}
test("request from AppClient before initialized with master") {
val ci = new AppClientInst(masterRpcEnv.address.toSparkURL)
// requests to master should fail immediately
assert(ci.client.requestTotalExecutors(3) === false)
}
// ===============================
// | Utility methods for testing |
// ===============================
/** Return a SparkConf for applications that want to talk to our Master. */
private def appConf: SparkConf = {
new SparkConf()
.setMaster(masterRpcEnv.address.toSparkURL)
.setAppName("test")
.set("spark.executor.memory", "256m")
}
/** Make a master to which our application will send executor requests. */
private def makeMaster(): Master = {
val master = new Master(masterRpcEnv, masterRpcEnv.address, 0, securityManager, conf)
masterRpcEnv.setupEndpoint(Master.ENDPOINT_NAME, master)
master
}
/** Make a few workers that talk to our master. */
private def makeWorkers(cores: Int, memory: Int): Seq[Worker] = {
(0 until numWorkers).map { i =>
val rpcEnv = workerRpcEnvs(i)
val worker = new Worker(rpcEnv, 0, cores, memory, Array(masterRpcEnv.address),
Worker.SYSTEM_NAME + i, Worker.ENDPOINT_NAME, null, conf, securityManager)
rpcEnv.setupEndpoint(Worker.ENDPOINT_NAME, worker)
worker
}
}
/** Get the Master state */
private def getMasterState: MasterStateResponse = {
master.self.askWithRetry[MasterStateResponse](RequestMasterState)
}
/** Get the active applications from the Master */
private def getApplications(): Seq[ApplicationInfo] = {
getMasterState.activeApps
}
/** Application Listener to collect events */
private class AppClientCollector extends AppClientListener with Logging {
val connectedIdList = new ArrayBuffer[String] with SynchronizedBuffer[String]
@volatile var disconnectedCount: Int = 0
val deadReasonList = new ArrayBuffer[String] with SynchronizedBuffer[String]
val execAddedList = new ArrayBuffer[String] with SynchronizedBuffer[String]
val execRemovedList = new ArrayBuffer[String] with SynchronizedBuffer[String]
def connected(id: String): Unit = {
connectedIdList += id
}
def disconnected(): Unit = {
synchronized {
disconnectedCount += 1
}
}
def dead(reason: String): Unit = {
deadReasonList += reason
}
def executorAdded(
id: String,
workerId: String,
hostPort: String,
cores: Int,
memory: Int): Unit = {
execAddedList += id
}
def executorRemoved(id: String, message: String, exitStatus: Option[Int]): Unit = {
execRemovedList += id
}
}
/** Create AppClient and supporting objects */
private class AppClientInst(masterUrl: String) {
val rpcEnv = RpcEnv.create("spark", Utils.localHostName(), 0, conf, securityManager)
private val cmd = new Command(TestExecutor.getClass.getCanonicalName.stripSuffix("$"),
List(), Map(), Seq(), Seq(), Seq())
private val desc = new ApplicationDescription("AppClientSuite", Some(1), 512, cmd, "ignored")
val listener = new AppClientCollector
val client = new AppClient(rpcEnv, Array(masterUrl), desc, listener, new SparkConf)
}
}
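Because registration, executor allocation, and teardown are all asynchronous, the suite never asserts cluster state directly; each check polls through ScalaTest's `Eventually` until it passes or the timeout expires. A minimal usage fragment mirroring the suite's own imports (`clusterIsReady` is a hypothetical placeholder):

import scala.concurrent.duration._
import org.scalatest.concurrent.Eventually._

// Re-runs the block every 10 ms; fails the test if the assertions
// still do not hold after 10 seconds.
eventually(timeout(10.seconds), interval(10.millis)) {
  assert(clusterIsReady(), "cluster should eventually be ready")
}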