Commit e2d7656c authored by Jey Kottalam

re-enable YARN support

parent bd0bab47
@@ -61,9 +61,21 @@ class SparkEnv (
     // If executorId is NOT found, return defaultHostPort
     var executorIdToHostPort: Option[(String, String) => String]) {
 
-  val hadoop = new SparkHadoopUtil
   private val pythonWorkers = mutable.HashMap[(String, Map[String, String]), PythonWorkerFactory]()
 
+  val hadoop = {
+    val yarnMode = java.lang.Boolean.valueOf(System.getProperty("SPARK_YARN_MODE", System.getenv("SPARK_YARN_MODE")))
+    if(yarnMode) {
+      try {
+        Class.forName("spark.deploy.yarn.YarnSparkHadoopUtil").newInstance.asInstanceOf[SparkHadoopUtil]
+      } catch {
+        case th: Throwable => throw new SparkException("Unable to load YARN support", th)
+      }
+    } else {
+      new SparkHadoopUtil
+    }
+  }
+
   def stop() {
     pythonWorkers.foreach { case(key, worker) => worker.stop() }
     httpFileServer.stop()
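
The block above picks the Hadoop util implementation at runtime, so the core module never needs a compile-time dependency on the YARN classes. Below is a minimal, self-contained sketch of the same toggle; the two classes are hypothetical stand-ins, and only the property name and the reflective lookup come from the hunk:

    // Hypothetical stand-ins so the sketch compiles on its own.
    class SparkHadoopUtil
    class YarnSparkHadoopUtil extends SparkHadoopUtil

    object YarnToggleSketch {
      def main(args: Array[String]): Unit = {
        // The JVM property takes precedence; a null or unset value parses to false.
        val raw = System.getProperty("SPARK_YARN_MODE", System.getenv("SPARK_YARN_MODE"))
        val yarnMode = java.lang.Boolean.valueOf(raw)

        // Load the YARN subclass by name only when the flag is set, as the hunk does.
        val hadoop: SparkHadoopUtil =
          if (yarnMode)
            Class.forName("YarnSparkHadoopUtil").newInstance.asInstanceOf[SparkHadoopUtil]
          else
            new SparkHadoopUtil
        println(hadoop.getClass.getName)
      }
    }
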
@@ -130,11 +130,11 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
       try {
         val socket = new Socket(driverHost, driverPort.toInt)
         socket.close()
-        logInfo("Master now available: " + driverHost + ":" + driverPort)
+        logInfo("Driver now available: " + driverHost + ":" + driverPort)
         driverUp = true
       } catch {
         case e: Exception =>
-          logError("Failed to connect to driver at " + driverHost + ":" + driverPort)
+          logWarning("Failed to connect to driver at " + driverHost + ":" + driverPort + ", retrying")
           Thread.sleep(100)
       }
     }
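
These logging tweaks sit inside the ApplicationMaster's wait-for-driver loop: a failed connect is an expected, retried condition, so it becomes a warning rather than an error, and the message names the driver rather than the master. A self-contained sketch of that polling pattern, with hypothetical host and port values; only the probe-then-close shape and the 100 ms sleep come from the hunk:

    import java.net.Socket

    object WaitForDriverSketch {
      def main(args: Array[String]): Unit = {
        val driverHost = "localhost"   // hypothetical; the real AM gets these from its arguments
        val driverPort = "7077"        // hypothetical
        var driverUp = false
        while (!driverUp) {
          try {
            // A successful connect (closed immediately) proves the driver is listening.
            val socket = new Socket(driverHost, driverPort.toInt)
            socket.close()
            driverUp = true
          } catch {
            case e: Exception =>
              Thread.sleep(100)        // not up yet; retry shortly, as in the hunk
          }
        }
      }
    }
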
@@ -165,7 +165,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
     Apps.addToEnvironment(env, Environment.CLASSPATH.name, "./*")
     Apps.addToEnvironment(env, Environment.CLASSPATH.name, "$CLASSPATH")
     Client.populateHadoopClasspath(yarnConf, env)
-    SparkHadoopUtil.setYarnMode(env)
+    env("SPARK_YARN_MODE") = "true"
     env("SPARK_YARN_JAR_PATH") =
       localResources("spark.jar").getResource().getScheme.toString() + "://" +
       localResources("spark.jar").getResource().getFile().toString()
@@ -313,8 +313,11 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
 object Client {
   def main(argStrings: Array[String]) {
+    // Set an env variable indicating we are running in YARN mode.
+    // Note that anything with SPARK prefix gets propagated to all (remote) processes
+    System.setProperty("SPARK_YARN_MODE", "true")
+
     val args = new ClientArguments(argStrings)
-    SparkHadoopUtil.setYarnMode()
     new Client(args).run
   }
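
Both Client changes replace the SparkHadoopUtil.setYarnMode helpers with direct assignment, setting the flag in the two places it has to exist: the client JVM's own system properties, and the launch environment handed to the YARN containers (where, per the comment above, SPARK-prefixed variables are propagated to the remote processes). A small sketch of the pair, assuming a mutable HashMap env like the one Client builds:

    import scala.collection.mutable.HashMap

    object YarnModeFlagsSketch {
      def main(args: Array[String]): Unit = {
        // 1. Local client JVM: read back later by SparkEnv's yarnMode check.
        System.setProperty("SPARK_YARN_MODE", "true")

        // 2. Container launch environment: SPARK-prefixed entries reach the
        //    ApplicationMaster and executors when the containers start.
        val env = new HashMap[String, String]()
        env("SPARK_YARN_MODE") = "true"
      }
    }
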
@@ -15,8 +15,9 @@
  * limitations under the License.
  */
 
-package spark.deploy
+package spark.deploy.yarn
+
+import spark.deploy.SparkHadoopUtil
 import collection.mutable.HashMap
 import org.apache.hadoop.mapred.JobConf
 import org.apache.hadoop.security.UserGroupInformation
@@ -28,32 +29,17 @@ import java.security.PrivilegedExceptionAction
 
 /**
  * Contains util methods to interact with Hadoop from spark.
  */
-object SparkHadoopUtil {
-
-  val yarnConf = newConfiguration()
+class YarnSparkHadoopUtil extends SparkHadoopUtil {
 
   // Note that all params which start with SPARK are propagated all the way through, so if in yarn mode, this MUST be set to true.
-  def isYarnMode(): Boolean = {
-    val yarnMode = System.getProperty("SPARK_YARN_MODE", System.getenv("SPARK_YARN_MODE"))
-    java.lang.Boolean.valueOf(yarnMode)
-  }
-
-  // Set an env variable indicating we are running in YARN mode.
-  // Note that anything with SPARK prefix gets propagated to all (remote) processes
-  def setYarnMode() {
-    System.setProperty("SPARK_YARN_MODE", "true")
-  }
-
-  def setYarnMode(env: HashMap[String, String]) {
-    env("SPARK_YARN_MODE") = "true"
-  }
+  override def isYarnMode(): Boolean = { true }
 
   // Return an appropriate (subclass) of Configuration. Creating a config can initialize some Hadoop subsystems.
   // Always create a new config, don't reuse yarnConf.
-  def newConfiguration(): Configuration = new YarnConfiguration(new Configuration())
+  override def newConfiguration(): Configuration = new YarnConfiguration(new Configuration())
 
   // add any user credentials to the job conf which are necessary for running on a secure Hadoop cluster
-  def addCredentials(conf: JobConf) {
+  override def addCredentials(conf: JobConf) {
     val jobCreds = conf.getCredentials();
     jobCreds.mergeAll(UserGroupInformation.getCurrentUser().getCredentials())
   }
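
Turning the old SparkHadoopUtil singleton into a YarnSparkHadoopUtil subclass means callers dispatch through whichever instance SparkEnv selected, instead of core calling YARN-flavored static helpers. A hypothetical call site, assuming SparkEnv.get exposes the hadoop field from the first hunk and that the base class's addCredentials does nothing outside YARN:

    import org.apache.hadoop.mapred.JobConf

    // Under YARN this resolves to YarnSparkHadoopUtil and merges the current
    // user's Hadoop credentials into the job conf for secure clusters; under
    // other deploy modes the base implementation applies.
    val jobConf = new JobConf()
    SparkEnv.get.hadoop.addCredentials(jobConf)
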