diff --git a/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index ee900867292fe6da43e233fe1022cdeae2b6c170..94678815e806ad66680880e59ac4f7c01d1fd67c 100644
--- a/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -65,9 +65,8 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
 
   def this(args: ClientArguments) = this(new Configuration(), args)
 
-  def run() {
+  def runApp(): ApplicationId = {
     validateArgs()
-
     // Initialize and start the client service.
     init(yarnConf)
     start()
@@ -104,8 +103,12 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
 
     // Finally, submit and monitor the application.
     submitApp(appContext)
-    monitorApplication(appId)
+    appId
+  }
 
+  def run() {
+    val appId = runApp()
+    monitorApplication(appId)
     System.exit(0)
   }
 
@@ -406,7 +409,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
       javaCommand + 
       " -server " +
       JAVA_OPTS +
-      " org.apache.spark.deploy.yarn.ApplicationMaster" +
+      " " + args.amClass +
       " --class " + args.userClass + 
       " --jar " + args.userJar +
       userArgsToString(args) +
diff --git a/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala b/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
index 6d3c95867e389bf24bdd8c33235e7e53aeb15e90..9efb28a9426726c4e41e4287291b8ab939ba942e 100644
--- a/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
+++ b/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
@@ -37,6 +37,7 @@ class ClientArguments(val args: Array[String]) {
   var numWorkers = 2
   var amQueue = System.getProperty("QUEUE", "default")
   var amMemory: Int = 512 // MB
+  var amClass: String = "org.apache.spark.deploy.yarn.ApplicationMaster"
   var appName: String = "Spark"
   // TODO
   var inputFormatInfo: List[InputFormatInfo] = null
@@ -65,6 +66,10 @@ class ClientArguments(val args: Array[String]) {
           userArgsBuffer += value
           args = tail
 
+        case ("--master-class") :: value :: tail =>
+          amClass = value
+          args = tail
+
         case ("--master-memory") :: MemoryParam(value) :: tail =>
           amMemory = value
           args = tail
@@ -122,19 +127,20 @@ class ClientArguments(val args: Array[String]) {
     System.err.println(
       "Usage: org.apache.spark.deploy.yarn.Client [options] \n" +
       "Options:\n" +
-      "  --jar JAR_PATH       Path to your application's JAR file (required)\n" +
-      "  --class CLASS_NAME   Name of your application's main class (required)\n" +
-      "  --args ARGS          Arguments to be passed to your application's main class.\n" +
-      "                       Mutliple invocations are possible, each will be passed in order.\n" +
-      "  --num-workers NUM    Number of workers to start (Default: 2)\n" +
-      "  --worker-cores NUM   Number of cores for the workers (Default: 1). This is unsused right now.\n" +
-      "  --master-memory MEM  Memory for Master (e.g. 1000M, 2G) (Default: 512 Mb)\n" +
-      "  --worker-memory MEM  Memory per Worker (e.g. 1000M, 2G) (Default: 1G)\n" +
-      "  --name NAME          The name of your application (Default: Spark)\n" +
-      "  --queue QUEUE        The hadoop queue to use for allocation requests (Default: 'default')\n" +
-      "  --addJars jars       Comma separated list of local jars that want SparkContext.addJar to work with.\n" +
-      "  --files files        Comma separated list of files to be distributed with the job.\n" +
-      "  --archives archives  Comma separated list of archives to be distributed with the job."
+      "  --jar JAR_PATH             Path to your application's JAR file (required)\n" +
+      "  --class CLASS_NAME         Name of your application's main class (required)\n" +
+      "  --args ARGS                Arguments to be passed to your application's main class.\n" +
+      "                             Mutliple invocations are possible, each will be passed in order.\n" +
+      "  --num-workers NUM          Number of workers to start (Default: 2)\n" +
+      "  --worker-cores NUM         Number of cores for the workers (Default: 1). This is unsused right now.\n" +
+      "  --master-class CLASS_NAME  Class Name for Master (Default: spark.deploy.yarn.ApplicationMaster)\n" +
+      "  --master-memory MEM        Memory for Master (e.g. 1000M, 2G) (Default: 512 Mb)\n" +
+      "  --worker-memory MEM        Memory per Worker (e.g. 1000M, 2G) (Default: 1G)\n" +
+      "  --name NAME                The name of your application (Default: Spark)\n" +
+      "  --queue QUEUE              The hadoop queue to use for allocation requests (Default: 'default')\n" +
+      "  --addJars jars             Comma separated list of local jars that want SparkContext.addJar to work with.\n" +
+      "  --files files              Comma separated list of files to be distributed with the job.\n" +
+      "  --archives archives        Comma separated list of archives to be distributed with the job."
       )
     System.exit(exitCode)
   }
diff --git a/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala b/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
new file mode 100644
index 0000000000000000000000000000000000000000..c38f33e212fbf2e2bf60fa1dd4793012ea288f81
--- /dev/null
+++ b/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import java.net.Socket
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.net.NetUtils
+import org.apache.hadoop.yarn.api._
+import org.apache.hadoop.yarn.api.records._
+import org.apache.hadoop.yarn.api.protocolrecords._
+import org.apache.hadoop.yarn.conf.YarnConfiguration
+import org.apache.hadoop.yarn.util.{ConverterUtils, Records}
+import akka.actor._
+import akka.remote.{RemoteClientShutdown, RemoteClientDisconnected, RemoteClientLifeCycleEvent}
+import akka.actor.Terminated
+import org.apache.spark.{SparkContext, Logging}
+import org.apache.spark.util.{Utils, AkkaUtils}
+import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
+import org.apache.spark.scheduler.SplitInfo
+import org.apache.hadoop.yarn.client.api.AMRMClient
+import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
+
+class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration) extends Logging {
+
+  def this(args: ApplicationMasterArguments) = this(args, new Configuration())
+
+  private var appAttemptId: ApplicationAttemptId = _
+  private var reporterThread: Thread = _
+  private val yarnConf: YarnConfiguration = new YarnConfiguration(conf)
+
+  private var yarnAllocator: YarnAllocationHandler = _
+  private var driverClosed: Boolean = false
+
+  private var amClient: AMRMClient[ContainerRequest] = _
+
+  val actorSystem: ActorSystem = AkkaUtils.createActorSystem("sparkYarnAM", Utils.localHostName, 0)._1
+  var actor: ActorRef = _
+
+  // This actor simply acts as a monitor, watching the driver actor.
+  class MonitorActor(driverUrl: String) extends Actor {
+
+    var driver: ActorRef = null
+
+    override def preStart() {
+      logInfo("Listen to driver: " + driverUrl)
+      driver = context.actorFor(driverUrl)
+      context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent])
+      context.watch(driver) // Doesn't work with remote actors, but useful for testing
+    }
+
+    override def receive = {
+      case Terminated(_) | RemoteClientDisconnected(_, _) | RemoteClientShutdown(_, _) =>
+        logInfo("Driver terminated or disconnected! Shutting down.")
+        driverClosed = true
+    }
+  }
+
+  def run() {
+
+    amClient = AMRMClient.createAMRMClient()
+    amClient.init(yarnConf)
+    amClient.start()
+
+    appAttemptId = getApplicationAttemptId()
+    registerApplicationMaster()
+
+    waitForSparkMaster()
+
+    // Allocate all containers
+    allocateWorkers()
+
+    // Launch a progress reporter thread, else the app will get killed after the expiration timeout
+    // (default: 10 mins). Ensure that progress is sent before YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS elapses.
+
+    val timeoutInterval = yarnConf.getInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 120000)
+    // The report interval must be <= timeoutInterval / 2.
+    // On the other hand, ensure that we are reasonably responsive without causing too many requests to the RM,
+    // so use at least 1 minute or timeoutInterval / 10, whichever is higher.
+    val interval = math.min(timeoutInterval / 2, math.max(timeoutInterval / 10, 60000L))
+    reporterThread = launchReporterThread(interval)
+
+    // Wait for the reporter thread to finish.
+    reporterThread.join()
+
+    finishApplicationMaster(FinalApplicationStatus.SUCCEEDED)
+    actorSystem.shutdown()
+
+    logInfo("Exited")
+    System.exit(0)
+  }
+
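+  // Extract the ApplicationAttemptId from the container ID that YARN sets in the AM's environment.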
+  private def getApplicationAttemptId(): ApplicationAttemptId = {
+    val envs = System.getenv()
+    val containerIdString = envs.get(ApplicationConstants.Environment.CONTAINER_ID.name())
+    val containerId = ConverterUtils.toContainerId(containerIdString)
+    val appAttemptId = containerId.getApplicationAttemptId()
+    logInfo("ApplicationAttemptId: " + appAttemptId)
+    appAttemptId
+  }
+
+  private def registerApplicationMaster(): RegisterApplicationMasterResponse = {
+    logInfo("Registering the ApplicationMaster")
+    // TODO:(Raymond) Find out Spark UI address and fill in here?
+    amClient.registerApplicationMaster(Utils.localHostName(), 0, "")
+  }
+
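+  // Poll the driver's socket until it is reachable, then start a monitor actor watching the driver.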
+  private def waitForSparkMaster() {
+    logInfo("Waiting for Spark driver to be reachable.")
+    var driverUp = false
+    val hostport = args.userArgs(0)
+    val (driverHost, driverPort) = Utils.parseHostPort(hostport)
+    while (!driverUp) {
+      try {
+        val socket = new Socket(driverHost, driverPort)
+        socket.close()
+        logInfo("Driver now available: %s:%s".format(driverHost, driverPort))
+        driverUp = true
+      } catch {
+        case e: Exception =>
+          logError("Failed to connect to driver at %s:%s, retrying ...".
+            format(driverHost, driverPort))
+          Thread.sleep(100)
+      }
+    }
+    System.setProperty("spark.driver.host", driverHost)
+    System.setProperty("spark.driver.port", driverPort.toString)
+
+    val driverUrl = "akka://spark@%s:%s/user/%s".format(
+      driverHost, driverPort.toString, CoarseGrainedSchedulerBackend.ACTOR_NAME)
+
+    actor = actorSystem.actorOf(Props(new MonitorActor(driverUrl)), name = "YarnAM")
+  }
+
+
+  private def allocateWorkers() {
+
+    // FIXME: should get preferredNodeLocationData from SparkContext; just fake an empty one for now.
+    val preferredNodeLocationData: scala.collection.Map[String, scala.collection.Set[SplitInfo]] =
+      scala.collection.immutable.Map()
+
+    yarnAllocator = YarnAllocationHandler.newAllocator(
+      yarnConf,
+      amClient,
+      appAttemptId,
+      args,
+      preferredNodeLocationData)
+
+    logInfo("Allocating " + args.numWorkers + " workers.")
+    // Wait until all containers have finished
+    // TODO: This is a bit ugly. Can we make it nicer?
+    // TODO: Handle container failure
+
+    yarnAllocator.addResourceRequests(args.numWorkers)
+    while (yarnAllocator.getNumWorkersRunning < args.numWorkers) {
+      yarnAllocator.allocateResources()
+      Thread.sleep(100)
+    }
+
+    logInfo("All workers have launched.")
+
+  }
+
+  // TODO: We might want to extend this to allocate more containers in case they die!
+  private def launchReporterThread(_sleepTime: Long): Thread = {
+    val sleepTime = if (_sleepTime <= 0) 0 else _sleepTime
+
+    val t = new Thread {
+      override def run() {
+        while (!driverClosed) {
+          val missingWorkerCount = args.numWorkers - yarnAllocator.getNumWorkersRunning -
+            yarnAllocator.getNumPendingAllocate
+          if (missingWorkerCount > 0) {
+            logInfo("Allocating %d containers to make up for (potentially) lost containers".
+              format(missingWorkerCount))
+            yarnAllocator.addResourceRequests(missingWorkerCount)
+          }
+          sendProgress()
+          Thread.sleep(sleepTime)
+        }
+      }
+    }
+    // setting to daemon status, though this is usually not a good idea.
+    t.setDaemon(true)
+    t.start()
+    logInfo("Started progress reporter thread - sleep time : " + sleepTime)
+    t
+  }
+
+  private def sendProgress() {
+    logDebug("Sending progress")
+    // Simulated by sending an allocate request with no containers requested ...
+    yarnAllocator.allocateResources()
+  }
+
+  def finishApplicationMaster(status: FinalApplicationStatus) {
+    logInfo("finish ApplicationMaster with " + status)
+    amClient.unregisterApplicationMaster(status, "" /* appMessage */, "" /* appTrackingUrl */)
+  }
+
+}
+
+
+object WorkerLauncher {
+  def main(argStrings: Array[String]) {
+    val args = new ApplicationMasterArguments(argStrings)
+    new WorkerLauncher(args).run()
+  }
+}
diff --git a/new-yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala b/new-yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala
new file mode 100644
index 0000000000000000000000000000000000000000..63a0449e5a0730085554d2b8ae86067135fa8dba
--- /dev/null
+++ b/new-yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler.cluster
+
+import org.apache.spark._
+import org.apache.hadoop.conf.Configuration
+import org.apache.spark.deploy.yarn.YarnAllocationHandler
+import org.apache.spark.util.Utils
+
+/**
+ * This scheduler launches workers through YARN by calling into Client to launch
+ * a WorkerLauncher as the application master.
+ */
+private[spark] class YarnClientClusterScheduler(sc: SparkContext, conf: Configuration) extends ClusterScheduler(sc) {
+
+  def this(sc: SparkContext) = this(sc, new Configuration())
+
+  // By default, rack is unknown
+  override def getRackForHost(hostPort: String): Option[String] = {
+    val host = Utils.parseHostPort(hostPort)._1
+    val retval = YarnAllocationHandler.lookupRack(conf, host)
+    if (retval != null) Some(retval) else None
+  }
+
+  override def postStartHook() {
+
+    // The yarn application is running, but the workers might not be ready yet.
+    // Wait a few seconds for the slaves to bootstrap and register with the master - best effort attempt.
+    Thread.sleep(2000L)
+    logInfo("YarnClientClusterScheduler.postStartHook done")
+  }
+}
diff --git a/new-yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/new-yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
new file mode 100644
index 0000000000000000000000000000000000000000..b206780c7806e15c84944db05876f89c8f848040
--- /dev/null
+++ b/new-yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler.cluster
+
+import org.apache.hadoop.yarn.api.records.{ApplicationId, YarnApplicationState}
+import org.apache.spark.{SparkException, Logging, SparkContext}
+import org.apache.spark.deploy.yarn.{Client, ClientArguments}
+
+private[spark] class YarnClientSchedulerBackend(
+    scheduler: ClusterScheduler,
+    sc: SparkContext)
+  extends CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem)
+  with Logging {
+
+  var client: Client = null
+  var appId: ApplicationId = null
+
+  override def start() {
+    super.start()
+
+    val defaultWorkerCores = "2"
+    val defaultWorkerMemory = "512m"
+    val defaultWorkerNumber = "1"
+
+    val userJar = System.getenv("SPARK_YARN_APP_JAR")
+    var workerCores = System.getenv("SPARK_WORKER_CORES")
+    var workerMemory = System.getenv("SPARK_WORKER_MEMORY")
+    var workerNumber = System.getenv("SPARK_WORKER_INSTANCES")
+
+    if (userJar == null)
+      throw new SparkException("env SPARK_YARN_APP_JAR is not set")
+
+    if (workerCores == null)
+      workerCores = defaultWorkerCores
+    if (workerMemory == null)
+      workerMemory = defaultWorkerMemory
+    if (workerNumber == null)
+      workerNumber = defaultWorkerNumber
+
+    val driverHost = System.getProperty("spark.driver.host")
+    val driverPort = System.getProperty("spark.driver.port")
+    val hostport = driverHost + ":" + driverPort
+
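+    // Build the Client arguments so that the application master runs WorkerLauncher (rather than the
+    // default ApplicationMaster), passing the driver's host:port for the launcher to connect back to.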
+    val argsArray = Array[String](
+      "--class", "notused",
+      "--jar", userJar,
+      "--args", hostport,
+      "--worker-memory", workerMemory,
+      "--worker-cores", workerCores,
+      "--num-workers", workerNumber,
+      "--master-class", "org.apache.spark.deploy.yarn.WorkerLauncher"
+    )
+
+    val args = new ClientArguments(argsArray)
+    client = new Client(args)
+    appId = client.runApp()
+    waitForApp()
+  }
+
+  def waitForApp() {
+
+    // TODO: need a better way to find out whether the workers are ready or not
+    // maybe by resource usage report?
+    while (true) {
+      val report = client.getApplicationReport(appId)
+
+      logInfo("Application report from ASM: \n" +
+        "\t appMasterRpcPort: " + report.getRpcPort() + "\n" +
+        "\t appStartTime: " + report.getStartTime() + "\n" +
+        "\t yarnAppState: " + report.getYarnApplicationState() + "\n"
+      )
+
+      // Ready to go, or already gone.
+      val state = report.getYarnApplicationState()
+      if (state == YarnApplicationState.RUNNING) {
+        return
+      } else if (state == YarnApplicationState.FINISHED ||
+        state == YarnApplicationState.FAILED ||
+        state == YarnApplicationState.KILLED) {
+        throw new SparkException("Yarn application already ended," +
+          "might be killed or not able to launch application master.")
+      }
+
+      Thread.sleep(1000)
+    }
+  }
+
+  override def stop() {
+    super.stop()
+    client.stop()
+    logInfo("Stoped")
+  }
+
+}