Skip to content
Snippets Groups Projects
Commit e2ae7bd0 authored by Josh Rosen's avatar Josh Rosen Committed by Reynold Xin
Browse files

[SPARK-12819] Deprecate TaskContext.isRunningLocally()

We've already removed local execution but didn't deprecate `TaskContext.isRunningLocally()`; we should deprecate it for 2.0.

Author: Josh Rosen <joshrosen@databricks.com>

Closes #10751 from JoshRosen/remove-local-exec-from-taskcontext.
parent 20d8ef85
No related branches found
No related tags found
No related merge requests found
......@@ -68,11 +68,6 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
logInfo(s"Partition $key not found, computing it")
val computedValues = rdd.computeOrReadCheckpoint(partition, context)
// If the task is running locally, do not persist the result
if (context.isRunningLocally) {
return computedValues
}
// Otherwise, cache the values and keep track of any updates in block statuses
val updatedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]
val cachedValues = putInBlockManager(key, computedValues, storageLevel, updatedBlocks)
......
......@@ -97,8 +97,9 @@ abstract class TaskContext extends Serializable {
/**
* Returns true if the task is running locally in the driver program.
* @return
* @return false
*/
@deprecated("Local execution was removed, so this always returns false", "2.0.0")
def isRunningLocally(): Boolean
/**
......
......@@ -33,7 +33,6 @@ private[spark] class TaskContextImpl(
override val taskMemoryManager: TaskMemoryManager,
@transient private val metricsSystem: MetricsSystem,
internalAccumulators: Seq[Accumulator[Long]],
val runningLocally: Boolean = false,
val taskMetrics: TaskMetrics = TaskMetrics.empty)
extends TaskContext
with Logging {
......@@ -85,7 +84,7 @@ private[spark] class TaskContextImpl(
override def isCompleted(): Boolean = completed
override def isRunningLocally(): Boolean = runningLocally
override def isRunningLocally(): Boolean = false
override def isInterrupted(): Boolean = interrupted
......
......@@ -74,8 +74,7 @@ private[spark] abstract class Task[T](
attemptNumber,
taskMemoryManager,
metricsSystem,
internalAccumulators,
runningLocally = false)
internalAccumulators)
TaskContext.setTaskContext(context)
context.taskMetrics.setHostname(Utils.localHostName())
context.taskMetrics.setAccumulatorsUpdater(context.collectInternalAccumulators)
......
......@@ -82,15 +82,6 @@ class CacheManagerSuite extends SparkFunSuite with LocalSparkContext with Before
assert(value.toList === List(5, 6, 7))
}
test("get uncached local rdd") {
// Local computation should not persist the resulting value, so don't expect a put().
when(blockManager.get(RDDBlockId(0, 0))).thenReturn(None)
val context = new TaskContextImpl(0, 0, 0, 0, null, null, Seq.empty, runningLocally = true)
val value = cacheManager.getOrCompute(rdd, split, context, StorageLevel.MEMORY_ONLY)
assert(value.toList === List(1, 2, 3, 4))
}
test("verify task metrics updated correctly") {
cacheManager = sc.env.cacheManager
val context = TaskContext.empty()
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment