From 692c74840bc53debbb842db5372702f58207412c Mon Sep 17 00:00:00 2001
From: Michael Armbrust <michael@databricks.com>
Date: Thu, 7 Apr 2016 18:05:54 -0700
Subject: [PATCH] [SPARK-14449][SQL] SparkContext should use
 SparkListenerInterface

Currently all `SparkFirehoseListener` implementations are broken since we expect listeners to extend `SparkListener`, while the fire hose only extends `SparkListenerInterface`.  This changes the addListener function and the config based injection to use the interface instead.

The existing tests in SparkListenerSuite are improved such that they would have caught this.

Follow-up to #12142

Author: Michael Armbrust <michael@databricks.com>

Closes #12227 from marmbrus/fixListener.
---
 .../scala/org/apache/spark/SparkContext.scala |  8 +++++---
 .../spark/scheduler/SparkListenerBus.scala    |  7 +++++--
 .../spark/scheduler/SparkListenerSuite.scala  | 19 ++++++++++++++++---
 project/MimaExcludes.scala                    |  1 +
 4 files changed, 27 insertions(+), 8 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index c40fada64b..9ec5cedf25 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1356,7 +1356,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
    * Register a listener to receive up-calls from events that happen during execution.
    */
   @DeveloperApi
-  def addSparkListener(listener: SparkListener) {
+  def addSparkListener(listener: SparkListenerInterface) {
     listenerBus.addListener(listener)
   }
 
@@ -2007,7 +2007,9 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
         // Use reflection to find the right constructor
         val constructors = {
           val listenerClass = Utils.classForName(className)
-          listenerClass.getConstructors.asInstanceOf[Array[Constructor[_ <: SparkListener]]]
+          listenerClass
+              .getConstructors
+              .asInstanceOf[Array[Constructor[_ <: SparkListenerInterface]]]
         }
         val constructorTakingSparkConf = constructors.find { c =>
           c.getParameterTypes.sameElements(Array(classOf[SparkConf]))
@@ -2015,7 +2017,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
         lazy val zeroArgumentConstructor = constructors.find { c =>
           c.getParameterTypes.isEmpty
         }
-        val listener: SparkListener = {
+        val listener: SparkListenerInterface = {
           if (constructorTakingSparkConf.isDefined) {
             constructorTakingSparkConf.get.newInstance(conf)
           } else if (zeroArgumentConstructor.isDefined) {
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
index 94f0574f0e..471586ac08 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
@@ -22,9 +22,12 @@ import org.apache.spark.util.ListenerBus
 /**
  * A [[SparkListenerEvent]] bus that relays [[SparkListenerEvent]]s to its listeners
  */
-private[spark] trait SparkListenerBus extends ListenerBus[SparkListener, SparkListenerEvent] {
+private[spark] trait SparkListenerBus
+  extends ListenerBus[SparkListenerInterface, SparkListenerEvent] {
 
-  protected override def doPostEvent(listener: SparkListener, event: SparkListenerEvent): Unit = {
+  protected override def doPostEvent(
+      listener: SparkListenerInterface,
+      event: SparkListenerEvent): Unit = {
     event match {
       case stageSubmitted: SparkListenerStageSubmitted =>
         listener.onStageSubmitted(stageSubmitted)
diff --git a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
index 58d217ffef..b854d742b5 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
@@ -24,7 +24,7 @@ import scala.collection.JavaConverters._
 
 import org.scalatest.Matchers
 
-import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkException, SparkFunSuite}
+import org.apache.spark._
 import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.util.{ResetSystemProperties, RpcUtils}
 
@@ -377,13 +377,18 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
   }
 
   test("registering listeners via spark.extraListeners") {
+    val listeners = Seq(
+      classOf[ListenerThatAcceptsSparkConf],
+      classOf[FirehoseListenerThatAcceptsSparkConf],
+      classOf[BasicJobCounter])
     val conf = new SparkConf().setMaster("local").setAppName("test")
-      .set("spark.extraListeners", classOf[ListenerThatAcceptsSparkConf].getName + "," +
-        classOf[BasicJobCounter].getName)
+      .set("spark.extraListeners", listeners.map(_.getName).mkString(","))
     sc = new SparkContext(conf)
     sc.listenerBus.listeners.asScala.count(_.isInstanceOf[BasicJobCounter]) should be (1)
     sc.listenerBus.listeners.asScala
       .count(_.isInstanceOf[ListenerThatAcceptsSparkConf]) should be (1)
+    sc.listenerBus.listeners.asScala
+        .count(_.isInstanceOf[FirehoseListenerThatAcceptsSparkConf]) should be (1)
   }
 
   /**
@@ -476,3 +481,11 @@ private class ListenerThatAcceptsSparkConf(conf: SparkConf) extends SparkListene
   var count = 0
   override def onJobEnd(job: SparkListenerJobEnd): Unit = count += 1
 }
+
+private class FirehoseListenerThatAcceptsSparkConf(conf: SparkConf) extends SparkFirehoseListener {
+  var count = 0
+  override def onEvent(event: SparkListenerEvent): Unit = event match {
+    case job: SparkListenerJobEnd => count += 1
+    case _ =>
+  }
+}
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index d916c49a6a..fbadc563b8 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -68,6 +68,7 @@ object MimaExcludes {
         ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.SparkContext.emptyRDD"),
         ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.broadcast.HttpBroadcastFactory"),
         // SPARK-14358 SparkListener from trait to abstract class
+        ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.SparkContext.addSparkListener"),
         ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.JavaSparkListener"),
         ProblemFilters.exclude[MissingTypesProblem]("org.apache.spark.SparkFirehoseListener"),
         ProblemFilters.exclude[IncompatibleTemplateDefProblem]("org.apache.spark.scheduler.SparkListener"),
-- 
GitLab