Commit 8d2a36c0 authored by Kay Ousterhout, committed by Reynold Xin

[SPARK-6754] Remove unnecessary TaskContextHelper

The TaskContextHelper was originally necessary because TaskContext was written in Java, which
cannot express Scala's private[spark] visibility: Java's package-private access stops at the exact
package, so the setters could not be exposed to subpackages such as org.apache.spark.scheduler
while staying hidden from user code. TaskContextHelper existed to work around this. Now that
TaskContext has been re-written in Scala, this class is no longer necessary.
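
For reference, a minimal self-contained sketch of the Scala visibility feature in question (the
org.example and Registry names are made up for illustration; this is not Spark source):

package org.example

object Registry {
  private[this] val current = new ThreadLocal[String]

  // Callable from org.example and any of its subpackages (e.g. org.example.scheduler),
  // but invisible to code outside org.example -- Java has no equivalent modifier.
  private[example] def set(v: String): Unit = current.set(v)
  private[example] def unset(): Unit = current.remove()

  def get(): String = current.get
}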

rxin can you look at this? It looks like you missed this bit of cleanup when you moved TaskContext from Java to Scala in #4324

cc ScrapCodes and pwendell who added this originally.

Author: Kay Ousterhout <kayousterhout@gmail.com>

Closes #5402 from kayousterhout/SPARK-6754 and squashes the following commits:

f089800 [Kay Ousterhout] [SPARK-6754] Remove unnecessary TaskContextHelper
parent d138aa8e
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark
/**
* This class exists to restrict the visibility of TaskContext setters.
*/
private [spark] object TaskContextHelper {
def setTaskContext(tc: TaskContext): Unit = TaskContext.setTaskContext(tc)
def unset(): Unit = TaskContext.unset()
}
@@ -645,13 +645,13 @@ class DAGScheduler(
val split = rdd.partitions(job.partitions(0))
val taskContext = new TaskContextImpl(job.finalStage.id, job.partitions(0), taskAttemptId = 0,
attemptNumber = 0, runningLocally = true)
-TaskContextHelper.setTaskContext(taskContext)
+TaskContext.setTaskContext(taskContext)
try {
val result = job.func(taskContext, rdd.iterator(split, taskContext))
job.listener.taskSucceeded(0, result)
} finally {
taskContext.markTaskCompleted()
-TaskContextHelper.unset()
+TaskContext.unset()
}
} catch {
case e: Exception =>
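Both call sites changed in this commit follow the same install / run / always-clear lifecycle. A
minimal sketch of that invariant (TaskContextLifecycle and withTaskContext are hypothetical names;
such a helper would have to live in the org.apache.spark package, since the setters are
private[spark]):

package org.apache.spark

private[spark] object TaskContextLifecycle {
  def withTaskContext[T](tc: TaskContextImpl)(body: => T): T = {
    TaskContext.setTaskContext(tc)  // install the thread-local before the task body runs
    try body
    finally {
      tc.markTaskCompleted()        // run completion callbacks even if the body threw
      TaskContext.unset()           // clear the thread-local; executor threads are reused
    }
  }
}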
@@ -22,7 +22,7 @@ import java.nio.ByteBuffer
import scala.collection.mutable.HashMap
-import org.apache.spark.{TaskContextHelper, TaskContextImpl, TaskContext}
+import org.apache.spark.{TaskContextImpl, TaskContext}
import org.apache.spark.executor.TaskMetrics
import org.apache.spark.serializer.SerializerInstance
import org.apache.spark.util.ByteBufferInputStream
@@ -54,7 +54,7 @@ private[spark] abstract class Task[T](val stageId: Int, var partitionId: Int) ex
final def run(taskAttemptId: Long, attemptNumber: Int): T = {
context = new TaskContextImpl(stageId = stageId, partitionId = partitionId,
taskAttemptId = taskAttemptId, attemptNumber = attemptNumber, runningLocally = false)
-TaskContextHelper.setTaskContext(context)
+TaskContext.setTaskContext(context)
context.taskMetrics.setHostname(Utils.localHostName())
taskThread = Thread.currentThread()
if (_killed) {
@@ -64,7 +64,7 @@ private[spark] abstract class Task[T](val stageId: Int, var partitionId: Int) ex
runTask(context)
} finally {
context.markTaskCompleted()
-TaskContextHelper.unset()
+TaskContext.unset()
}
}
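On the consumer side, this thread-local is what makes TaskContext.get() work inside user code. A
runnable sketch against the public API (TaskContextGetExample, the app name, and the local master
are arbitrary):

import org.apache.spark.{SparkConf, SparkContext, TaskContext}

object TaskContextGetExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("ctx-demo").setMaster("local[2]"))
    // TaskContext.get() reads the thread-local that Task.run installed via setTaskContext.
    val tagged = sc.parallelize(1 to 4, 2).mapPartitions { iter =>
      val ctx = TaskContext.get()
      iter.map(x => (ctx.partitionId(), x))
    }.collect()
    tagged.foreach(println)
    sc.stop()
  }
}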