Commit f6b6f883 authored by Henry Saputra

Use a named boolean parameter in the two files' calls to SparkHadoopMapReduceUtil.newTaskAttemptID to make it clear which parameter is being set.
parent c0f0155e
@@ -76,7 +76,7 @@ class NewHadoopRDD[K, V](
     val split = theSplit.asInstanceOf[NewHadoopPartition]
     logInfo("Input split: " + split.serializableHadoopSplit)
     val conf = confBroadcast.value.value
-    val attemptId = newTaskAttemptID(jobtrackerId, id, true, split.index, 0)
+    val attemptId = newTaskAttemptID(jobtrackerId, id, isMap = true, split.index, 0)
     val hadoopAttemptContext = newTaskAttemptContext(conf, attemptId)
     val format = inputFormatClass.newInstance
     if (format.isInstanceOf[Configurable]) {

@@ -613,7 +613,7 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
     // around by taking a mod. We expect that no task will be attempted 2 billion times.
     val attemptNumber = (context.attemptId % Int.MaxValue).toInt
     /* "reduce task" <split #> <attempt # = spark task #> */
-    val attemptId = newTaskAttemptID(jobtrackerID, stageId, false, context.partitionId, attemptNumber)
+    val attemptId = newTaskAttemptID(jobtrackerID, stageId, isMap = false, context.partitionId, attemptNumber)
     val hadoopContext = newTaskAttemptContext(wrappedConf.value, attemptId)
     val format = outputFormatClass.newInstance
     val committer = format.getOutputCommitter(hadoopContext)

@@ -632,7 +632,7 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
      * however we're only going to use this local OutputCommitter for
      * setupJob/commitJob, so we just use a dummy "map" task.
      */
-    val jobAttemptId = newTaskAttemptID(jobtrackerID, stageId, true, 0, 0)
+    val jobAttemptId = newTaskAttemptID(jobtrackerID, stageId, isMap = true, 0, 0)
     val jobTaskContext = newTaskAttemptContext(wrappedConf.value, jobAttemptId)
     val jobCommitter = jobFormat.getOutputCommitter(jobTaskContext)
     jobCommitter.setupJob(jobTaskContext)
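For context, a minimal sketch of the pattern this commit applies, using a simplified stand-in for SparkHadoopMapReduceUtil.newTaskAttemptID (the real method builds a Hadoop TaskAttemptID; the signature and return value below are illustrative only). A bare true or false at a call site forces the reader back to the method signature, while a Scala named argument documents the intent in place:

object NamedBooleanSketch {
  // Hypothetical stand-in: same parameter shape as newTaskAttemptID,
  // returning a formatted attempt string instead of a TaskAttemptID.
  def newTaskAttemptID(jtIdentifier: String, jobId: Int, isMap: Boolean,
      taskId: Int, attemptId: Int): String = {
    val taskType = if (isMap) "m" else "r"
    s"attempt_${jtIdentifier}_${jobId}_${taskType}_${taskId}_${attemptId}"
  }

  def main(args: Array[String]): Unit = {
    // Positional boolean: the reader cannot tell what `true` controls.
    println(newTaskAttemptID("201401010000", 0, true, 3, 0))
    // Named boolean, as in this commit: self-documenting at the call site.
    println(newTaskAttemptID("201401010000", 0, isMap = true, 3, 0))
  }
}

Both calls produce the same result (attempt_201401010000_0_m_3_0 in this sketch); the commit changes nothing about behavior, only call-site readability.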