From d5f23e0083b8b00109eb466ef5fb74558dcbc040 Mon Sep 17 00:00:00 2001
From: Patrick Wendell <pwendell@gmail.com>
Date: Wed, 25 Dec 2013 00:24:00 -0800
Subject: [PATCH] Adding scheduling and reporting based on cores

---
 .../main/scala/org/apache/spark/deploy/master/Master.scala   | 2 +-
 .../scala/org/apache/spark/deploy/master/WorkerInfo.scala    | 4 ++--
 .../scala/org/apache/spark/deploy/master/ui/IndexPage.scala  | 5 ++++-
 .../scala/org/apache/spark/deploy/worker/DriverRunner.scala  | 4 ++--
 .../main/scala/org/apache/spark/deploy/worker/Worker.scala   | 2 +-
 .../scala/org/apache/spark/deploy/worker/ui/IndexPage.scala  | 5 ++++-
 6 files changed, 14 insertions(+), 8 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index 9bfacfc999..939e000695 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -425,7 +425,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
     // First schedule drivers, they take strict precedence over applications
     for (worker <- workers if worker.coresFree > 0 && worker.state == WorkerState.ALIVE) {
       for (driver <- Seq(waitingDrivers: _*)) {
-        if (worker.memoryFree > driver.desc.mem) {
+        if (worker.memoryFree > driver.desc.mem && worker.coresFree > driver.desc.cores) {
           launchDriver(worker, driver)
           waitingDrivers -= driver
         }
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala b/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala
index 27c2ff4b8c..6e5177baa6 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala
@@ -88,13 +88,13 @@ private[spark] class WorkerInfo(
   def addDriver(driver: DriverInfo) {
     drivers(driver.id) = driver
     memoryUsed += driver.desc.mem
-    coresUsed += 1
+    coresUsed += driver.desc.cores
   }
 
   def removeDriver(driver: DriverInfo) {
     drivers -= driver.id
     memoryUsed -= driver.desc.mem
-    coresUsed -= 1
+    coresUsed -= driver.desc.cores
   }
 
   def webUiAddress : String = {
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala
index 13903b4a1d..24d10cec4a 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala
@@ -57,7 +57,7 @@ private[spark] class IndexPage(parent: MasterWebUI) {
     val completedApps = state.completedApps.sortBy(_.endTime).reverse
     val completedAppsTable = UIUtils.listingTable(appHeaders, appRow, completedApps)
 
-    val driverHeaders = Seq("ID", "Submitted Time", "Worker", "State", "Memory", "Main Class")
+    val driverHeaders = Seq("ID", "Submitted Time", "Worker", "State", "Cores", "Memory", "Main Class")
     val activeDrivers = state.activeDrivers.sortBy(_.startTime).reverse
     val activeDriversTable = UIUtils.listingTable(driverHeaders, driverRow, activeDrivers)
     val completedDrivers = state.completedDrivers.sortBy(_.startTime).reverse
@@ -166,6 +166,9 @@ private[spark] class IndexPage(parent: MasterWebUI) {
       <td>{driver.submitDate}</td>
       <td>{driver.worker.map(w => w.id.toString).getOrElse("None")}</td>
       <td>{driver.state}</td>
+      <td sorttable_customkey={driver.desc.cores.toString}>
+        {driver.desc.cores.toString}
+      </td>
       <td sorttable_customkey={driver.desc.mem.toString}>
         {Utils.megabytesToString(driver.desc.mem.toLong)}
       </td>
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
index 41a089ad33..ba13f22fc5 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
@@ -53,8 +53,8 @@ private[spark] class DriverRunner(
         try {
           val driverDir = createWorkingDirectory()
           val localJarFilename = downloadUserJar(driverDir)
-          val command = Seq("java") ++ driverDesc.javaOptions ++ Seq("-cp", localJarFilename) ++
-            Seq(driverDesc.mainClass) ++ driverDesc.options
+          val command = Seq("java") ++ driverDesc.javaOptions ++ Seq(s"-Xmx${driverDesc.mem}m")
+            Seq("-cp", localJarFilename) ++ Seq(driverDesc.mainClass) ++ driverDesc.options
           runCommandWithRetry(command, driverDesc.envVars, driverDir)
         }
         catch {
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index dd6783a344..3c5159ac13 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -279,7 +279,7 @@ private[spark] class Worker(
       }
       val driver = drivers(driverId)
       memoryUsed -= driver.driverDesc.mem
-      coresUsed -= 1
+      coresUsed -= driver.driverDesc.cores
       drivers -= driverId
       finishedDrivers(driverId) = driver
     }
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ui/IndexPage.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ui/IndexPage.scala
index 2c37b7184d..35e8d58215 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/ui/IndexPage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/ui/IndexPage.scala
@@ -56,7 +56,7 @@ private[spark] class IndexPage(parent: WorkerWebUI) {
     val finishedExecutorTable =
       UIUtils.listingTable(executorHeaders, executorRow, workerState.finishedExecutors)
 
-    val driverHeaders = Seq("DriverID", "Main Class", "Memory", "Logs")
+    val driverHeaders = Seq("DriverID", "Main Class", "Cores", "Memory", "Logs")
     val runningDriverTable =
       UIUtils.listingTable(driverHeaders, driverRow, workerState.drivers)
     def finishedDriverTable =
@@ -138,6 +138,9 @@ private[spark] class IndexPage(parent: WorkerWebUI) {
     <tr>
       <td>{driver.driverId}</td>
       <td>{driver.driverDesc.mainClass}</td>
+      <td sorttable_customkey={driver.driverDesc.cores.toString}>
+        {driver.driverDesc.cores.toString}
+      </td>
       <td sorttable_customkey={driver.driverDesc.mem.toString}>
         {Utils.megabytesToString(driver.driverDesc.mem)}
       </td>
-- 
GitLab