diff --git a/.travis.yml b/.travis.yml
index c16f76399ccd229cbfe48380e584aaaa298369ba..8739849a20798dc76432acd62f8d12a86d284b29 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -44,7 +44,7 @@ notifications:
 # 5. Run maven install before running lint-java.
 install:
   - export MAVEN_SKIP_RC=1
-  - build/mvn -T 4 -q -DskipTests -Pyarn -Phadoop-2.3 -Pkinesis-asl -Phive -Phive-thriftserver install
+  - build/mvn -T 4 -q -DskipTests -Pmesos -Pyarn -Phadoop-2.3 -Pkinesis-asl -Phive -Phive-thriftserver install
 
 # 6. Run lint-java.
 script:
diff --git a/assembly/pom.xml b/assembly/pom.xml
index 971a62f87a218343ebc45ab90459907fac8710ad..ec243eaebaea7c34ff133b1d380b193c473eb7cc 100644
--- a/assembly/pom.xml
+++ b/assembly/pom.xml
@@ -138,6 +138,16 @@
         </dependency>
       </dependencies>
     </profile>
+    <profile>
+      <id>mesos</id>
+      <dependencies>
+        <dependency>
+          <groupId>org.apache.spark</groupId>
+          <artifactId>spark-mesos_${scala.binary.version}</artifactId>
+          <version>${project.version}</version>
+        </dependency>
+      </dependencies>
+    </profile>
     <profile>
       <id>hive</id>
       <dependencies>
diff --git a/core/pom.xml b/core/pom.xml
index ab6c3ce80527571c04b798988f22e96b7217c343..c04cf7e5255f250ee560bf45b49a5416450a96c1 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -215,11 +215,6 @@
       <groupId>org.glassfish.jersey.containers</groupId>
       <artifactId>jersey-container-servlet-core</artifactId>
     </dependency>
-    <dependency>
-      <groupId>org.apache.mesos</groupId>
-      <artifactId>mesos</artifactId>
-      <classifier>${mesos.classifier}</classifier>
-    </dependency>
     <dependency>
       <groupId>io.netty</groupId>
       <artifactId>netty-all</artifactId>
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 2eaeab1d807b45a8b8bc986feb229a61e17725db..08d6343d623cf5bd3916c6f266b1130d1c7aaeb3 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -42,7 +42,6 @@ import org.apache.hadoop.mapred.{FileInputFormat, InputFormat, JobConf, Sequence
   TextInputFormat}
 import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat, Job => NewHadoopJob}
 import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => NewFileInputFormat}
-import org.apache.mesos.MesosNativeLibrary
 
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.broadcast.Broadcast
@@ -56,7 +55,6 @@ import org.apache.spark.rdd._
 import org.apache.spark.rpc.RpcEndpointRef
 import org.apache.spark.scheduler._
 import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend, StandaloneSchedulerBackend}
-import org.apache.spark.scheduler.cluster.mesos.{MesosCoarseGrainedSchedulerBackend, MesosFineGrainedSchedulerBackend}
 import org.apache.spark.scheduler.local.LocalSchedulerBackend
 import org.apache.spark.storage._
 import org.apache.spark.storage.BlockManagerMessages.TriggerThreadDump
@@ -2512,18 +2510,6 @@ object SparkContext extends Logging {
         }
         (backend, scheduler)
 
-      case MESOS_REGEX(mesosUrl) =>
-        MesosNativeLibrary.load()
-        val scheduler = new TaskSchedulerImpl(sc)
-        val coarseGrained = sc.conf.getBoolean("spark.mesos.coarse", defaultValue = true)
-        val backend = if (coarseGrained) {
-          new MesosCoarseGrainedSchedulerBackend(scheduler, sc, mesosUrl, sc.env.securityManager)
-        } else {
-          new MesosFineGrainedSchedulerBackend(scheduler, sc, mesosUrl)
-        }
-        scheduler.initialize(backend)
-        (backend, scheduler)
-
       case masterUrl =>
         val cm = getClusterManager(masterUrl) match {
           case Some(clusterMgr) => clusterMgr
@@ -2545,7 +2531,7 @@ object SparkContext extends Logging {
   private def getClusterManager(url: String): Option[ExternalClusterManager] = {
     val loader = Utils.getContextOrSparkClassLoader
     val serviceLoaders =
-    ServiceLoader.load(classOf[ExternalClusterManager], loader).asScala.filter(_.canCreate(url))
+      ServiceLoader.load(classOf[ExternalClusterManager], loader).asScala.filter(_.canCreate(url))
     if (serviceLoaders.size > 1) {
       throw new SparkException(s"Multiple Cluster Managers ($serviceLoaders) registered " +
           s"for the url $url:")
@@ -2566,8 +2552,6 @@ private object SparkMasterRegex {
   val LOCAL_CLUSTER_REGEX = """local-cluster\[\s*([0-9]+)\s*,\s*([0-9]+)\s*,\s*([0-9]+)\s*]""".r
   // Regular expression for connecting to Spark deploy clusters
   val SPARK_REGEX = """spark://(.*)""".r
-  // Regular expression for connection to Mesos cluster by mesos:// or mesos://zk:// url
-  val MESOS_REGEX = """mesos://(.*)""".r
 }
 
 /**
diff --git a/core/src/main/scala/org/apache/spark/TaskState.scala b/core/src/main/scala/org/apache/spark/TaskState.scala
index d232fae6b15b33dd38353866e8c41f16fb939050..cbace7b5f9f37c3acf0fe0dac77316bfad9d3129 100644
--- a/core/src/main/scala/org/apache/spark/TaskState.scala
+++ b/core/src/main/scala/org/apache/spark/TaskState.scala
@@ -17,8 +17,6 @@
 
 package org.apache.spark
 
-import org.apache.mesos.Protos.{TaskState => MesosTaskState}
-
 private[spark] object TaskState extends Enumeration {
 
   val LAUNCHING, RUNNING, FINISHED, FAILED, KILLED, LOST = Value
@@ -30,22 +28,4 @@ private[spark] object TaskState extends Enumeration {
   def isFailed(state: TaskState): Boolean = (LOST == state) || (FAILED == state)
 
   def isFinished(state: TaskState): Boolean = FINISHED_STATES.contains(state)
-
-  def toMesos(state: TaskState): MesosTaskState = state match {
-    case LAUNCHING => MesosTaskState.TASK_STARTING
-    case RUNNING => MesosTaskState.TASK_RUNNING
-    case FINISHED => MesosTaskState.TASK_FINISHED
-    case FAILED => MesosTaskState.TASK_FAILED
-    case KILLED => MesosTaskState.TASK_KILLED
-    case LOST => MesosTaskState.TASK_LOST
-  }
-
-  def fromMesos(mesosState: MesosTaskState): TaskState = mesosState match {
-    case MesosTaskState.TASK_STAGING | MesosTaskState.TASK_STARTING => LAUNCHING
-    case MesosTaskState.TASK_RUNNING | MesosTaskState.TASK_KILLING => RUNNING
-    case MesosTaskState.TASK_FINISHED => FINISHED
-    case MesosTaskState.TASK_FAILED => FAILED
-    case MesosTaskState.TASK_KILLED => KILLED
-    case MesosTaskState.TASK_LOST | MesosTaskState.TASK_ERROR => LOST
-  }
 }
diff --git a/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala
index 7d75a93ff68393979734c9de1a7e0b65f1d6723c..f8938dfedee5b70d39bfebf942caa5ed2dcdc94c 100644
--- a/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala
@@ -22,7 +22,6 @@ import org.scalatest.PrivateMethodTester
 import org.apache.spark.internal.Logging
 import org.apache.spark.scheduler.{SchedulerBackend, TaskScheduler, TaskSchedulerImpl}
 import org.apache.spark.scheduler.cluster.StandaloneSchedulerBackend
-import org.apache.spark.scheduler.cluster.mesos.{MesosCoarseGrainedSchedulerBackend, MesosFineGrainedSchedulerBackend}
 import org.apache.spark.scheduler.local.LocalSchedulerBackend
 
 
@@ -130,31 +129,4 @@ class SparkContextSchedulerCreationSuite
       case _ => fail()
     }
   }
-
-  def testMesos(master: String, expectedClass: Class[_], coarse: Boolean) {
-    val conf = new SparkConf().set("spark.mesos.coarse", coarse.toString)
-    try {
-      val sched = createTaskScheduler(master, "client", conf)
-      assert(sched.backend.getClass === expectedClass)
-    } catch {
-      case e: UnsatisfiedLinkError =>
-        assert(e.getMessage.contains("mesos"))
-        logWarning("Mesos not available, could not test actual Mesos scheduler creation")
-      case e: Throwable => fail(e)
-    }
-  }
-
-  test("mesos fine-grained") {
-    testMesos("mesos://localhost:1234", classOf[MesosFineGrainedSchedulerBackend], coarse = false)
-  }
-
-  test("mesos coarse-grained") {
-    testMesos("mesos://localhost:1234", classOf[MesosCoarseGrainedSchedulerBackend], coarse = true)
-  }
-
-  test("mesos with zookeeper") {
-    testMesos("mesos://zk://localhost:1234,localhost:2345",
-      classOf[MesosFineGrainedSchedulerBackend], coarse = false)
-  }
-
 }
diff --git a/dev/create-release/release-build.sh b/dev/create-release/release-build.sh
index 2833dc76511177f88459b878d95c1da9fb34cc81..96f9b5714ebb8073fd2fd9ad5c00dd463d01b932 100755
--- a/dev/create-release/release-build.sh
+++ b/dev/create-release/release-build.sh
@@ -80,7 +80,7 @@ NEXUS_PROFILE=d63f592e7eac0 # Profile for Spark staging uploads
 BASE_DIR=$(pwd)
 
 MVN="build/mvn --force"
-PUBLISH_PROFILES="-Pyarn -Phive -Phive-thriftserver -Phadoop-2.2"
+PUBLISH_PROFILES="-Pmesos -Pyarn -Phive -Phive-thriftserver -Phadoop-2.2"
 PUBLISH_PROFILES="$PUBLISH_PROFILES -Pspark-ganglia-lgpl -Pkinesis-asl"
 
 rm -rf spark
@@ -186,12 +186,13 @@ if [[ "$1" == "package" ]]; then
 
   # We increment the Zinc port each time to avoid OOM's and other craziness if multiple builds
   # share the same Zinc server.
-  make_binary_release "hadoop2.3" "-Psparkr -Phadoop-2.3 -Phive -Phive-thriftserver -Pyarn" "3033" &
-  make_binary_release "hadoop2.4" "-Psparkr -Phadoop-2.4 -Phive -Phive-thriftserver -Pyarn" "3034" &
-  make_binary_release "hadoop2.6" "-Psparkr -Phadoop-2.6 -Phive -Phive-thriftserver -Pyarn" "3035" &
-  make_binary_release "hadoop2.7" "-Psparkr -Phadoop-2.7 -Phive -Phive-thriftserver -Pyarn" "3036" &
-  make_binary_release "hadoop2.4-without-hive" "-Psparkr -Phadoop-2.4 -Pyarn" "3037" &
-  make_binary_release "without-hadoop" "-Psparkr -Phadoop-provided -Pyarn" "3038" &
+  FLAGS="-Psparkr -Phive -Phive-thriftserver -Pyarn -Pmesos"
+  make_binary_release "hadoop2.3" "-Phadoop2.3 $FLAGS" "3033" &
+  make_binary_release "hadoop2.4" "-Phadoop2.4 $FLAGS" "3034" &
+  make_binary_release "hadoop2.6" "-Phadoop2.6 $FLAGS" "3035" &
+  make_binary_release "hadoop2.7" "-Phadoop2.7 $FLAGS" "3036" &
+  make_binary_release "hadoop2.4-without-hive" "-Psparkr -Phadoop-2.4 -Pyarn -Pmesos" "3037" &
+  make_binary_release "without-hadoop" "-Psparkr -Phadoop-provided -Pyarn -Pmesos" "3038" &
   wait
   rm -rf spark-$SPARK_VERSION-bin-*/
 
diff --git a/dev/lint-java b/dev/lint-java
index fe8ab83d562d174176025bec80f569330e523d07..c2e80538ef2a5f1e2d6b14833c7e2dcc59904d42 100755
--- a/dev/lint-java
+++ b/dev/lint-java
@@ -20,7 +20,7 @@
 SCRIPT_DIR="$( cd "$( dirname "$0" )" && pwd )"
 SPARK_ROOT_DIR="$(dirname $SCRIPT_DIR)"
 
-ERRORS=$($SCRIPT_DIR/../build/mvn -Pkinesis-asl -Pyarn -Phive -Phive-thriftserver checkstyle:check | grep ERROR)
+ERRORS=$($SCRIPT_DIR/../build/mvn -Pkinesis-asl -Pmesos -Pyarn -Phive -Phive-thriftserver checkstyle:check | grep ERROR)
 
 if test ! -z "$ERRORS"; then
     echo -e "Checkstyle checks failed at following occurrences:\n$ERRORS"
diff --git a/dev/mima b/dev/mima
index c3553490451c8bdce467cc216a6711562ab6ab97..11c4af29808a84dc7e8d99959392f0f7352e6ed7 100755
--- a/dev/mima
+++ b/dev/mima
@@ -24,7 +24,7 @@ set -e
 FWDIR="$(cd "`dirname "$0"`"/..; pwd)"
 cd "$FWDIR"
 
-SPARK_PROFILES="-Pyarn -Pspark-ganglia-lgpl -Pkinesis-asl -Phive-thriftserver -Phive"
+SPARK_PROFILES="-Pmesos -Pyarn -Pspark-ganglia-lgpl -Pkinesis-asl -Phive-thriftserver -Phive"
 TOOLS_CLASSPATH="$(build/sbt -DcopyDependencies=false "export tools/fullClasspath" | tail -n1)"
 OLD_DEPS_CLASSPATH="$(build/sbt -DcopyDependencies=false $SPARK_PROFILES "export oldDeps/fullClasspath" | tail -n1)"
 
diff --git a/dev/scalastyle b/dev/scalastyle
index 8fd3604b9f4510eabeee9caada7ba38c3270912e..f3dec833636c60bf3bfe01ffcfdf875f4abc29f7 100755
--- a/dev/scalastyle
+++ b/dev/scalastyle
@@ -22,6 +22,7 @@
 ERRORS=$(echo -e "q\n" \
     | build/sbt \
         -Pkinesis-asl \
+        -Pmesos \
         -Pyarn \
         -Phive \
         -Phive-thriftserver \
diff --git a/dev/sparktestsupport/modules.py b/dev/sparktestsupport/modules.py
index ce5725764be6dc34299adebb90bc236981f753fd..f2aa241a4b8ff2f0ecc94b43619ce88b3fd9ea30 100644
--- a/dev/sparktestsupport/modules.py
+++ b/dev/sparktestsupport/modules.py
@@ -458,6 +458,13 @@ yarn = Module(
     ]
 )
 
+mesos = Module(
+    name="mesos",
+    dependencies=[],
+    source_file_regexes=["mesos/"],
+    sbt_test_goals=["mesos/test"]
+)
+
 # The root module is a dummy module which is used to run all of the tests.
 # No other modules should directly depend on this module.
 root = Module(
diff --git a/dev/test-dependencies.sh b/dev/test-dependencies.sh
index 28e3d4d8d4f00b839facb87e9c008501b6d0e569..4014f42e1983ccf1a4457999b271d51389b7df6f 100755
--- a/dev/test-dependencies.sh
+++ b/dev/test-dependencies.sh
@@ -29,7 +29,7 @@ export LC_ALL=C
 # TODO: This would be much nicer to do in SBT, once SBT supports Maven-style resolution.
 
 # NOTE: These should match those in the release publishing script
-HADOOP2_MODULE_PROFILES="-Phive-thriftserver -Pyarn -Phive"
+HADOOP2_MODULE_PROFILES="-Phive-thriftserver -Pmesos -Pyarn -Phive"
 MVN="build/mvn"
 HADOOP_PROFILES=(
     hadoop-2.2
diff --git a/docs/building-spark.md b/docs/building-spark.md
index 2c987cf8346ef6f1bec2b611c6d0252c0ac41796..6908fc1ba74d058ab2cc425d4d36dc7f7caec1ed 100644
--- a/docs/building-spark.md
+++ b/docs/building-spark.md
@@ -50,7 +50,7 @@ To create a Spark distribution like those distributed by the
 to be runnable, use `./dev/make-distribution.sh` in the project root directory. It can be configured
 with Maven profile settings and so on like the direct Maven build. Example:
 
-    ./dev/make-distribution.sh --name custom-spark --tgz -Psparkr -Phadoop-2.4 -Phive -Phive-thriftserver -Pyarn
+    ./dev/make-distribution.sh --name custom-spark --tgz -Psparkr -Phadoop-2.4 -Phive -Phive-thriftserver -Pmesos -Pyarn
 
 For more information on usage, run `./dev/make-distribution.sh --help`
 
@@ -105,13 +105,24 @@ By default Spark will build with Hive 1.2.1 bindings.
 
 ## Packaging without Hadoop Dependencies for YARN
 
-The assembly directory produced by `mvn package` will, by default, include all of Spark's 
-dependencies, including Hadoop and some of its ecosystem projects. On YARN deployments, this 
-causes multiple versions of these to appear on executor classpaths: the version packaged in 
+The assembly directory produced by `mvn package` will, by default, include all of Spark's
+dependencies, including Hadoop and some of its ecosystem projects. On YARN deployments, this
+causes multiple versions of these to appear on executor classpaths: the version packaged in
 the Spark assembly and the version on each node, included with `yarn.application.classpath`.
-The `hadoop-provided` profile builds the assembly without including Hadoop-ecosystem projects, 
+The `hadoop-provided` profile builds the assembly without including Hadoop-ecosystem projects,
 like ZooKeeper and Hadoop itself.
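+
+For example, a YARN build without the bundled Hadoop dependencies:
+
+    ./build/mvn -Pyarn -Phadoop-provided -DskipTests clean package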
 
+## Building with Mesos support
+
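+Spark on Mesos is built as a separate module that is not included by default. To
+add Mesos cluster scheduling support, build with the `mesos` profile:
+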
+    ./build/mvn -Pmesos -DskipTests clean package
+
 ## Building for Scala 2.10
 To produce a Spark package compiled with Scala 2.10, use the `-Dscala-2.10` property:
 
@@ -263,17 +274,17 @@ The run-tests script also can be limited to a specific Python version or a speci
 
 ## Running R Tests
 
-To run the SparkR tests you will need to install the R package `testthat` 
-(run `install.packages(testthat)` from R shell).  You can run just the SparkR tests using 
+To run the SparkR tests you will need to install the R package `testthat`
+(run `install.packages("testthat")` from R shell).  You can run just the SparkR tests using
 the command:
 
     ./R/run-tests.sh
 
 ## Running Docker-based Integration Test Suites
 
-In order to run Docker integration tests, you have to install the `docker` engine on your box. 
-The instructions for installation can be found at [the Docker site](https://docs.docker.com/engine/installation/). 
-Once installed, the `docker` service needs to be started, if not already running. 
+In order to run Docker integration tests, you have to install the `docker` engine on your box.
+The instructions for installation can be found at [the Docker site](https://docs.docker.com/engine/installation/).
+Once installed, the `docker` service needs to be started, if not already running.
 On Linux, this can be done by `sudo service docker start`.
 
     ./build/mvn install -DskipTests
diff --git a/mesos/pom.xml b/mesos/pom.xml
new file mode 100644
index 0000000000000000000000000000000000000000..57cc26a4ccef94cd1b1cd3c7d4d4c65bd98ec95c
--- /dev/null
+++ b/mesos/pom.xml
@@ -0,0 +1,108 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.spark</groupId>
+    <artifactId>spark-parent_2.11</artifactId>
+    <version>2.1.0-SNAPSHOT</version>
+    <relativePath>../pom.xml</relativePath>
+  </parent>
+
+  <artifactId>spark-mesos_2.11</artifactId>
+  <packaging>jar</packaging>
+  <name>Spark Project Mesos</name>
+  <properties>
+    <sbt.project.name>mesos</sbt.project.name>
+    <mesos.version>1.0.0</mesos.version>
+    <mesos.classifier>shaded-protobuf</mesos.classifier>
+  </properties>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-core_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-core_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.mesos</groupId>
+      <artifactId>mesos</artifactId>
+      <version>${mesos.version}</version>
+      <classifier>${mesos.classifier}</classifier>
+      <exclusions>
+        <exclusion>
+          <groupId>com.google.protobuf</groupId>
+          <artifactId>protobuf-java</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-core</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <!-- Explicitly depend on shaded dependencies from the parent, since shaded deps aren't transitive -->
+    <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.eclipse.jetty</groupId>
+      <artifactId>jetty-server</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.eclipse.jetty</groupId>
+      <artifactId>jetty-plus</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.eclipse.jetty</groupId>
+      <artifactId>jetty-util</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.eclipse.jetty</groupId>
+      <artifactId>jetty-http</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.eclipse.jetty</groupId>
+      <artifactId>jetty-servlet</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.eclipse.jetty</groupId>
+      <artifactId>jetty-servlets</artifactId>
+    </dependency>
+    <!-- End of shaded deps. -->
+
+  </dependencies>
+
+  <build>
+    <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
+    <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
+  </build>
+
+</project>
diff --git a/mesos/src/main/resources/META-INF/services/org.apache.spark.scheduler.ExternalClusterManager b/mesos/src/main/resources/META-INF/services/org.apache.spark.scheduler.ExternalClusterManager
new file mode 100644
index 0000000000000000000000000000000000000000..12b6d5b64d68c183c3c6cbb6c9b3202ebc77f633
--- /dev/null
+++ b/mesos/src/main/resources/META-INF/services/org.apache.spark.scheduler.ExternalClusterManager
@@ -0,0 +1 @@
+org.apache.spark.scheduler.cluster.mesos.MesosClusterManager
diff --git a/core/src/main/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcher.scala b/mesos/src/main/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcher.scala
similarity index 100%
rename from core/src/main/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcher.scala
rename to mesos/src/main/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcher.scala
diff --git a/core/src/main/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcherArguments.scala b/mesos/src/main/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcherArguments.scala
similarity index 100%
rename from core/src/main/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcherArguments.scala
rename to mesos/src/main/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcherArguments.scala
diff --git a/core/src/main/scala/org/apache/spark/deploy/mesos/MesosDriverDescription.scala b/mesos/src/main/scala/org/apache/spark/deploy/mesos/MesosDriverDescription.scala
similarity index 100%
rename from core/src/main/scala/org/apache/spark/deploy/mesos/MesosDriverDescription.scala
rename to mesos/src/main/scala/org/apache/spark/deploy/mesos/MesosDriverDescription.scala
diff --git a/core/src/main/scala/org/apache/spark/deploy/mesos/MesosExternalShuffleService.scala b/mesos/src/main/scala/org/apache/spark/deploy/mesos/MesosExternalShuffleService.scala
similarity index 100%
rename from core/src/main/scala/org/apache/spark/deploy/mesos/MesosExternalShuffleService.scala
rename to mesos/src/main/scala/org/apache/spark/deploy/mesos/MesosExternalShuffleService.scala
diff --git a/core/src/main/scala/org/apache/spark/deploy/mesos/ui/DriverPage.scala b/mesos/src/main/scala/org/apache/spark/deploy/mesos/ui/DriverPage.scala
similarity index 100%
rename from core/src/main/scala/org/apache/spark/deploy/mesos/ui/DriverPage.scala
rename to mesos/src/main/scala/org/apache/spark/deploy/mesos/ui/DriverPage.scala
diff --git a/core/src/main/scala/org/apache/spark/deploy/mesos/ui/MesosClusterPage.scala b/mesos/src/main/scala/org/apache/spark/deploy/mesos/ui/MesosClusterPage.scala
similarity index 100%
rename from core/src/main/scala/org/apache/spark/deploy/mesos/ui/MesosClusterPage.scala
rename to mesos/src/main/scala/org/apache/spark/deploy/mesos/ui/MesosClusterPage.scala
diff --git a/core/src/main/scala/org/apache/spark/deploy/mesos/ui/MesosClusterUI.scala b/mesos/src/main/scala/org/apache/spark/deploy/mesos/ui/MesosClusterUI.scala
similarity index 100%
rename from core/src/main/scala/org/apache/spark/deploy/mesos/ui/MesosClusterUI.scala
rename to mesos/src/main/scala/org/apache/spark/deploy/mesos/ui/MesosClusterUI.scala
diff --git a/core/src/main/scala/org/apache/spark/deploy/rest/mesos/MesosRestServer.scala b/mesos/src/main/scala/org/apache/spark/deploy/rest/mesos/MesosRestServer.scala
similarity index 100%
rename from core/src/main/scala/org/apache/spark/deploy/rest/mesos/MesosRestServer.scala
rename to mesos/src/main/scala/org/apache/spark/deploy/rest/mesos/MesosRestServer.scala
diff --git a/core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala b/mesos/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala
similarity index 93%
rename from core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala
rename to mesos/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala
index 680cfb733e9e693e691d116549bc69527e371f85..1937bd30bac51a24ff95a0c3d04f3d8408fadce5 100644
--- a/core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala
+++ b/mesos/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala
@@ -26,25 +26,26 @@ import org.apache.mesos.Protos.{TaskStatus => MesosTaskStatus, _}
 import org.apache.mesos.protobuf.ByteString
 
 import org.apache.spark.{SparkConf, SparkEnv, TaskState}
-import org.apache.spark.TaskState.TaskState
+import org.apache.spark.TaskState
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.Logging
-import org.apache.spark.scheduler.cluster.mesos.MesosTaskLaunchData
+import org.apache.spark.scheduler.cluster.mesos.{MesosSchedulerUtils, MesosTaskLaunchData}
 import org.apache.spark.util.Utils
 
 private[spark] class MesosExecutorBackend
   extends MesosExecutor
+  with MesosSchedulerUtils // TODO: mixed in only for taskStateToMesos; factor it out
   with ExecutorBackend
   with Logging {
 
   var executor: Executor = null
   var driver: ExecutorDriver = null
 
-  override def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer) {
+  override def statusUpdate(taskId: Long, state: TaskState.TaskState, data: ByteBuffer) {
     val mesosTaskId = TaskID.newBuilder().setValue(taskId.toString).build()
     driver.sendStatusUpdate(MesosTaskStatus.newBuilder()
       .setTaskId(mesosTaskId)
-      .setState(TaskState.toMesos(state))
+      .setState(taskStateToMesos(state))
       .setData(ByteString.copyFrom(data))
       .build())
   }
diff --git a/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterManager.scala b/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterManager.scala
new file mode 100644
index 0000000000000000000000000000000000000000..a849c4afa24f578d96e44a75283148dff93d37ab
--- /dev/null
+++ b/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterManager.scala
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler.cluster.mesos
+
+import org.apache.spark.{SparkContext, SparkException}
+import org.apache.spark.scheduler.{ExternalClusterManager, SchedulerBackend, TaskScheduler, TaskSchedulerImpl}
+
+/**
+ * Cluster Manager for creating Mesos scheduler and backend components.
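+ * Spark discovers this manager at runtime through the ServiceLoader registration in
+ * META-INF/services/org.apache.spark.scheduler.ExternalClusterManager.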
+ */
+private[spark] class MesosClusterManager extends ExternalClusterManager {
+  private val MESOS_REGEX = """mesos://(.*)""".r
+
+  override def canCreate(masterURL: String): Boolean = {
+    masterURL.startsWith("mesos")
+  }
+
+  override def createTaskScheduler(sc: SparkContext, masterURL: String): TaskScheduler = {
+    new TaskSchedulerImpl(sc)
+  }
+
+  override def createSchedulerBackend(sc: SparkContext,
+      masterURL: String,
+      scheduler: TaskScheduler): SchedulerBackend = {
+    val mesosUrl = MESOS_REGEX.findFirstMatchIn(masterURL).get.group(1)
+    val coarse = sc.conf.getBoolean("spark.mesos.coarse", defaultValue = true)
+    if (coarse) {
+      new MesosCoarseGrainedSchedulerBackend(
+        scheduler.asInstanceOf[TaskSchedulerImpl],
+        sc,
+        mesosUrl,
+        sc.env.securityManager)
+    } else {
+      new MesosFineGrainedSchedulerBackend(
+        scheduler.asInstanceOf[TaskSchedulerImpl],
+        sc,
+        mesosUrl)
+    }
+  }
+
+  override def initialize(scheduler: TaskScheduler, backend: SchedulerBackend): Unit = {
+    scheduler.asInstanceOf[TaskSchedulerImpl].initialize(backend)
+  }
+}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterPersistenceEngine.scala b/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterPersistenceEngine.scala
similarity index 100%
rename from core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterPersistenceEngine.scala
rename to mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterPersistenceEngine.scala
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala b/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
similarity index 99%
rename from core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
rename to mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
index bb6f6b3e3ffd59a0b3b43b36df4093717bb1720b..0b454997772d8b908fb528cf7e8711b5cec16148 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
+++ b/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
@@ -680,7 +680,7 @@ private[spark] class MesosClusterScheduler(
             retryState = Some(new MesosClusterRetryState(status, retries, nextRetry, waitTimeSec)))
           pendingRetryDrivers += newDriverDescription
           pendingRetryDriversState.persist(taskId, newDriverDescription)
-        } else if (TaskState.isFinished(TaskState.fromMesos(status.getState))) {
+        } else if (TaskState.isFinished(mesosToTaskState(status.getState))) {
           removeFromLaunchedDrivers(taskId)
           state.finishDate = Some(new Date())
           if (finishedDrivers.size >= retainedDrivers) {
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSource.scala b/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSource.scala
similarity index 100%
rename from core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSource.scala
rename to mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSource.scala
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala b/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
similarity index 99%
rename from core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
rename to mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
index 6b9313e5edb976b01ced6aeb677a2da6de9447b3..fde1fb32280208191fe3e003e565d74bb6889ff3 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
+++ b/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
@@ -473,7 +473,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
   override def statusUpdate(d: org.apache.mesos.SchedulerDriver, status: TaskStatus) {
     val taskId = status.getTaskId.getValue
     val slaveId = status.getSlaveId.getValue
-    val state = TaskState.fromMesos(status.getState)
+    val state = mesosToTaskState(status.getState)
 
     logInfo(s"Mesos task $taskId is now ${status.getState}")
 
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala b/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala
similarity index 99%
rename from core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala
rename to mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala
index f1e48fa7c52e11b2a7e52aa6bd680de9a6fef460..eb3b235949501a8ac685f19e7f027156b4460732 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala
+++ b/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala
@@ -366,9 +366,9 @@ private[spark] class MesosFineGrainedSchedulerBackend(
   override def statusUpdate(d: org.apache.mesos.SchedulerDriver, status: TaskStatus) {
     inClassLoader() {
       val tid = status.getTaskId.getValue.toLong
-      val state = TaskState.fromMesos(status.getState)
+      val state = mesosToTaskState(status.getState)
       synchronized {
-        if (TaskState.isFailed(TaskState.fromMesos(status.getState))
+        if (TaskState.isFailed(mesosToTaskState(status.getState))
           && taskIdToSlaveId.contains(tid)) {
           // We lost the executor on this slave, so remember that it's gone
           removeExecutor(taskIdToSlaveId(tid), "Lost executor")
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala b/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala
similarity index 100%
rename from core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala
rename to mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala b/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
similarity index 94%
rename from core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
rename to mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
index 1bbede18533e06628823d7dcfca71d97834a7c31..e19d4451372076b8c7680f348e8fa605e5fbbe72 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
+++ b/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
@@ -26,19 +26,20 @@ import scala.util.control.NonFatal
 
 import com.google.common.base.Splitter
 import org.apache.mesos.{MesosSchedulerDriver, Protos, Scheduler, SchedulerDriver}
-import org.apache.mesos.Protos._
+import org.apache.mesos.Protos.{TaskState => MesosTaskState, _}
 import org.apache.mesos.protobuf.{ByteString, GeneratedMessage}
 
 import org.apache.spark.{SparkConf, SparkContext, SparkException}
+import org.apache.spark.TaskState
 import org.apache.spark.internal.Logging
 import org.apache.spark.util.Utils
 
 
 /**
  * Shared trait for implementing a Mesos Scheduler. This holds common state and helper
  * methods and Mesos scheduler will use.
  */
-private[mesos] trait MesosSchedulerUtils extends Logging {
+trait MesosSchedulerUtils extends Logging {
   // Lock used to wait for scheduler to be registered
   private final val registerLatch = new CountDownLatch(1)
 
@@ -491,4 +492,22 @@ private[mesos] trait MesosSchedulerUtils extends Logging {
     sc.conf.remove("spark.mesos.driver.frameworkId")
     System.clearProperty("spark.mesos.driver.frameworkId")
   }
+
+  def mesosToTaskState(state: MesosTaskState): TaskState.TaskState = state match {
+    case MesosTaskState.TASK_STAGING | MesosTaskState.TASK_STARTING => TaskState.LAUNCHING
+    case MesosTaskState.TASK_RUNNING | MesosTaskState.TASK_KILLING => TaskState.RUNNING
+    case MesosTaskState.TASK_FINISHED => TaskState.FINISHED
+    case MesosTaskState.TASK_FAILED => TaskState.FAILED
+    case MesosTaskState.TASK_KILLED => TaskState.KILLED
+    case MesosTaskState.TASK_LOST | MesosTaskState.TASK_ERROR => TaskState.LOST
+  }
+
+  def taskStateToMesos(state: TaskState.TaskState): MesosTaskState = state match {
+    case TaskState.LAUNCHING => MesosTaskState.TASK_STARTING
+    case TaskState.RUNNING => MesosTaskState.TASK_RUNNING
+    case TaskState.FINISHED => MesosTaskState.TASK_FINISHED
+    case TaskState.FAILED => MesosTaskState.TASK_FAILED
+    case TaskState.KILLED => MesosTaskState.TASK_KILLED
+    case TaskState.LOST => MesosTaskState.TASK_LOST
+  }
 }
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosTaskLaunchData.scala b/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosTaskLaunchData.scala
similarity index 100%
rename from core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosTaskLaunchData.scala
rename to mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosTaskLaunchData.scala
diff --git a/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterManagerSuite.scala b/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterManagerSuite.scala
new file mode 100644
index 0000000000000000000000000000000000000000..6fce06632c57ead0ea4431d22f4c415001cc95e1
--- /dev/null
+++ b/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterManagerSuite.scala
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler.cluster.mesos
+
+import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkFunSuite}
+
+class MesosClusterManagerSuite extends SparkFunSuite with LocalSparkContext {
+  def testURL(masterURL: String, expectedClass: Class[_], coarse: Boolean) {
+    val conf = new SparkConf().set("spark.mesos.coarse", coarse.toString)
+    sc = new SparkContext("local", "test", conf)
+    val clusterManager = new MesosClusterManager()
+
+    assert(clusterManager.canCreate(masterURL))
+    val taskScheduler = clusterManager.createTaskScheduler(sc, masterURL)
+    val sched = clusterManager.createSchedulerBackend(sc, masterURL, taskScheduler)
+    assert(sched.getClass === expectedClass)
+  }
+
+  test("mesos fine-grained") {
+    testURL("mesos://localhost:1234", classOf[MesosFineGrainedSchedulerBackend], coarse = false)
+  }
+
+  test("mesos coarse-grained") {
+    testURL("mesos://localhost:1234", classOf[MesosCoarseGrainedSchedulerBackend], coarse = true)
+  }
+
+  test("mesos with zookeeper") {
+    testURL("mesos://zk://localhost:1234,localhost:2345",
+      classOf[MesosFineGrainedSchedulerBackend],
+      coarse = false)
+  }
+}
diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala b/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala
similarity index 100%
rename from core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala
rename to mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala
diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala b/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
similarity index 100%
rename from core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
rename to mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackendSuite.scala b/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackendSuite.scala
similarity index 100%
rename from core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackendSuite.scala
rename to mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackendSuite.scala
diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtilsSuite.scala b/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtilsSuite.scala
similarity index 100%
rename from core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtilsSuite.scala
rename to mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtilsSuite.scala
diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosTaskLaunchDataSuite.scala b/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosTaskLaunchDataSuite.scala
similarity index 100%
rename from core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosTaskLaunchDataSuite.scala
rename to mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosTaskLaunchDataSuite.scala
diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/Utils.scala b/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/Utils.scala
similarity index 100%
rename from core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/Utils.scala
rename to mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/Utils.scala
diff --git a/pom.xml b/pom.xml
index 989658216e5fd75c08bf5c88537bb84223ec8415..74238db59ed8f78a96c526108b3b90d7af559e1b 100644
--- a/pom.xml
+++ b/pom.xml
@@ -119,8 +119,6 @@
     <java.version>1.7</java.version>
     <maven.version>3.3.9</maven.version>
     <sbt.project.name>spark</sbt.project.name>
-    <mesos.version>1.0.0</mesos.version>
-    <mesos.classifier>shaded-protobuf</mesos.classifier>
     <slf4j.version>1.7.16</slf4j.version>
     <log4j.version>1.2.17</log4j.version>
     <hadoop.version>2.2.0</hadoop.version>
@@ -527,18 +525,6 @@
         <version>${protobuf.version}</version>
         <scope>${hadoop.deps.scope}</scope>
       </dependency>
-      <dependency>
-        <groupId>org.apache.mesos</groupId>
-        <artifactId>mesos</artifactId>
-        <version>${mesos.version}</version>
-        <classifier>${mesos.classifier}</classifier>
-        <exclusions>
-          <exclusion>
-            <groupId>com.google.protobuf</groupId>
-            <artifactId>protobuf-java</artifactId>
-          </exclusion>
-        </exclusions>
-      </dependency>
       <dependency>
         <groupId>org.roaringbitmap</groupId>
         <artifactId>RoaringBitmap</artifactId>
@@ -2527,6 +2513,13 @@
       </modules>
     </profile>
 
+    <profile>
+      <id>mesos</id>
+      <modules>
+        <module>mesos</module>
+      </modules>
+    </profile>
+
     <profile>
       <id>hive-thriftserver</id>
       <modules>
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 688218f6f43af3d2c3fd30e28f0ca3c2681c9070..16f26e7d283b4d04308c519fe9804cab81e26d36 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -40,7 +40,9 @@ object MimaExcludes {
       // [SPARK-16199][SQL] Add a method to list the referenced columns in data source Filter
       ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.sources.Filter.references"),
       // [SPARK-16853][SQL] Fixes encoder error in DataSet typed select
-      ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.sql.Dataset.select")
+      ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.sql.Dataset.select"),
+      // [SPARK-16967] Move Mesos to Module
+      ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.SparkMasterRegex.MESOS_REGEX")
     )
   }
 
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index c769ba300e5e68ed4508f25fa5e4d97cd20bb320..83a7c0864f76d2fc6797a319c20a3825cbe11da1 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -56,9 +56,9 @@ object BuildCommons {
     "tags", "sketch"
   ).map(ProjectRef(buildLocation, _)) ++ sqlProjects ++ streamingProjects
 
-  val optionallyEnabledProjects@Seq(yarn, java8Tests, sparkGangliaLgpl,
+  val optionallyEnabledProjects@Seq(mesos, yarn, java8Tests, sparkGangliaLgpl,
     streamingKinesisAsl, dockerIntegrationTests) =
-    Seq("yarn", "java8-tests", "ganglia-lgpl", "streaming-kinesis-asl",
+    Seq("mesos", "yarn", "java8-tests", "ganglia-lgpl", "streaming-kinesis-asl",
       "docker-integration-tests").map(ProjectRef(buildLocation, _))
 
   val assemblyProjects@Seq(networkYarn, streamingFlumeAssembly, streamingKafkaAssembly, streamingKafka010Assembly, streamingKinesisAslAssembly) =