diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index 24edc60684376cb2e0cfbe9c652ceb4d7aab00b7..c463ee09993a2c6c89a10c677b24691209188778 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -23,6 +23,7 @@ import java.net.{URI, URL}
 import scala.collection.mutable.{ArrayBuffer, HashMap, Map}
 
 import org.apache.spark.executor.ExecutorURLClassLoader
+import org.apache.spark.util.Utils
 
 /**
  * Scala code behind the spark-submit script.  The script handles setting up the classpath with
@@ -128,6 +129,15 @@ object SparkSubmit {
       childArgs += ("--class", appArgs.mainClass)
     }
 
+    if (clusterManager == YARN) {
+      // The choice of class is arbitrary; any class unique to the spark-yarn module would do
+      if (!Utils.classIsLoadable("org.apache.spark.deploy.yarn.Client") && !Utils.isTesting) {
+        val msg = "Could not load YARN classes. This copy of Spark may not have been compiled " +
+          "with YARN support."
+        throw new Exception(msg)
+      }
+    }
+
     val options = List[OptionAssigner](
       new OptionAssigner(appArgs.master, ALL_CLUSTER_MGRS, false, sysProp = "spark.master"),
       new OptionAssigner(appArgs.driverExtraClassPath, STANDALONE | YARN, true,
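
The guard above fails fast, before any OptionAssigner processing, when the spark-yarn module is missing from the classpath. Below is a minimal, self-contained sketch of the same pattern; the object and helper names are illustrative and not part of the patch, and the fallback to this class's own loader stands in for Utils.getContextOrSparkClassLoader:

```scala
import scala.util.Try

object YarnClassCheckSketch {
  // Probe for the class without initializing it (no static initializers run).
  private def classIsLoadable(clazz: String): Boolean = {
    val loader = Option(Thread.currentThread().getContextClassLoader)
      .getOrElse(getClass.getClassLoader)
    Try(Class.forName(clazz, false, loader)).isSuccess
  }

  def main(args: Array[String]): Unit = {
    // The probe class is arbitrary; any class unique to spark-yarn would do.
    if (!classIsLoadable("org.apache.spark.deploy.yarn.Client")) {
      throw new Exception(
        "Could not load YARN classes. This copy of Spark may not have been " +
          "compiled with YARN support.")
    }
    println("YARN classes are present on the classpath.")
  }
}
```
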
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index b678604ff81c82a38f866049caf294a0ac8b9a45..79f314c8dd36cba9e67c1211847c957bd720fb99 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -28,6 +28,7 @@ import scala.collection.Map
 import scala.collection.mutable.ArrayBuffer
 import scala.io.Source
 import scala.reflect.ClassTag
+import scala.util.Try
 
 import com.google.common.io.Files
 import org.apache.commons.lang.SystemUtils
@@ -137,6 +138,11 @@ private[spark] object Utils extends Logging {
   def getContextOrSparkClassLoader =
     Option(Thread.currentThread().getContextClassLoader).getOrElse(getSparkClassLoader)
 
+  /** Determines whether the provided class is loadable in the current thread, without initializing it. */
+  def classIsLoadable(clazz: String): Boolean = {
+    Try { Class.forName(clazz, false, getContextOrSparkClassLoader) }.isSuccess
+  }
+
   /**
    * Primitive often used when writing {@link java.nio.ByteBuffer} to {@link java.io.DataOutput}.
    */
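
For reference, a usage sketch of the new helper. Because Utils is private[spark], the calling code must live under the org.apache.spark package; the package and object names below, and the probed class names, are examples only:

```scala
package org.apache.spark.demo

import org.apache.spark.util.Utils

object ClassIsLoadableDemo {
  def main(args: Array[String]): Unit = {
    // A JDK class is always loadable.
    println(Utils.classIsLoadable("java.lang.String"))          // true
    // A missing class yields false rather than a ClassNotFoundException,
    // because the Try inside classIsLoadable absorbs the failure.
    println(Utils.classIsLoadable("com.example.NoSuchClass"))   // false
  }
}
```

Note that Class.forName is invoked with initialize = false, so probing a class this way never runs its static initializers.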