Skip to content
Snippets Groups Projects
Commit 8faf5c51 authored by Mridul Muralidharan's avatar Mridul Muralidharan
Browse files

Patch from Thomas Graves to improve the YARN Client, and move to more...

Patch from Thomas Graves to improve the YARN Client, and move to a more production-ready Hadoop YARN branch
parent b11058f4
No related branches found
No related tags found
No related merge requests found
......@@ -297,6 +297,11 @@
<artifactId>hadoop-yarn-common</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-client</artifactId>
<scope>provided</scope>
</dependency>
</dependencies>
<build>
<plugins>
......
......@@ -7,6 +7,7 @@ import org.apache.hadoop.net.NetUtils
import org.apache.hadoop.yarn.api._
import org.apache.hadoop.yarn.api.records._
import org.apache.hadoop.yarn.api.protocolrecords._
import org.apache.hadoop.yarn.client.YarnClientImpl
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.hadoop.yarn.ipc.YarnRPC
import scala.collection.mutable.HashMap
......@@ -16,19 +17,19 @@ import org.apache.hadoop.yarn.util.{Apps, Records, ConverterUtils}
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
import spark.deploy.SparkHadoopUtil
class Client(conf: Configuration, args: ClientArguments) extends Logging {
class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl with Logging {
def this(args: ClientArguments) = this(new Configuration(), args)
var applicationsManager: ClientRMProtocol = null
var rpc: YarnRPC = YarnRPC.create(conf)
val yarnConf: YarnConfiguration = new YarnConfiguration(conf)
def run() {
connectToASM()
init(yarnConf)
start()
logClusterResourceDetails()
val newApp = getNewApplication()
val newApp = super.getNewApplication()
val appId = newApp.getApplicationId()
verifyClusterResources(newApp)
......@@ -47,64 +48,17 @@ class Client(conf: Configuration, args: ClientArguments) extends Logging {
System.exit(0)
}
/**
 * Connects to the YARN ResourceManager's client-facing (ASM) RPC endpoint
 * and stores the resulting proxy in `applicationsManager`.
 *
 * Side effect: mutates the `applicationsManager` field; must be called
 * before any of the get*/submit helpers that use the proxy.
 */
def connectToASM() {
  // Resolve the RM address from the YARN configuration, falling back to the
  // framework default when `yarn.resourcemanager.address` is unset.
  val rmAddress: InetSocketAddress = NetUtils.createSocketAddr(
    yarnConf.get(YarnConfiguration.RM_ADDRESS, YarnConfiguration.DEFAULT_RM_ADDRESS)
  )
  // Fixed: original message lacked a trailing space, producing e.g.
  // "Connecting to ResourceManager athost:8032".
  logInfo("Connecting to ResourceManager at " + rmAddress)
  applicationsManager = rpc.getProxy(classOf[ClientRMProtocol], rmAddress, conf)
    .asInstanceOf[ClientRMProtocol]
}
// Logs cluster capacity information (NodeManager count and queue capacity)
// obtained from the ResourceManager, for operator visibility before submission.
// NOTE(review): this span is a scraped diff with +/- markers lost — the pairs of
// duplicate `val` lines below are the pre-patch call (local helper) and the
// post-patch call (inherited from YarnClientImpl); only one exists per revision.
def logClusterResourceDetails() {
val clusterMetrics: YarnClusterMetrics = getYarnClusterMetrics
val clusterMetrics: YarnClusterMetrics = super.getYarnClusterMetrics
logInfo("Got Cluster metric info from ASM, numNodeManagers=" + clusterMetrics.getNumNodeManagers)
/*
val clusterNodeReports: List[NodeReport] = getNodeReports
logDebug("Got Cluster node info from ASM")
for (node <- clusterNodeReports) {
logDebug("Got node report from ASM for, nodeId=" + node.getNodeId + ", nodeAddress=" + node.getHttpAddress +
", nodeRackName=" + node.getRackName + ", nodeNumContainers=" + node.getNumContainers + ", nodeHealthStatus=" + node.getNodeHealthStatus)
}
*/
// NOTE(review): duplicate pair below is the same pre/post-patch diff artifact.
val queueInfo: QueueInfo = getQueueInfo(args.amQueue)
val queueInfo: QueueInfo = super.getQueueInfo(args.amQueue)
logInfo("Queue info .. queueName=" + queueInfo.getQueueName + ", queueCurrentCapacity=" + queueInfo.getCurrentCapacity +
", queueMaxCapacity=" + queueInfo.getMaximumCapacity + ", queueApplicationCount=" + queueInfo.getApplications.size +
", queueChildQueueCount=" + queueInfo.getChildQueues.size)
}
/**
 * Fetches cluster-wide metrics (e.g. NodeManager count) from the
 * ResourceManager over the ClientRMProtocol proxy.
 *
 * Requires `connectToASM()` to have initialized `applicationsManager`.
 *
 * @return the [[YarnClusterMetrics]] reported by the RM
 */
def getYarnClusterMetrics: YarnClusterMetrics = {
  val request: GetClusterMetricsRequest = Records.newRecord(classOf[GetClusterMetricsRequest])
  val response: GetClusterMetricsResponse = applicationsManager.getClusterMetrics(request)
  // Last expression is the result; the original's explicit `return` was redundant.
  response.getClusterMetrics
}
/**
 * Retrieves a report for every node in the cluster from the ResourceManager.
 *
 * Requires `connectToASM()` to have initialized `applicationsManager`.
 *
 * @return the cluster's [[NodeReport]]s as an immutable List
 */
def getNodeReports: List[NodeReport] = {
  val request: GetClusterNodesRequest = Records.newRecord(classOf[GetClusterNodesRequest])
  val response: GetClusterNodesResponse = applicationsManager.getClusterNodes(request)
  // `.toList` relies on a Java->Scala collection conversion being in scope
  // (imported elsewhere in this file) — TODO confirm which converter is used.
  response.getNodeReports.toList
}
/**
 * Looks up scheduler-queue information (capacity, running applications)
 * for the named queue, non-recursively and without child queues.
 *
 * Requires `connectToASM()` to have initialized `applicationsManager`.
 *
 * @param queueName the YARN scheduler queue to query (e.g. the AM queue)
 * @return the [[QueueInfo]] returned by the ResourceManager
 */
def getQueueInfo(queueName: String): QueueInfo = {
  val request: GetQueueInfoRequest = Records.newRecord(classOf[GetQueueInfoRequest])
  request.setQueueName(queueName)
  request.setIncludeApplications(true)
  request.setIncludeChildQueues(false)
  request.setRecursive(false)
  // Removed dead code: the original allocated a second GetQueueInfoRequest
  // here (`Records.newRecord(...)`) and discarded it immediately.
  applicationsManager.getQueueInfo(request).getQueueInfo
}
/**
 * Asks the ResourceManager to allocate a new ApplicationId for submission.
 *
 * Requires `connectToASM()` to have initialized `applicationsManager`.
 * Side effect: logs the request and the ApplicationId that was granted.
 *
 * @return the RM response carrying the new ApplicationId and the
 *         cluster's maximum resource capability
 */
def getNewApplication(): GetNewApplicationResponse = {
  logInfo("Requesting new Application")
  val request = Records.newRecord(classOf[GetNewApplicationRequest])
  val response = applicationsManager.getNewApplication(request)
  logInfo("Got new ApplicationId: " + response.getApplicationId())
  // Last expression is the result; the original's explicit `return` was redundant.
  response
}
def verifyClusterResources(app: GetNewApplicationResponse) = {
val maxMem = app.getMaximumResourceCapability().getMemory()
......@@ -265,23 +219,15 @@ class Client(conf: Configuration, args: ClientArguments) extends Logging {
}
// Submits the prepared ApplicationSubmissionContext to the ResourceManager (ASM).
// NOTE(review): this span is a scraped diff with +/- markers lost — the manual
// SubmitApplicationRequest construction below is the pre-patch body, and the
// final `super.submitApplication(appContext)` line is the post-patch body
// (delegating to YarnClientImpl); only one of the two exists per revision.
def submitApp(appContext: ApplicationSubmissionContext) = {
// Create the request to send to the applications manager
val appRequest = Records.newRecord(classOf[SubmitApplicationRequest])
.asInstanceOf[SubmitApplicationRequest]
appRequest.setApplicationSubmissionContext(appContext)
// Submit the application to the applications manager
logInfo("Submitting application to ASM")
applicationsManager.submitApplication(appRequest)
super.submitApplication(appContext)
}
def monitorApplication(appId: ApplicationId): Boolean = {
while(true) {
Thread.sleep(1000)
val reportRequest = Records.newRecord(classOf[GetApplicationReportRequest])
.asInstanceOf[GetApplicationReportRequest]
reportRequest.setApplicationId(appId)
val reportResponse = applicationsManager.getApplicationReport(reportRequest)
val report = reportResponse.getApplicationReport()
val report = super.getApplicationReport(appId)
logInfo("Application report from ASM: \n" +
"\t application identifier: " + appId.toString() + "\n" +
......
......@@ -564,7 +564,9 @@
<id>hadoop2-yarn</id>
<properties>
<hadoop.major.version>2</hadoop.major.version>
<yarn.version>2.0.2-alpha</yarn.version>
<!-- 0.23.* is same as 2.0.* - except hardened to run production jobs -->
<yarn.version>0.23.7</yarn.version>
<!-- <yarn.version>2.0.2-alpha</yarn.version> -->
</properties>
<repositories>
......@@ -599,6 +601,11 @@
<artifactId>hadoop-yarn-common</artifactId>
<version>${yarn.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-client</artifactId>
<version>${yarn.version}</version>
</dependency>
<!-- Specify Avro version because Kafka also has it as a dependency -->
<dependency>
<groupId>org.apache.avro</groupId>
......
......@@ -20,7 +20,7 @@ object SparkBuild extends Build {
//val HADOOP_YARN = false
// For Hadoop 2 YARN support
val HADOOP_VERSION = "2.0.2-alpha"
val HADOOP_VERSION = "0.23.7"
val HADOOP_MAJOR_VERSION = "2"
val HADOOP_YARN = true
......@@ -156,7 +156,8 @@ object SparkBuild extends Build {
Seq(
"org.apache.hadoop" % "hadoop-client" % HADOOP_VERSION,
"org.apache.hadoop" % "hadoop-yarn-api" % HADOOP_VERSION,
"org.apache.hadoop" % "hadoop-yarn-common" % HADOOP_VERSION
"org.apache.hadoop" % "hadoop-yarn-common" % HADOOP_VERSION,
"org.apache.hadoop" % "hadoop-yarn-client" % HADOOP_VERSION
)
} else {
Seq(
......
......@@ -201,6 +201,11 @@
<artifactId>hadoop-yarn-common</artifactId>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-client</artifactId>
<scope>runtime</scope>
</dependency>
</dependencies>
</profile>
<profile>
......
Loading…
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment