Skip to content
Snippets Groups Projects
Commit 69c3bbf6 authored by Jey Kottalam's avatar Jey Kottalam
Browse files

dynamically detect hadoop version

parent f67b94ad
No related branches found
No related tags found
No related merge requests found
......@@ -18,10 +18,28 @@
package org.apache.hadoop.mapred
trait HadoopMapRedUtil {
// Builds the JobContext by calling its two-argument constructor directly with `new`.
// NOTE(review): the definition right after this one has the same signature but resolves
// the class reflectively; this line looks like the pre-change side of the diff — confirm.
def newJobContext(conf: JobConf, jobId: JobID): JobContext = new JobContext(conf, jobId)
/**
 * Creates a JobContext without naming the concrete class at compile time.
 *
 * The class is resolved by name: "org.apache.hadoop.mapred.JobContextImpl" is tried first
 * and "org.apache.hadoop.mapred.JobContext" is used when the former cannot be found. The
 * (JobConf, org.apache.hadoop.mapreduce.JobID) constructor of the resolved class is then
 * invoked reflectively and the result is cast to JobContext.
 *
 * @param conf  job configuration handed to the constructor
 * @param jobId job identifier handed to the constructor
 * @return the reflectively constructed instance, viewed as a JobContext
 */
def newJobContext(conf: JobConf, jobId: JobID): JobContext = {
  val contextClass = firstAvailableClass(
    "org.apache.hadoop.mapred.JobContextImpl",
    "org.apache.hadoop.mapred.JobContext")
  contextClass
    .getDeclaredConstructor(classOf[JobConf], classOf[org.apache.hadoop.mapreduce.JobID])
    .newInstance(conf, jobId)
    .asInstanceOf[JobContext]
}
// Builds the TaskAttemptContext by calling its two-argument constructor directly with `new`.
// NOTE(review): the definition right after this one has the same signature but resolves
// the class reflectively; this line looks like the pre-change side of the diff — confirm.
def newTaskAttemptContext(conf: JobConf, attemptId: TaskAttemptID): TaskAttemptContext = new TaskAttemptContext(conf, attemptId)
/**
 * Creates a TaskAttemptContext without naming the concrete class at compile time.
 *
 * "org.apache.hadoop.mapred.TaskAttemptContextImpl" is tried first, with
 * "org.apache.hadoop.mapred.TaskAttemptContext" as the fallback. The
 * (JobConf, TaskAttemptID) constructor of whichever class was resolved is invoked
 * reflectively and the result is cast to TaskAttemptContext.
 *
 * @param conf      job configuration handed to the constructor
 * @param attemptId task attempt identifier handed to the constructor
 * @return the reflectively constructed instance, viewed as a TaskAttemptContext
 */
def newTaskAttemptContext(conf: JobConf, attemptId: TaskAttemptID): TaskAttemptContext = {
  val contextClass = firstAvailableClass(
    "org.apache.hadoop.mapred.TaskAttemptContextImpl",
    "org.apache.hadoop.mapred.TaskAttemptContext")
  contextClass
    .getDeclaredConstructor(classOf[JobConf], classOf[TaskAttemptID])
    .newInstance(conf, attemptId)
    .asInstanceOf[TaskAttemptContext]
}
// Forwards all five arguments, in order, to the TaskAttemptID constructor.
// NOTE(review): an equivalent braced definition with the same signature follows; this
// two-line form looks like the pre-change side of the diff — confirm.
def newTaskAttemptID(jtIdentifier: String, jobId: Int, isMap: Boolean, taskId: Int, attemptId: Int) = new TaskAttemptID(jtIdentifier,
jobId, isMap, taskId, attemptId)
/**
 * Creates a TaskAttemptID by forwarding all five arguments, unchanged and in order,
 * to the TaskAttemptID constructor.
 */
def newTaskAttemptID(jtIdentifier: String, jobId: Int, isMap: Boolean, taskId: Int, attemptId: Int) = {
  val attempt = new TaskAttemptID(jtIdentifier, jobId, isMap, taskId, attemptId)
  attempt
}
/**
 * Loads the class named `first`; when that name cannot be resolved
 * (ClassNotFoundException), loads the class named `second` instead.
 *
 * Only ClassNotFoundException from the first lookup triggers the fallback. A failure to
 * load `second` is not handled here and propagates to the caller.
 *
 * @param first  fully qualified name of the preferred class
 * @param second fully qualified name of the fallback class
 * @return the first of the two classes that could be loaded
 */
private def firstAvailableClass(first: String, second: String): Class[_] =
  try Class.forName(first)
  catch { case _: ClassNotFoundException => Class.forName(second) }
}
......@@ -20,10 +20,32 @@ package org.apache.hadoop.mapreduce
import org.apache.hadoop.conf.Configuration
trait HadoopMapReduceUtil {
// Builds the JobContext by calling its two-argument constructor directly with `new`.
// NOTE(review): the definition right after this one has the same signature but resolves
// the class reflectively; this line looks like the pre-change side of the diff — confirm.
def newJobContext(conf: Configuration, jobId: JobID): JobContext = new JobContext(conf, jobId)
/**
 * Creates a JobContext without naming the concrete class at compile time.
 *
 * "org.apache.hadoop.mapreduce.task.JobContextImpl" is tried first, with
 * "org.apache.hadoop.mapreduce.JobContext" as the fallback. The (Configuration, JobID)
 * constructor of the resolved class is invoked reflectively and the result is cast to
 * JobContext.
 *
 * @param conf  configuration handed to the constructor
 * @param jobId job identifier handed to the constructor
 * @return the reflectively constructed instance, viewed as a JobContext
 */
def newJobContext(conf: Configuration, jobId: JobID): JobContext = {
  val preferred = "org.apache.hadoop.mapreduce.task.JobContextImpl"
  val fallback = "org.apache.hadoop.mapreduce.JobContext"
  val constructor = firstAvailableClass(preferred, fallback)
    .getDeclaredConstructor(classOf[Configuration], classOf[JobID])
  constructor.newInstance(conf, jobId).asInstanceOf[JobContext]
}
// Builds the TaskAttemptContext by calling its two-argument constructor directly with `new`.
// NOTE(review): the definition right after this one has the same signature but resolves
// the class reflectively; this line looks like the pre-change side of the diff — confirm.
def newTaskAttemptContext(conf: Configuration, attemptId: TaskAttemptID): TaskAttemptContext = new TaskAttemptContext(conf, attemptId)
/**
 * Creates a TaskAttemptContext without naming the concrete class at compile time.
 *
 * "org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl" is tried first, with
 * "org.apache.hadoop.mapreduce.TaskAttemptContext" as the fallback. The
 * (Configuration, TaskAttemptID) constructor of the resolved class is invoked
 * reflectively and the result is cast to TaskAttemptContext.
 *
 * @param conf      configuration handed to the constructor
 * @param attemptId task attempt identifier handed to the constructor
 * @return the reflectively constructed instance, viewed as a TaskAttemptContext
 */
def newTaskAttemptContext(conf: Configuration, attemptId: TaskAttemptID): TaskAttemptContext = {
  val preferred = "org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl"
  val fallback = "org.apache.hadoop.mapreduce.TaskAttemptContext"
  val constructor = firstAvailableClass(preferred, fallback)
    .getDeclaredConstructor(classOf[Configuration], classOf[TaskAttemptID])
  constructor.newInstance(conf, attemptId).asInstanceOf[TaskAttemptContext]
}
// Forwards all five arguments, in order, to the TaskAttemptID constructor.
// NOTE(review): an equivalent braced definition with the same signature follows; this
// two-line form looks like the pre-change side of the diff — confirm.
def newTaskAttemptID(jtIdentifier: String, jobId: Int, isMap: Boolean, taskId: Int, attemptId: Int) = new TaskAttemptID(jtIdentifier,
jobId, isMap, taskId, attemptId)
/**
 * Creates a TaskAttemptID by forwarding all five arguments, unchanged and in order,
 * to the TaskAttemptID constructor.
 */
def newTaskAttemptID(jtIdentifier: String, jobId: Int, isMap: Boolean, taskId: Int, attemptId: Int) = {
  val attempt = new TaskAttemptID(jtIdentifier, jobId, isMap, taskId, attemptId)
  attempt
}
/**
 * Loads the class named `first`; when that name cannot be resolved
 * (ClassNotFoundException), loads the class named `second` instead.
 *
 * Only ClassNotFoundException from the first lookup triggers the fallback. A failure to
 * load `second` is not handled here and propagates to the caller.
 *
 * @param first  fully qualified name of the preferred class
 * @param second fully qualified name of the fallback class
 * @return the first of the two classes that could be loaded
 */
private def firstAvailableClass(first: String, second: String): Class[_] =
  try Class.forName(first)
  catch { case _: ClassNotFoundException => Class.forName(second) }
}
......@@ -27,13 +27,8 @@ object SparkBuild extends Build {
// Hadoop version to build against. For example, "0.20.2", "0.20.205.0", or
// "1.0.4" for Apache releases, or "0.20.2-cdh3u5" for Cloudera Hadoop.
val HADOOP_VERSION = "1.0.4"
val HADOOP_MAJOR_VERSION = "1"
val HADOOP_YARN = false
// For Hadoop 2 versions such as "2.0.0-mr1-cdh4.1.1", set the HADOOP_MAJOR_VERSION to "2"
//val HADOOP_VERSION = "2.0.0-mr1-cdh4.1.1"
//val HADOOP_MAJOR_VERSION = "2"
//val HADOOP_YARN = false
val HADOOP_YARN = false
// For Hadoop 2 YARN support
//val HADOOP_VERSION = "2.0.2-alpha"
......@@ -184,37 +179,13 @@ object SparkBuild extends Build {
"org.apache.mesos" % "mesos" % "0.12.1",
"io.netty" % "netty-all" % "4.0.0.Beta2",
"org.apache.derby" % "derby" % "10.4.2.0" % "test",
"org.apache.hadoop" % "hadoop-client" % HADOOP_VERSION,
"com.codahale.metrics" % "metrics-core" % "3.0.0",
"com.codahale.metrics" % "metrics-jvm" % "3.0.0",
"com.codahale.metrics" % "metrics-json" % "3.0.0",
"com.twitter" % "chill_2.9.3" % "0.3.1",
"com.twitter" % "chill-java" % "0.3.1"
) ++ (
if (HADOOP_MAJOR_VERSION == "2") {
if (HADOOP_YARN) {
Seq(
// Exclude rule required for all ?
"org.apache.hadoop" % "hadoop-client" % HADOOP_VERSION excludeAll(excludeJackson, excludeNetty, excludeAsm),
"org.apache.hadoop" % "hadoop-yarn-api" % HADOOP_VERSION excludeAll(excludeJackson, excludeNetty, excludeAsm),
"org.apache.hadoop" % "hadoop-yarn-common" % HADOOP_VERSION excludeAll(excludeJackson, excludeNetty, excludeAsm),
"org.apache.hadoop" % "hadoop-yarn-client" % HADOOP_VERSION excludeAll(excludeJackson, excludeNetty, excludeAsm)
)
} else {
Seq(
"org.apache.hadoop" % "hadoop-core" % HADOOP_VERSION excludeAll(excludeJackson, excludeNetty, excludeAsm),
"org.apache.hadoop" % "hadoop-client" % HADOOP_VERSION excludeAll(excludeJackson, excludeNetty, excludeAsm)
)
}
} else {
Seq("org.apache.hadoop" % "hadoop-core" % HADOOP_VERSION excludeAll(excludeJackson, excludeNetty) )
}),
unmanagedSourceDirectories in Compile <+= baseDirectory{ _ /
( if (HADOOP_YARN && HADOOP_MAJOR_VERSION == "2") {
"src/hadoop2-yarn/scala"
} else {
"src/hadoop" + HADOOP_MAJOR_VERSION + "/scala"
} )
}
)
) ++ assemblySettings ++ extraAssemblySettings
def rootSettings = sharedSettings ++ Seq(
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment