diff --git a/core/lib/mesos.jar b/core/lib/mesos.jar
index eb01ce8a15a84c1ed22c204fa6d499041915f09f..171a5ac02ccff226720a1e41c89c9d59ac13f5fc 100644
Binary files a/core/lib/mesos.jar and b/core/lib/mesos.jar differ
diff --git a/core/src/main/scala/spark/Executor.scala b/core/src/main/scala/spark/Executor.scala
index d4d80845c52015328ad9b4a2140dba9e145faec9..80c95dcb99663fedf3c13dd9d4ca89221a3b6bb7 100644
--- a/core/src/main/scala/spark/Executor.scala
+++ b/core/src/main/scala/spark/Executor.scala
@@ -7,22 +7,24 @@ import java.util.concurrent._
 import scala.actors.remote.RemoteActor
 import scala.collection.mutable.ArrayBuffer
 
-import mesos.{ExecutorArgs, ExecutorDriver, MesosExecutorDriver}
-import mesos.{TaskDescription, TaskState, TaskStatus}
+import com.google.protobuf.ByteString
+
+import org.apache.mesos._
+import org.apache.mesos.Protos._
 
 import spark.broadcast._
 
 /**
  * The Mesos executor for Spark.
  */
-class Executor extends mesos.Executor with Logging {
+class Executor extends org.apache.mesos.Executor with Logging {
   var classLoader: ClassLoader = null
   var threadPool: ExecutorService = null
   var env: SparkEnv = null
 
   override def init(d: ExecutorDriver, args: ExecutorArgs) {
     // Read spark.* system properties from executor arg
-    val props = Utils.deserialize[Array[(String, String)]](args.getData)
+    val props = Utils.deserialize[Array[(String, String)]](args.getData.toByteArray)
     for ((key, value) <- props)
       System.setProperty(key, value)
 
@@ -44,40 +46,47 @@ class Executor extends mesos.Executor with Logging {
       1, 128, 600, TimeUnit.SECONDS, new SynchronousQueue[Runnable])
   }
   
-  override def launchTask(d: ExecutorDriver, desc: TaskDescription) {
-    // Pull taskId and arg out of TaskDescription because it won't be a
-    // valid pointer after this method call (TODO: fix this in C++/SWIG)
-    val taskId = desc.getTaskId
-    val arg = desc.getArg
-    threadPool.execute(new TaskRunner(taskId, arg, d))
+  override def launchTask(d: ExecutorDriver, task: TaskDescription) {
+    threadPool.execute(new TaskRunner(task, d))
   }
 
-  class TaskRunner(taskId: Int, arg: Array[Byte], d: ExecutorDriver)
+  class TaskRunner(desc: TaskDescription, d: ExecutorDriver)
   extends Runnable {
     override def run() = {
-      logInfo("Running task ID " + taskId)
+      logInfo("Running task ID " + desc.getTaskId)
+      d.sendStatusUpdate(TaskStatus.newBuilder()
+                         .setTaskId(desc.getTaskId)
+                         .setState(TaskState.TASK_RUNNING)
+                         .build())
       try {
         SparkEnv.set(env)
         Thread.currentThread.setContextClassLoader(classLoader)
         Accumulators.clear
-        val task = Utils.deserialize[Task[Any]](arg, classLoader)
+        val task = Utils.deserialize[Task[Any]](desc.getData.toByteArray, classLoader)
         for (gen <- task.generation) // Update generation if any is set
           env.mapOutputTracker.updateGeneration(gen)
-        val value = task.run(taskId)
+        val value = task.run(desc.getTaskId.getValue.toInt)
         val accumUpdates = Accumulators.values
         val result = new TaskResult(value, accumUpdates)
-        d.sendStatusUpdate(new TaskStatus(
-          taskId, TaskState.TASK_FINISHED, Utils.serialize(result)))
-        logInfo("Finished task ID " + taskId)
+        d.sendStatusUpdate(TaskStatus.newBuilder()
+                           .setTaskId(desc.getTaskId)
+                           .setState(TaskState.TASK_FINISHED)
+                           .setData(ByteString.copyFrom(Utils.serialize(result)))
+                           .build())
+        logInfo("Finished task ID " + desc.getTaskId)
       } catch {
         case ffe: FetchFailedException => {
           val reason = ffe.toTaskEndReason
-          d.sendStatusUpdate(new TaskStatus(
-            taskId, TaskState.TASK_FAILED, Utils.serialize(reason)))
+          d.sendStatusUpdate(TaskStatus.newBuilder()
+                             .setTaskId(desc.getTaskId)
+                             .setState(TaskState.TASK_FAILED)
+                             .setData(ByteString.copyFrom(Utils.serialize(reason)))
+                             .build())
         }
         case t: Throwable => {
           // TODO: Handle errors in tasks less dramatically
-          logError("Exception in task ID " + taskId, t)
+          logError("Exception in task ID " + desc.getTaskId, t)
           System.exit(1)
         }
       }
@@ -131,6 +140,19 @@ class Executor extends mesos.Executor with Logging {
     val out = new FileOutputStream(localPath)
     Utils.copyStream(in, out, true)
   }
+
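+  // Extra callbacks required by the new org.apache.mesos.Executor interface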
+  override def error(d: ExecutorDriver, code: Int, message: String) {
+    logError("Error from Mesos: %s (code %d)".format(message, code))
+  }
+
+  override def killTask(d: ExecutorDriver, tid: TaskID) {
+    logWarning("Mesos asked us to kill task " + tid + "; ignoring (not yet implemented)")
+  }
+
+  override def shutdown(d: ExecutorDriver) {}
+
+  override def frameworkMessage(d: ExecutorDriver, data: Array[Byte]) {}
 }
 
 /**
diff --git a/core/src/main/scala/spark/HadoopRDD.scala b/core/src/main/scala/spark/HadoopRDD.scala
index 5d8a2d0e35fd7388c475b11a4742084ceacda813..c87fa844c38929d181251df17b2bb666ecaf5e87 100644
--- a/core/src/main/scala/spark/HadoopRDD.scala
+++ b/core/src/main/scala/spark/HadoopRDD.scala
@@ -1,7 +1,5 @@
 package spark
 
-import mesos.SlaveOffer
-
 import org.apache.hadoop.io.LongWritable
 import org.apache.hadoop.io.NullWritable
 import org.apache.hadoop.io.Text
diff --git a/core/src/main/scala/spark/Job.scala b/core/src/main/scala/spark/Job.scala
index 6abbcbce5153adca58792f29c3e7cca8c6691903..9d75c083989a9d31b2a78249f9465f86765be353 100644
--- a/core/src/main/scala/spark/Job.scala
+++ b/core/src/main/scala/spark/Job.scala
@@ -1,13 +1,14 @@
 package spark
 
-import mesos._
+import org.apache.mesos._
+import org.apache.mesos.Protos._
 
 /**
  * Class representing a parallel job in MesosScheduler. Schedules the
  * job by implementing various callbacks.
  */
 abstract class Job(jobId: Int) {
-  def slaveOffer(s: SlaveOffer, availableCpus: Int, availableMem: Int)
+  def slaveOffer(s: SlaveOffer, availableCpus: Double, availableMem: Double)
     : Option[TaskDescription]
 
   def statusUpdate(t: TaskStatus): Unit
diff --git a/core/src/main/scala/spark/MesosScheduler.scala b/core/src/main/scala/spark/MesosScheduler.scala
index 9776963c5fb38a278ea7f30ee4cd2632b79c0a59..7f43fffebf6876452d52c79dcb2a046edf0c76bf 100644
--- a/core/src/main/scala/spark/MesosScheduler.scala
+++ b/core/src/main/scala/spark/MesosScheduler.scala
@@ -12,8 +12,11 @@ import scala.collection.mutable.Map
 import scala.collection.mutable.Queue
 import scala.collection.JavaConversions._
 
-import mesos.{Scheduler => MScheduler}
-import mesos._
+import com.google.protobuf.ByteString
+
+import org.apache.mesos.{Scheduler => MScheduler}
+import org.apache.mesos._
+import org.apache.mesos.Protos._
 
 /**
  * The main Scheduler implementation, which runs jobs on Mesos. Clients should
@@ -37,8 +40,8 @@ extends MScheduler with DAGScheduler with Logging
   private var activeJobs = new HashMap[Int, Job]
   private var activeJobsQueue = new Queue[Job]
 
-  private var taskIdToJobId = new HashMap[Int, Int]
-  private var jobTasks = new HashMap[Int, HashSet[Int]]
+  private var taskIdToJobId = new HashMap[String, Int]
+  private var jobTasks = new HashMap[Int, HashSet[String]]
 
   // Incrementing job and task IDs
   private var nextJobId = 0
@@ -59,10 +62,10 @@ extends MScheduler with DAGScheduler with Logging
     return id
   }
 
-  def newTaskId(): Int = {
-    val id = nextTaskId;
+  def newTaskId(): TaskID = {
+    val id = "" + nextTaskId;
     nextTaskId += 1;
-    return id
+    return TaskID.newBuilder().setValue(id).build()
   }
   
   override def start() {
@@ -76,7 +79,13 @@ extends MScheduler with DAGScheduler with Logging
       override def run {
         val sched = MesosScheduler.this
         sched.driver = new MesosSchedulerDriver(sched, master)
-        sched.driver.run()
+        try {
+          val ret = sched.driver.run()
+          logInfo("driver.run() returned with code " + ret)
+        } catch {
+          case e: Exception =>
+            logError("driver.run() failed", e)
+        }
       }
     }.start
   }
@@ -92,13 +101,21 @@ extends MScheduler with DAGScheduler with Logging
           "or the SparkContext constructor")
     }
     val execScript = new File(sparkHome, "spark-executor").getCanonicalPath
-    val params = new JHashMap[String, String]
+    val params = Params.newBuilder()
     for (key <- ENV_VARS_TO_SEND_TO_EXECUTORS) {
       if (System.getenv(key) != null) {
-        params("env." + key) = System.getenv(key)
+        params.addParam(Param.newBuilder()
+                          .setKey("env." + key)
+                          .setValue(System.getenv(key))
+                          .build())
       }
     }
-    new ExecutorInfo(execScript, createExecArg(), params)
+    ExecutorInfo.newBuilder()
+      .setExecutorId(ExecutorID.newBuilder().setValue("default").build())
+      .setUri(execScript)
+      .setData(ByteString.copyFrom(createExecArg()))
+      .setParams(params.build())
+      .build()
   }
 
   
@@ -125,7 +142,7 @@ extends MScheduler with DAGScheduler with Logging
     }
   }
 
-  override def registered(d: SchedulerDriver, frameworkId: String) {
+  override def registered(d: SchedulerDriver, frameworkId: FrameworkID) {
     logInfo("Registered as framework ID " + frameworkId)
     registeredLock.synchronized {
       isRegistered = true
@@ -146,11 +163,11 @@ extends MScheduler with DAGScheduler with Logging
    * a round-robin manner so that tasks are balanced across the cluster.
    */
   override def resourceOffer(
-      d: SchedulerDriver, oid: String, offers: JList[SlaveOffer]) {
+      d: SchedulerDriver, oid: OfferID, offers: JList[SlaveOffer]) {
     synchronized {
       val tasks = new JArrayList[TaskDescription]
-      val availableCpus = offers.map(_.getParams.get("cpus").toInt)
-      val availableMem = offers.map(_.getParams.get("mem").toInt)
+      val availableCpus = offers.map(o => getResource(o.getResourcesList(), "cpus"))
+      val availableMem = offers.map(o => getResource(o.getResourcesList(), "mem"))
       var launchedTask = false
       for (job <- activeJobsQueue) {
         do {
@@ -160,10 +177,10 @@ extends MScheduler with DAGScheduler with Logging
               job.slaveOffer(offers(i), availableCpus(i), availableMem(i)) match {
                 case Some(task) =>
                   tasks.add(task)
-                  taskIdToJobId(task.getTaskId) = job.getId
-                  jobTasks(job.getId) += task.getTaskId
-                  availableCpus(i) -= task.getParams.get("cpus").toInt
-                  availableMem(i) -= task.getParams.get("mem").toInt
+                  taskIdToJobId(task.getTaskId.getValue) = job.getId
+                  jobTasks(job.getId) += task.getTaskId.getValue
+                  availableCpus(i) -= getResource(task.getResourcesList(), "cpus")
+                  availableMem(i) -= getResource(task.getResourcesList(), "mem")
                   launchedTask = true
                 case None => {}
               }
@@ -179,6 +196,13 @@ extends MScheduler with DAGScheduler with Logging
     }
   }
 
+  // Helper to pull the value of a named scalar resource out of a list of Mesos Resource protobufs
+  def getResource(res: JList[Resource], name: String): Double = {
+    for (r <- res if r.getName == name)
+      return r.getScalar.getValue
+    throw new IllegalArgumentException("No resource called " + name + " in " + res)
+  }
+
   // Check whether a Mesos task state represents a finished task
   def isFinished(state: TaskState) = {
     state == TaskState.TASK_FINISHED ||
@@ -190,18 +214,18 @@ extends MScheduler with DAGScheduler with Logging
   override def statusUpdate(d: SchedulerDriver, status: TaskStatus) {
     synchronized {
       try {
-        taskIdToJobId.get(status.getTaskId) match {
+        taskIdToJobId.get(status.getTaskId.getValue) match {
           case Some(jobId) =>
             if (activeJobs.contains(jobId)) {
               activeJobs(jobId).statusUpdate(status)
             }
             if (isFinished(status.getState)) {
-              taskIdToJobId.remove(status.getTaskId)
+              taskIdToJobId.remove(status.getTaskId.getValue)
               if (jobTasks.contains(jobId))
-                jobTasks(jobId) -= status.getTaskId
+                jobTasks(jobId) -= status.getTaskId.getValue
             }
           case None =>
-            logInfo("Ignoring update from TID " + status.getTaskId +
+            logInfo("Ignoring update from TID " + status.getTaskId.getValue +
               " because its job is gone")
         }
       } catch {
@@ -293,4 +317,11 @@ extends MScheduler with DAGScheduler with Logging
     // Serialize the map as an array of (String, String) pairs
     return Utils.serialize(props.toArray)
   }
+
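+  // Remaining callbacks from the new Scheduler interface; Spark does not use these yet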
+  override def frameworkMessage(d: SchedulerDriver, s: SlaveID, e: ExecutorID, b: Array[Byte]) {}
+
+  override def slaveLost(d: SchedulerDriver, s: SlaveID) {}
+
+  override def offerRescinded(d: SchedulerDriver, o: OfferID) {}
 }
diff --git a/core/src/main/scala/spark/ParallelCollection.scala b/core/src/main/scala/spark/ParallelCollection.scala
index 36121766f54cae621d54252daef795d0f7ddba0a..a2e271b028d4faefb05e2d1a5cc04e23bbb1c10d 100644
--- a/core/src/main/scala/spark/ParallelCollection.scala
+++ b/core/src/main/scala/spark/ParallelCollection.scala
@@ -1,7 +1,5 @@
 package spark
 
-import mesos.SlaveOffer
-
 import java.util.concurrent.atomic.AtomicLong
 
 @serializable class ParallelCollectionSplit[T: ClassManifest](
diff --git a/core/src/main/scala/spark/SimpleJob.scala b/core/src/main/scala/spark/SimpleJob.scala
index aa1610fb89e9f08a881f4fad3825fd13f2c5be5a..811725b0551bd4b725639357326eaa4a08e79ed8 100644
--- a/core/src/main/scala/spark/SimpleJob.scala
+++ b/core/src/main/scala/spark/SimpleJob.scala
@@ -5,7 +5,10 @@ import java.util.{HashMap => JHashMap}
 import scala.collection.mutable.ArrayBuffer
 import scala.collection.mutable.HashMap
 
-import mesos._
+import com.google.protobuf.ByteString
+
+import org.apache.mesos._
+import org.apache.mesos.Protos._
 
 
 /**
@@ -19,8 +22,8 @@ extends Job(jobId) with Logging
   val LOCALITY_WAIT = System.getProperty("spark.locality.wait", "5000").toLong
 
   // CPUs and memory to request per task
-  val CPUS_PER_TASK = System.getProperty("spark.task.cpus", "1").toInt
-  val MEM_PER_TASK = System.getProperty("spark.task.mem", "512").toInt
+  val CPUS_PER_TASK = System.getProperty("spark.task.cpus", "1").toDouble
+  val MEM_PER_TASK = System.getProperty("spark.task.mem", "512").toDouble
 
   // Maximum times a task is allowed to fail before failing the job
   val MAX_TASK_FAILURES = 4
@@ -31,7 +34,7 @@ extends Job(jobId) with Logging
   val launched = new Array[Boolean](numTasks)
   val finished = new Array[Boolean](numTasks)
   val numFailures = new Array[Int](numTasks)
-  val tidToIndex = HashMap[Int, Int]()
+  val tidToIndex = HashMap[String, Int]()
 
   var tasksLaunched = 0
   var tasksFinished = 0
@@ -126,13 +129,13 @@ extends Job(jobId) with Logging
   }
 
   // Respond to an offer of a single slave from the scheduler by finding a task
-  def slaveOffer(offer: SlaveOffer, availableCpus: Int, availableMem: Int)
+  def slaveOffer(offer: SlaveOffer, availableCpus: Double, availableMem: Double)
       : Option[TaskDescription] = {
     if (tasksLaunched < numTasks && availableCpus >= CPUS_PER_TASK &&
         availableMem >= MEM_PER_TASK) {
       val time = System.currentTimeMillis
       val localOnly = (time - lastPreferredLaunchTime < LOCALITY_WAIT)
-      val host = offer.getHost
+      val host = offer.getHostname
       findTask(host, localOnly) match {
         case Some(index) => {
           // Found a task; do some bookkeeping and return a Mesos task for it
@@ -146,20 +149,36 @@ extends Job(jobId) with Logging
               jobId, index, taskId, offer.getSlaveId, host, prefStr)
           logInfo(message)
           // Do various bookkeeping
-          tidToIndex(taskId) = index
+          tidToIndex(taskId.getValue) = index
           launched(index) = true
           tasksLaunched += 1
           if (preferred)
             lastPreferredLaunchTime = time
           // Create and return the Mesos task object
-          val params = new JHashMap[String, String]
-          params.put("cpus", CPUS_PER_TASK.toString)
-          params.put("mem", MEM_PER_TASK.toString)
+          val cpuRes = Resource.newBuilder()
+                         .setName("cpus")
+                         .setType(Resource.Type.SCALAR)
+                         .setScalar(Resource.Scalar.newBuilder()
+                                      .setValue(CPUS_PER_TASK).build())
+                         .build()
+          val memRes = Resource.newBuilder()
+                         .setName("mem")
+                         .setType(Resource.Type.SCALAR)
+                         .setScalar(Resource.Scalar.newBuilder()
+                                      .setValue(MEM_PER_TASK).build())
+                         .build()
           val serializedTask = Utils.serialize(task)
           logDebug("Serialized size: " + serializedTask.size)
           val taskName = "task %d:%d".format(jobId, index)
-          return Some(new TaskDescription(
-            taskId, offer.getSlaveId, taskName, params, serializedTask))
+          return Some(TaskDescription.newBuilder()
+                        .setTaskId(taskId)
+                        .setSlaveId(offer.getSlaveId)
+                        .setName(taskName)
+                        .addResources(cpuRes)
+                        .addResources(memRes)
+                        .setData(ByteString.copyFrom(serializedTask))
+                        .build())
         }
         case _ =>
       }
@@ -183,13 +202,13 @@
 
   def taskFinished(status: TaskStatus) {
     val tid = status.getTaskId
-    val index = tidToIndex(tid)
+    val index = tidToIndex(tid.getValue)
     if (!finished(index)) {
       tasksFinished += 1
-      logInfo("Finished TID %d (progress: %d/%d)".format(
+      logInfo("Finished TID %s (progress: %d/%d)".format(
         tid, tasksFinished, numTasks))
       // Deserialize task result
-      val result = Utils.deserialize[TaskResult[_]](status.getData)
+      val result = Utils.deserialize[TaskResult[_]](status.getData.toByteArray)
       sched.taskEnded(tasks(index), Success, result.value, result.accumUpdates)
       // Mark finished and stop if we've finished all the tasks
       finished(index) = true
@@ -203,15 +222,15 @@
 
   def taskLost(status: TaskStatus) {
     val tid = status.getTaskId
-    val index = tidToIndex(tid)
+    val index = tidToIndex(tid.getValue)
     if (!finished(index)) {
-      logInfo("Lost TID %d (task %d:%d)".format(tid, jobId, index))
+      logInfo("Lost TID %s (task %d:%d)".format(tid, jobId, index))
       launched(index) = false
       tasksLaunched -= 1
       // Check if the problem is a map output fetch failure. In that case, this
       // task will never succeed on any node, so tell the scheduler about it.
-      if (status.getData != null && status.getData.length > 0) {
-        val reason = Utils.deserialize[TaskEndReason](status.getData)
+      if (status.getData.size > 0) { // protobuf getData never returns null, so a size check suffices
+        val reason = Utils.deserialize[TaskEndReason](status.getData.toByteArray)
         reason match {
           case fetchFailed: FetchFailed =>
             logInfo("Loss was due to fetch failure from " + fetchFailed.serverUri)
diff --git a/core/src/main/scala/spark/Task.scala b/core/src/main/scala/spark/Task.scala
index 70547445acdae5f3dc5ede08c2454f63de0d90c9..03274167e15578b5b170334bf04d1c3c88995a37 100644
--- a/core/src/main/scala/spark/Task.scala
+++ b/core/src/main/scala/spark/Task.scala
@@ -1,7 +1,5 @@
 package spark
 
-import mesos._
-
 @serializable
 class TaskContext(val stageId: Int, val splitId: Int, val attemptId: Int) {
 }
diff --git a/project/build/SparkProject.scala b/project/build/SparkProject.scala
index 58908731a3f048db78ed18dfefd640224078bf4b..4d5ab3be08fcd55b43849bc6da7b81ac47b25b2a 100644
--- a/project/build/SparkProject.scala
+++ b/project/build/SparkProject.scala
@@ -39,6 +39,7 @@ class SparkProject(info: ProjectInfo) extends ParentProject(info) with IdeaProje
     val asm = "asm" % "asm-all" % "3.3.1"
     val scalaTest = "org.scalatest" % "scalatest" % "1.3" % "test"
     val scalaCheck = "org.scala-tools.testing" %% "scalacheck" % "1.7" % "test"
+    val protobuf = "com.google.protobuf" % "protobuf-java" % "2.3.0"
   }
 
   class ReplProject(info: ProjectInfo) extends DefaultProject(info) with BaseProject with DepJar with XmlTestReport