From b929537b6eb0f8f34497c3dbceea8045bf5dffdb Mon Sep 17 00:00:00 2001
From: Josh Rosen <joshrosen@databricks.com>
Date: Tue, 1 Nov 2016 16:49:41 -0700
Subject: [PATCH] [SPARK-18182] Expose ReplayListenerBus.read() overload which
 takes string iterator

The `ReplayListenerBus.read()` method is used when implementing a custom `ApplicationHistoryProvider`. The current interface only exposes a `read()` method which takes an `InputStream` and performs stream-to-lines conversion itself, but it would also be useful to expose an overloaded method which accepts an iterator of strings, thereby enabling events to be provided from non-`InputStream` sources.

Author: Josh Rosen <joshrosen@databricks.com>

Closes #15698 from JoshRosen/replay-listener-bus-interface.
---
 .../spark/scheduler/ReplayListenerBus.scala       | 15 +++++++++++++--
 1 file changed, 13 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala
index 2424586431..0bd5a6bc59 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala
@@ -53,13 +53,24 @@ private[spark] class ReplayListenerBus extends SparkListenerBus with Logging {
       sourceName: String,
       maybeTruncated: Boolean = false,
       eventsFilter: ReplayEventsFilter = SELECT_ALL_FILTER): Unit = {
+    val lines = Source.fromInputStream(logData).getLines()
+    replay(lines, sourceName, maybeTruncated, eventsFilter)
+  }
 
+  /**
+   * Overloaded variant of [[replay()]] which accepts an iterator of lines instead of an
+   * [[InputStream]]. Exposed for use by custom ApplicationHistoryProvider implementations.
+   */
+  def replay(
+      lines: Iterator[String],
+      sourceName: String,
+      maybeTruncated: Boolean,
+      eventsFilter: ReplayEventsFilter): Unit = {
     var currentLine: String = null
     var lineNumber: Int = 0
 
     try {
-      val lineEntries = Source.fromInputStream(logData)
-        .getLines()
+      val lineEntries = lines
         .zipWithIndex
         .filter { case (line, _) => eventsFilter(line) }
 
-- 
GitLab