From c51c7725944d60738e2bac3e11f6aea74812905c Mon Sep 17 00:00:00 2001
From: Josh Rosen <joshrosen@databricks.com>
Date: Wed, 30 Nov 2016 14:47:41 -0500
Subject: [PATCH] [SPARK-18640] Add synchronization to
 TaskScheduler.runningTasksByExecutors

## What changes were proposed in this pull request?

The method `TaskSchedulerImpl.runningTasksByExecutors()` accesses the mutable `executorIdToRunningTaskIds` map without proper synchronization. In addition, as markhamstra pointed out in #15986, the signature's use of parentheses is a little odd given that this is a pure getter method.

This patch fixes both issues.

## How was this patch tested?

Covered by existing tests.

Author: Josh Rosen <joshrosen@databricks.com>

Closes #16073 from JoshRosen/runningTasksByExecutors-thread-safety.
---
 core/src/main/scala/org/apache/spark/SparkStatusTracker.scala | 2 +-
 .../scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala  | 2 +-
 .../org/apache/spark/scheduler/TaskSchedulerImplSuite.scala   | 4 ++--
 3 files changed, 4 insertions(+), 4 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/SparkStatusTracker.scala b/core/src/main/scala/org/apache/spark/SparkStatusTracker.scala
index 52c4656c27..22a553e684 100644
--- a/core/src/main/scala/org/apache/spark/SparkStatusTracker.scala
+++ b/core/src/main/scala/org/apache/spark/SparkStatusTracker.scala
@@ -112,7 +112,7 @@ class SparkStatusTracker private[spark] (sc: SparkContext) {
    */
   def getExecutorInfos: Array[SparkExecutorInfo] = {
     val executorIdToRunningTasks: Map[String, Int] =
-      sc.taskScheduler.asInstanceOf[TaskSchedulerImpl].runningTasksByExecutors()
+      sc.taskScheduler.asInstanceOf[TaskSchedulerImpl].runningTasksByExecutors
 
     sc.getExecutorStorageStatus.map { status =>
       val bmId = status.blockManagerId
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 67446da0a8..b03cfe4f0d 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -96,7 +96,7 @@ private[spark] class TaskSchedulerImpl(
   // IDs of the tasks running on each executor
   private val executorIdToRunningTaskIds = new HashMap[String, HashSet[Long]]
 
-  def runningTasksByExecutors(): Map[String, Int] = {
+  def runningTasksByExecutors: Map[String, Int] = synchronized {
     executorIdToRunningTaskIds.toMap.mapValues(_.size)
   }
 
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
index 59bea27596..a0b6268331 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
@@ -678,7 +678,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
     // Check that state associated with the lost task attempt is cleaned up:
     assert(taskScheduler.taskIdToExecutorId.isEmpty)
     assert(taskScheduler.taskIdToTaskSetManager.isEmpty)
-    assert(taskScheduler.runningTasksByExecutors().get("executor0").isEmpty)
+    assert(taskScheduler.runningTasksByExecutors.get("executor0").isEmpty)
   }
 
   test("if a task finishes with TaskState.LOST its executor is marked as dead") {
@@ -709,7 +709,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
     // Check that state associated with the lost task attempt is cleaned up:
     assert(taskScheduler.taskIdToExecutorId.isEmpty)
     assert(taskScheduler.taskIdToTaskSetManager.isEmpty)
-    assert(taskScheduler.runningTasksByExecutors().get("executor0").isEmpty)
+    assert(taskScheduler.runningTasksByExecutors.get("executor0").isEmpty)
 
     // Check that the executor has been marked as dead
     assert(!taskScheduler.isExecutorAlive("executor0"))
-- 
GitLab