diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index e7eabd289699cd4138736aa1ac555c958358019e..f322a770bf18795a13970da31e781e896f8b9589 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -2389,53 +2389,6 @@ object SparkContext extends Logging {
         }
         (backend, scheduler)
 
-      case "yarn" if deployMode == "cluster" =>
-        val scheduler = try {
-          val clazz = Utils.classForName("org.apache.spark.scheduler.cluster.YarnClusterScheduler")
-          val cons = clazz.getConstructor(classOf[SparkContext])
-          cons.newInstance(sc).asInstanceOf[TaskSchedulerImpl]
-        } catch {
-          // TODO: Enumerate the exact reasons why it can fail
-          // But irrespective of it, it means we cannot proceed !
-          case e: Exception =>
-            throw new SparkException("YARN mode not available ?", e)
-        }
-        val backend = try {
-          val clazz =
-            Utils.classForName("org.apache.spark.scheduler.cluster.YarnClusterSchedulerBackend")
-          val cons = clazz.getConstructor(classOf[TaskSchedulerImpl], classOf[SparkContext])
-          cons.newInstance(scheduler, sc).asInstanceOf[CoarseGrainedSchedulerBackend]
-        } catch {
-          case e: Exception =>
-            throw new SparkException("YARN mode not available ?", e)
-        }
-        scheduler.initialize(backend)
-        (backend, scheduler)
-
-      case "yarn" if deployMode == "client" =>
-        val scheduler = try {
-          val clazz = Utils.classForName("org.apache.spark.scheduler.cluster.YarnScheduler")
-          val cons = clazz.getConstructor(classOf[SparkContext])
-          cons.newInstance(sc).asInstanceOf[TaskSchedulerImpl]
-
-        } catch {
-          case e: Exception =>
-            throw new SparkException("YARN mode not available ?", e)
-        }
-
-        val backend = try {
-          val clazz =
-            Utils.classForName("org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend")
-          val cons = clazz.getConstructor(classOf[TaskSchedulerImpl], classOf[SparkContext])
-          cons.newInstance(scheduler, sc).asInstanceOf[CoarseGrainedSchedulerBackend]
-        } catch {
-          case e: Exception =>
-            throw new SparkException("YARN mode not available ?", e)
-        }
-
-        scheduler.initialize(backend)
-        (backend, scheduler)
-
       case MESOS_REGEX(mesosUrl) =>
         MesosNativeLibrary.load()
         val scheduler = new TaskSchedulerImpl(sc)
@@ -2464,6 +2417,7 @@ object SparkContext extends Logging {
           cm.initialize(scheduler, backend)
           (backend, scheduler)
         } catch {
+          case se: SparkException => throw se
           case NonFatal(e) =>
             throw new SparkException("External scheduler cannot be instantiated", e)
         }
diff --git a/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala
index 49c2bf6bcad18b7d5a4d45e69333e9fe17dd4c56..213d70f4e5840db73a63658d347e57e39e2bfcf4 100644
--- a/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala
@@ -129,26 +129,6 @@ class SparkContextSchedulerCreationSuite
     }
   }
 
-  def testYarn(master: String, deployMode: String, expectedClassName: String) {
-    try {
-      val sched = createTaskScheduler(master, deployMode)
-      assert(sched.getClass === Utils.classForName(expectedClassName))
-    } catch {
-      case e: SparkException =>
-        assert(e.getMessage.contains("YARN mode not available"))
-        logWarning("YARN not available, could not test actual YARN scheduler creation")
-      case e: Throwable => fail(e)
-    }
-  }
-
-  test("yarn-cluster") {
-    testYarn("yarn", "cluster", "org.apache.spark.scheduler.cluster.YarnClusterScheduler")
-  }
-
-  test("yarn-client") {
-    testYarn("yarn", "client", "org.apache.spark.scheduler.cluster.YarnScheduler")
-  }
-
   def testMesos(master: String, expectedClass: Class[_], coarse: Boolean) {
     val conf = new SparkConf().set("spark.mesos.coarse", coarse.toString)
     try {
diff --git a/yarn/src/main/resources/META-INF/services/org.apache.spark.scheduler.ExternalClusterManager b/yarn/src/main/resources/META-INF/services/org.apache.spark.scheduler.ExternalClusterManager
new file mode 100644
index 0000000000000000000000000000000000000000..6e8a1ebfc61da1645bea9f064c013964ef472edc
--- /dev/null
+++ b/yarn/src/main/resources/META-INF/services/org.apache.spark.scheduler.ExternalClusterManager
@@ -0,0 +1 @@
+org.apache.spark.scheduler.cluster.YarnClusterManager
diff --git a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterManager.scala b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterManager.scala
new file mode 100644
index 0000000000000000000000000000000000000000..64cd1bd088001493e3da2f1893b5dc641b01dc75
--- /dev/null
+++ b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterManager.scala
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler.cluster
+
+import org.apache.spark.{SparkContext, SparkException}
+import org.apache.spark.scheduler.{ExternalClusterManager, SchedulerBackend, TaskScheduler, TaskSchedulerImpl}
+
+/**
+ * Cluster Manager for creation of Yarn scheduler and backend
+ */
+private[spark] class YarnClusterManager extends ExternalClusterManager {
+
+  override def canCreate(masterURL: String): Boolean = {
+    masterURL == "yarn"
+  }
+
+  override def createTaskScheduler(sc: SparkContext, masterURL: String): TaskScheduler = {
+    sc.deployMode match {
+      case "cluster" => new YarnClusterScheduler(sc)
+      case "client" => new YarnScheduler(sc)
+      case _ => throw new SparkException(s"Unknown deploy mode '${sc.deployMode}' for Yarn")
+    }
+  }
+
+  override def createSchedulerBackend(sc: SparkContext,
+      masterURL: String,
+      scheduler: TaskScheduler): SchedulerBackend = {
+    sc.deployMode match {
+      case "cluster" =>
+        new YarnClusterSchedulerBackend(scheduler.asInstanceOf[TaskSchedulerImpl], sc)
+      case "client" =>
+        new YarnClientSchedulerBackend(scheduler.asInstanceOf[TaskSchedulerImpl], sc)
+      case  _ =>
+        throw new SparkException(s"Unknown deploy mode '${sc.deployMode}' for Yarn")
+    }
+  }
+
+  override def initialize(scheduler: TaskScheduler, backend: SchedulerBackend): Unit = {
+    scheduler.asInstanceOf[TaskSchedulerImpl].initialize(backend)
+  }
+}