  1. Jul 01, 2017
    • Devaraj K's avatar
      [SPARK-21170][CORE] Utils.tryWithSafeFinallyAndFailureCallbacks throws... · 6beca9ce
      Devaraj K authored
      [SPARK-21170][CORE] Utils.tryWithSafeFinallyAndFailureCallbacks throws IllegalArgumentException: Self-suppression not permitted
      
      ## What changes were proposed in this pull request?
      
      Do not add the exception to the suppressed list if it is the same instance as originalThrowable.
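      A minimal sketch of the guard, assuming a simplified cleanup helper rather than the actual `Utils.tryWithSafeFinallyAndFailureCallbacks`: `Throwable.addSuppressed(t)` throws `IllegalArgumentException: Self-suppression not permitted` when `t` is the same instance, so a cleanup exception is only attached when it is a different object.
      
      ```
      def runWithCleanup[T](block: => T)(cleanup: => Unit): T = {
        var original: Throwable = null
        try {
          block
        } catch {
          case t: Throwable =>
            original = t
            throw t
        } finally {
          try {
            cleanup
          } catch {
            case t: Throwable =>
              if (original == null) {
                throw t                    // no primary failure, so propagate the cleanup failure
              } else if (original ne t) {  // skip when it is literally the same instance
                original.addSuppressed(t)
              }
          }
        }
      }
      ```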
      
      ## How was this patch tested?
      
      Added new tests to verify this; they fail without the source change and pass with it.
      
      Author: Devaraj K <devaraj@apache.org>
      
      Closes #18384 from devaraj-kavali/SPARK-21170.
      6beca9ce
  2. Jun 30, 2017
    • Liang-Chi Hsieh's avatar
      [SPARK-21052][SQL][FOLLOW-UP] Add hash map metrics to join · fd132552
      Liang-Chi Hsieh authored
      ## What changes were proposed in this pull request?
      
      Remove `numHashCollisions` in `BytesToBytesMap`. And change `getAverageProbesPerLookup()` to `getAverageProbesPerLookup` as suggested.
      
      ## How was this patch tested?
      
      Existing tests.
      
      Author: Liang-Chi Hsieh <viirya@gmail.com>
      
      Closes #18480 from viirya/SPARK-21052-followup.
      fd132552
    • 曾林西's avatar
      [SPARK-21223] Change fileToAppInfo in FsHistoryProvider to fix concurrent issue. · 1fe08d62
      曾林西 authored
      # What issue does this PR address?
      JIRA: https://issues.apache.org/jira/browse/SPARK-21223
      Fix a thread-safety issue in FsHistoryProvider.
      Currently, the Spark HistoryServer uses a HashMap named fileToAppInfo in FsHistoryProvider to map each event log path to its attempt info.
      When a thread pool replays the log files in the list and merges the list of old applications with new ones, multiple threads may update fileToAppInfo at the same time, which can cause thread-safety issues such as falling into an infinite loop when the hash table's resize function is called.
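      A hedged sketch of the kind of fix described above (the class and field names here are illustrative assumptions, not the actual FsHistoryProvider code): back the path-to-attempt mapping with a ConcurrentHashMap so concurrent replay threads can update it safely.
      
      ```
      import java.util.concurrent.ConcurrentHashMap
      
      // Simplified stand-in for the per-attempt metadata.
      case class AttemptInfo(appId: String, attemptId: Option[String], lastUpdated: Long)
      
      class HistoryIndex {
        // A concurrent map avoids the corruption (e.g. resize loops) that a plain mutable
        // HashMap can suffer when several replay threads update it at the same time.
        private val fileToAppInfo = new ConcurrentHashMap[String, AttemptInfo]()
      
        def record(logPath: String, info: AttemptInfo): Unit = fileToAppInfo.put(logPath, info)
      
        def lookup(logPath: String): Option[AttemptInfo] = Option(fileToAppInfo.get(logPath))
      }
      ```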
      
      Author: 曾林西 <zenglinxi@meituan.com>
      
      Closes #18430 from zenglinxi0615/master.
      1fe08d62
    • Xingbo Jiang's avatar
      [SPARK-18294][CORE] Implement commit protocol to support `mapred` package's committer · 3c2fc19d
      Xingbo Jiang authored
      ## What changes were proposed in this pull request?
      
      This PR makes the following changes:
      
      - Implement a new commit protocol `HadoopMapRedCommitProtocol` which supports the old `mapred` package's committer;
      - Refactor SparkHadoopWriter and SparkHadoopMapReduceWriter into a single SparkHadoopWriter, so that writes through both the mapred and mapreduce APIs are supported and a lot of duplicated code is removed.
      
      After this change, it should be pretty easy to support committers from both the new and the old Hadoop APIs at a high level.
      
      ## How was this patch tested?
      No major behavior change, passed the existing test cases.
      
      Author: Xingbo Jiang <xingbo.jiang@databricks.com>
      
      Closes #18438 from jiangxb1987/SparkHadoopWriter.
      3c2fc19d
  3. Jun 29, 2017
    • IngoSchuster's avatar
      [SPARK-21176][WEB UI] Limit number of selector threads for admin ui proxy servlets to 8 · 88a536ba
      IngoSchuster authored
      ## What changes were proposed in this pull request?
      Please see also https://issues.apache.org/jira/browse/SPARK-21176
      
      This change limits the number of selector threads that Jetty creates to a maximum of 8 per proxy servlet (the Jetty default is the number of processors / 2).
      The newHttpClient method of Jetty's ProxyServlet class is overridden to avoid the Jetty defaults (which are designed for high-performance HTTP servers).
      Once https://github.com/eclipse/jetty.project/issues/1643 is available, the code could be cleaned up to avoid the method override.
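      A sketch of the override described above (the servlet wiring is omitted and the exact code in the patch may differ): keep Jetty's "half the CPUs" heuristic but never create more than 8 selector threads per proxy servlet.
      
      ```
      import org.eclipse.jetty.client.HttpClient
      import org.eclipse.jetty.client.http.HttpClientTransportOverHTTP
      import org.eclipse.jetty.proxy.ProxyServlet
      
      class BoundedSelectorProxyServlet extends ProxyServlet {
        override def newHttpClient(): HttpClient = {
          // Jetty's default transport would use availableProcessors / 2 selectors; cap it at 8.
          val numSelectors = math.max(1, math.min(8, Runtime.getRuntime.availableProcessors() / 2))
          new HttpClient(new HttpClientTransportOverHTTP(numSelectors), null)
        }
      }
      ```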
      
      I really need this on v2.1.1 - what is the best way to do a backport (automatic merge works fine)? Shall I create another PR?
      
      ## How was this patch tested?
      The patch was tested manually on a Spark cluster with a head node that has 88 processors, using JMX to verify that the number of selector threads is now limited to 8 per proxy.
      
      gurvindersingh zsxwing can you please review the change?
      
      Author: IngoSchuster <ingo.schuster@de.ibm.com>
      Author: Ingo Schuster <ingo.schuster@de.ibm.com>
      
      Closes #18437 from IngoSchuster/master.
      88a536ba
    • Shixiong Zhu's avatar
      [SPARK-21253][CORE] Disable spark.reducer.maxReqSizeShuffleToMem · 80f7ac3a
      Shixiong Zhu authored
      ## What changes were proposed in this pull request?
      
      Disable spark.reducer.maxReqSizeShuffleToMem because it breaks the old shuffle service.
      
      Credits to wangyum
      
      Closes #18466
      
      ## How was this patch tested?
      
      Jenkins
      
      Author: Shixiong Zhu <shixiong@databricks.com>
      Author: Yuming Wang <wgyumg@gmail.com>
      
      Closes #18467 from zsxwing/SPARK-21253.
      80f7ac3a
    • Feng Liu's avatar
      [SPARK-21188][CORE] releaseAllLocksForTask should synchronize the whole method · f9151beb
      Feng Liu authored
      ## What changes were proposed in this pull request?
      
      Since the objects `readLocksByTask`, `writeLocksByTask` and the `info`s are coupled and may be modified by other threads concurrently, all reads and writes of them in the method `releaseAllLocksForTask` should be protected by a single synchronized block, like other similar methods.
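      A minimal sketch of the pattern, assuming a much-simplified lock registry (not the actual BlockInfoManager): the coupled per-task structures are read and mutated inside one synchronized block, so no other thread can observe a partially released state.
      
      ```
      import scala.collection.mutable
      
      class LockRegistry {
        private val readLocksByTask = mutable.HashMap.empty[Long, mutable.Buffer[String]]
        private val writeLocksByTask = mutable.HashMap.empty[Long, mutable.Buffer[String]]
      
        // The whole method body holds the lock, not just individual map accesses.
        def releaseAllLocksForTask(taskId: Long): Seq[String] = synchronized {
          val released =
            readLocksByTask.remove(taskId).getOrElse(mutable.Buffer.empty[String]) ++
            writeLocksByTask.remove(taskId).getOrElse(mutable.Buffer.empty[String])
          released.toSeq
        }
      }
      ```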
      
      ## How was this patch tested?
      
      existing tests
      
      Author: Feng Liu <fengliu@databricks.com>
      
      Closes #18400 from liufengdb/synchronize.
      f9151beb
    • 杨治国10192065's avatar
      [SPARK-21225][CORE] Considering CPUS_PER_TASK when allocating task slots for each WorkerOffer · 29bd251d
      杨治国10192065 authored
      JIRA Issue: https://issues.apache.org/jira/browse/SPARK-21225
          In the function "resourceOffers", a variable "tasks" is declared to store the tasks that have been allocated an executor. It is declared like this:
      `val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores))`
          But this code only considers the case of one task per core. If the user sets "spark.task.cpus" to 2 or 3, the buffers do not need that much memory. It can be modified to:
      `val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores / CPUS_PER_TASK))`
      instead.
          A further benefit of this change is that it makes it easier to understand how tasks are allocated to offers.
      
      Author: 杨治国10192065 <yang.zhiguo@zte.com.cn>
      
      Closes #18435 from JackYangzg/motifyTaskCoreDisp.
      29bd251d
    • fjh100456's avatar
      [SPARK-21135][WEB UI] On history server page, duration of incomplete... · d7da2b94
      fjh100456 authored
      [SPARK-21135][WEB UI] On history server page, duration of incomplete applications should be hidden instead of showing up as 0
      
      ## What changes were proposed in this pull request?
      
      Hide the duration of incomplete applications.
      
      ## How was this patch tested?
      
      manual tests
      
      Author: fjh100456 <fu.jinhua6@zte.com.cn>
      
      Closes #18351 from fjh100456/master.
      d7da2b94
    • jinxing's avatar
      [SPARK-21240] Fix code style for constructing and stopping a SparkContext in UT. · d106a74c
      jinxing authored
      ## What changes were proposed in this pull request?
      
      Same as SPARK-20985.
      Fix the code style for constructing and stopping a `SparkContext`. Ensure the context is stopped, to avoid other tests complaining that only one `SparkContext` may exist.
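      An illustrative version of the pattern (not the exact helper used in the patch): stop the `SparkContext` in a `finally` block so a failing assertion cannot leak a running context into later tests.
      
      ```
      import org.apache.spark.{SparkConf, SparkContext}
      
      def withSparkContext[T](conf: SparkConf)(body: SparkContext => T): T = {
        val sc = new SparkContext(conf)
        try {
          body(sc)
        } finally {
          sc.stop()  // always stop, even if the test body throws
        }
      }
      
      // Usage in a test:
      // withSparkContext(new SparkConf().setMaster("local").setAppName("test")) { sc =>
      //   assert(sc.parallelize(1 to 10).count() == 10)
      // }
      ```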
      
      Author: jinxing <jinxing6042@126.com>
      
      Closes #18454 from jinxing64/SPARK-21240.
      d106a74c
    • Sital Kedia's avatar
      [SPARK-3577] Report Spill size on disk for UnsafeExternalSorter · a946be35
      Sital Kedia authored
      ## What changes were proposed in this pull request?
      
      Report Spill size on disk for UnsafeExternalSorter
      
      ## How was this patch tested?
      
      Tested by running a job on a cluster and verifying the spill size on disk.
      
      Author: Sital Kedia <skedia@fb.com>
      
      Closes #17471 from sitalkedia/fix_disk_spill_size.
      a946be35
  4. Jun 27, 2017
    • Eric Vandenberg's avatar
      [SPARK-21155][WEBUI] Add (? running tasks) into Spark UI progress · 2d686a19
      Eric Vandenberg authored
      ## What changes were proposed in this pull request?
      
      Add metric on number of running tasks to status bar on Jobs / Active Jobs.
      
      ## How was this patch tested?
      
      Run a long running (1 minute) query in spark-shell and use localhost:4040 web UI to observe progress.  See jira for screen snapshot.
      
      Author: Eric Vandenberg <ericvandenberg@fb.com>
      
      Closes #18369 from ericvandenbergfb/runningTasks.
      2d686a19
  5. Jun 26, 2017
    • jerryshao's avatar
      [SPARK-13669][SPARK-20898][CORE] Improve the blacklist mechanism to handle... · 9e50a1d3
      jerryshao authored
      [SPARK-13669][SPARK-20898][CORE] Improve the blacklist mechanism to handle external shuffle service unavailable situation
      
      ## What changes were proposed in this pull request?
      
      Currently we are running into an issue with Yarn work preserving enabled + external shuffle service.
      In the work-preserving-enabled scenario, the failure of the NM does not cause executors to exit, so executors can still accept and run tasks. The problem here is that when the NM has failed, the external shuffle service is actually inaccessible, so reduce tasks will always fail with “Fetch failure”, and the failure of the reduce stage will make the parent stage (map stage) rerun. The tricky thing is that the Spark scheduler is not aware of the unavailability of the external shuffle service, and will reschedule the map tasks on the executors where the NM failed; the reduce stage will again fail with “Fetch failure”, and after 4 retries the job fails. This could also apply to other cluster managers with an external shuffle service.
      
      So the main problem here is that we should avoid assigning tasks to those bad executors (where the shuffle service is unavailable). Spark's current blacklist mechanism can blacklist executors/nodes based on failed tasks, but it doesn't handle this specific fetch-failure scenario. This PR therefore proposes to improve the current application blacklist mechanism to handle the fetch-failure issue (especially the external-shuffle-service-unavailable case) by blacklisting the executors/nodes where shuffle fetches fail.
      
      ## How was this patch tested?
      
      Unit test and small cluster verification.
      
      Author: jerryshao <sshao@hortonworks.com>
      
      Closes #17113 from jerryshao/SPARK-13669.
      9e50a1d3
  6. Jun 24, 2017
    • Marcelo Vanzin's avatar
      [SPARK-21159][CORE] Don't try to connect to launcher in standalone cluster mode. · bfd73a7c
      Marcelo Vanzin authored
      Monitoring for standalone cluster mode is not implemented (see SPARK-11033), but
      the same scheduler implementation is used, and if it tries to connect to the
      launcher it will fail. So fix the scheduler so it only tries that in client mode;
      cluster mode applications will be correctly launched and will work, but monitoring
      through the launcher handle will not be available.
      
      Tested by running a cluster mode app with "SparkLauncher.startApplication".
      
      Author: Marcelo Vanzin <vanzin@cloudera.com>
      
      Closes #18397 from vanzin/SPARK-21159.
      bfd73a7c
  7. Jun 23, 2017
    • 10129659's avatar
      [SPARK-21115][CORE] If the cores left are less than the coresPerExecutor, the... · acd208ee
      10129659 authored
      [SPARK-21115][CORE] If the cores left are less than the coresPerExecutor, the cores left will not be allocated, so they should not be checked in every schedule
      
      ## What changes were proposed in this pull request?
      If we start an app with the params --total-executor-cores=4 and spark.executor.cores=3, the number of cores left is always 1, so every schedule tries to allocate executors for it in the function startExecutorsOnWorkers of org.apache.spark.deploy.master.
      A further question is whether it would be better to allocate another executor with 1 core for the cores left.
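      A hedged sketch of the check this implies (simplified, not the actual Master code): skip the per-schedule allocation attempt when the remaining cores can never satisfy a full executor.
      
      ```
      // coresPerExecutor is None when executors may take any number of cores, in which case a
      // single leftover core can still be used.
      def worthScheduling(coresLeft: Int, coresPerExecutor: Option[Int]): Boolean =
        coresPerExecutor.forall(required => coresLeft >= required)
      
      worthScheduling(coresLeft = 1, coresPerExecutor = Some(3))  // false: stop retrying each round
      worthScheduling(coresLeft = 1, coresPerExecutor = None)     // true
      ```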
      
      ## How was this patch tested?
      unit test
      
      Author: 10129659 <chen.yanshan@zte.com.cn>
      
      Closes #18322 from eatoncys/leftcores.
      acd208ee
  8. Jun 22, 2017
    • Thomas Graves's avatar
      [SPARK-20923] turn tracking of TaskMetrics._updatedBlockStatuses off · 5b5a69be
      Thomas Graves authored
      ## What changes were proposed in this pull request?
      Turn tracking of TaskMetrics._updatedBlockStatuses off by default. As far as I can see it's not used by anything, and it uses a lot of memory when caching and processing a lot of blocks. In my case it was taking 5GB of a 10GB heap, and even when I went up to a 50GB heap the job still ran out of memory. With this change in place the same job easily runs in less than 10GB of heap.
      
      We leave the api there as well as a config to turn it back on just in case anyone is using it.  TaskMetrics is exposed via SparkListenerTaskEnd so if users are relying on it they can turn it back on.
      
      ## How was this patch tested?
      
      Ran unit tests that were modified and manually tested on a couple of jobs (with and without caching). Clicked through the UI and didn't see anything missing.
      Ran my very large Hive query job with 200,000 small tasks, 1000 executors, and 6+ TB of cached data; this runs fine now, whereas without this change it would go into full GCs and eventually die.
      
      Author: Thomas Graves <tgraves@thirteenroutine.corp.gq1.yahoo.com>
      Author: Tom Graves <tgraves@yahoo-inc.com>
      
      Closes #18162 from tgravescs/SPARK-20923.
      5b5a69be
    • jinxing's avatar
      [SPARK-19937] Collect metrics for remote bytes read to disk during shuffle. · 58434acd
      jinxing authored
      In the current code (https://github.com/apache/spark/pull/16989), big blocks are shuffled to disk.
      This PR proposes to collect metrics for remote bytes fetched to disk.
      
      Author: jinxing <jinxing6042@126.com>
      
      Closes #18249 from jinxing64/SPARK-19937.
      58434acd
    • Xingbo Jiang's avatar
      [SPARK-20832][CORE] Standalone master should explicitly inform drivers of... · 2dadea95
      Xingbo Jiang authored
      [SPARK-20832][CORE] Standalone master should explicitly inform drivers of worker deaths and invalidate external shuffle service outputs
      
      ## What changes were proposed in this pull request?
      
      In standalone mode, the master should explicitly inform each active driver of any worker deaths, so that the invalid external shuffle service outputs on the lost host are removed from the shuffle mapStatus and we can avoid future `FetchFailure`s.
      
      ## How was this patch tested?
      Manually tested by the following steps:
      1. Start a standalone Spark cluster with one driver node and two worker nodes;
      2. Run a job with a ShuffleMapStage, and ensure the outputs are distributed across the workers;
      3. Run another job to make all executors exit, while the workers all stay alive;
      4. Kill one of the workers;
      5. Run rdd.collect(); before this change we see `FetchFailure`s and failed stages, while after the change the job completes without failure.
      
      Before the change:
      ![image](https://user-images.githubusercontent.com/4784782/27335366-c251c3d6-55fe-11e7-99dd-d1fdcb429210.png)
      
      After the change:
      ![image](https://user-images.githubusercontent.com/4784782/27335393-d1c71640-55fe-11e7-89ed-bd760f1f39af.png)
      
      Author: Xingbo Jiang <xingbo.jiang@databricks.com>
      
      Closes #18362 from jiangxb1987/removeWorker.
      2dadea95
  9. Jun 21, 2017
    • sjarvie's avatar
      [SPARK-21125][PYTHON] Extend setJobDescription to PySpark and JavaSpark APIs · ba78514d
      sjarvie authored
      ## What changes were proposed in this pull request?
      
      Extend setJobDescription to PySpark and JavaSpark APIs
      
      SPARK-21125
      
      ## How was this patch tested?
      
      Testing was done by running a local Spark shell and checking the UI. I had originally added a unit test, but the PySpark context cannot easily access the Scala SparkContext's private variable holding the job-description key, so I omitted the test, given the simplicity of this addition.
      
      Also ran the existing tests.
      
      # Misc
      
      This contribution is my original work and I license the work to the project under the project's open source license.
      
      Author: sjarvie <sjarvie@uber.com>
      
      Closes #18332 from sjarvie/add_python_set_job_description.
      ba78514d
    • Li Yichao's avatar
      [SPARK-20640][CORE] Make rpc timeout and retry for shuffle registration configurable. · d107b3b9
      Li Yichao authored
      ## What changes were proposed in this pull request?
      
      Currently the shuffle service registration timeout and retry count are hardcoded. This works well for small workloads, but under heavy workload, when the shuffle service is busy transferring large amounts of data, we see significant delays in responding to the registration request; as a result, executors often fail to register with the shuffle service, eventually failing the job. We need to make these two parameters configurable.
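      A hypothetical usage sketch only: the two configuration key names below are assumptions made for illustration, so check the configuration docs of the release that includes this change for the exact keys.
      
      ```
      import org.apache.spark.SparkConf
      
      val conf = new SparkConf()
        .set("spark.shuffle.registration.timeout", "10000")  // assumed key: registration timeout (ms)
        .set("spark.shuffle.registration.maxAttempts", "5")  // assumed key: max registration attempts
      ```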
      
      ## How was this patch tested?
      
      * Updated `BlockManagerSuite` to test that the registration timeout and max-attempts configurations actually work.
      
      cc sitalkedia
      
      Author: Li Yichao <lyc@zhihu.com>
      
      Closes #18092 from liyichao/SPARK-20640.
      d107b3b9
  10. Jun 20, 2017
    • Xingbo Jiang's avatar
      [SPARK-20989][CORE] Fail to start multiple workers on one host if external... · ef162289
      Xingbo Jiang authored
      [SPARK-20989][CORE] Fail to start multiple workers on one host if external shuffle service is enabled in standalone mode
      
      ## What changes were proposed in this pull request?
      
      In standalone mode, if we enable the external shuffle service by setting `spark.shuffle.service.enabled` to true, and then try to start multiple workers on one host (by setting `SPARK_WORKER_INSTANCES=3` in spark-env.sh and then running `sbin/start-slaves.sh`), we can only launch one worker on each host successfully and the rest of the workers fail to launch.
      The reason is that the external shuffle service's port is configured by `spark.shuffle.service.port`, so currently we can start no more than one external shuffle service on each host. In our case, each worker tries to start an external shuffle service, and only one of them succeeds in doing this.
      
      We should give an explicit reason for the failure instead of failing silently.
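      A sketch of the fail-fast check described above (simplified; the exact wording and wiring in the patch may differ): refuse to start a second worker on a host when the external shuffle service is enabled.
      
      ```
      def checkWorkerShuffleServiceConflict(externalShuffleServiceEnabled: Boolean,
                                            workerInstances: Int): Unit = {
        require(!externalShuffleServiceEnabled || workerInstances <= 1,
          "Starting multiple workers on one host is not allowed when the external shuffle service " +
          "is enabled; set spark.shuffle.service.enabled to false or SPARK_WORKER_INSTANCES to 1.")
      }
      ```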
      
      ## How was this patch tested?
      Manually tested by the following steps:
      1. SET `SPARK_WORKER_INSTANCES=1` in `conf/spark-env.sh`;
      2. SET `spark.shuffle.service.enabled` to `true` in `conf/spark-defaults.conf`;
      3. Run `sbin/start-all.sh`.
      
      Before the change, you see no errors on the command line, as in the following:
      ```
      starting org.apache.spark.deploy.master.Master, logging to /Users/xxx/workspace/spark/logs/spark-xxx-org.apache.spark.deploy.master.Master-1-xxx.local.out
      localhost: starting org.apache.spark.deploy.worker.Worker, logging to /Users/xxx/workspace/spark/logs/spark-xxx-org.apache.spark.deploy.worker.Worker-1-xxx.local.out
      localhost: starting org.apache.spark.deploy.worker.Worker, logging to /Users/xxx/workspace/spark/logs/spark-xxx-org.apache.spark.deploy.worker.Worker-2-xxx.local.out
      localhost: starting org.apache.spark.deploy.worker.Worker, logging to /Users/xxx/workspace/spark/logs/spark-xxx-org.apache.spark.deploy.worker.Worker-3-xxx.local.out
      ```
      And you can see in the webUI that only one worker is running.
      
      After the change, you get explicit error messages in the command line:
      ```
      starting org.apache.spark.deploy.master.Master, logging to /Users/xxx/workspace/spark/logs/spark-xxx-org.apache.spark.deploy.master.Master-1-xxx.local.out
      localhost: starting org.apache.spark.deploy.worker.Worker, logging to /Users/xxx/workspace/spark/logs/spark-xxx-org.apache.spark.deploy.worker.Worker-1-xxx.local.out
      localhost: failed to launch: nice -n 0 /Users/xxx/workspace/spark/bin/spark-class org.apache.spark.deploy.worker.Worker --webui-port 8081 spark://xxx.local:7077
      localhost:   17/06/13 23:24:53 INFO SecurityManager: Changing view acls to: xxx
      localhost:   17/06/13 23:24:53 INFO SecurityManager: Changing modify acls to: xxx
      localhost:   17/06/13 23:24:53 INFO SecurityManager: Changing view acls groups to:
      localhost:   17/06/13 23:24:53 INFO SecurityManager: Changing modify acls groups to:
      localhost:   17/06/13 23:24:53 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(xxx); groups with view permissions: Set(); users  with modify permissions: Set(xxx); groups with modify permissions: Set()
      localhost:   17/06/13 23:24:54 INFO Utils: Successfully started service 'sparkWorker' on port 63354.
      localhost:   Exception in thread "main" java.lang.IllegalArgumentException: requirement failed: Start multiple worker on one host failed because we may launch no more than one external shuffle service on each host, please set spark.shuffle.service.enabled to false or set SPARK_WORKER_INSTANCES to 1 to resolve the conflict.
      localhost:   	at scala.Predef$.require(Predef.scala:224)
      localhost:   	at org.apache.spark.deploy.worker.Worker$.main(Worker.scala:752)
      localhost:   	at org.apache.spark.deploy.worker.Worker.main(Worker.scala)
      localhost: full log in /Users/xxx/workspace/spark/logs/spark-xxx-org.apache.spark.deploy.worker.Worker-1-xxx.local.out
      localhost: starting org.apache.spark.deploy.worker.Worker, logging to /Users/xxx/workspace/spark/logs/spark-xxx-org.apache.spark.deploy.worker.Worker-2-xxx.local.out
      localhost: failed to launch: nice -n 0 /Users/xxx/workspace/spark/bin/spark-class org.apache.spark.deploy.worker.Worker --webui-port 8082 spark://xxx.local:7077
      localhost:   17/06/13 23:24:56 INFO SecurityManager: Changing view acls to: xxx
      localhost:   17/06/13 23:24:56 INFO SecurityManager: Changing modify acls to: xxx
      localhost:   17/06/13 23:24:56 INFO SecurityManager: Changing view acls groups to:
      localhost:   17/06/13 23:24:56 INFO SecurityManager: Changing modify acls groups to:
      localhost:   17/06/13 23:24:56 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(xxx); groups with view permissions: Set(); users  with modify permissions: Set(xxx); groups with modify permissions: Set()
      localhost:   17/06/13 23:24:56 INFO Utils: Successfully started service 'sparkWorker' on port 63359.
      localhost:   Exception in thread "main" java.lang.IllegalArgumentException: requirement failed: Start multiple worker on one host failed because we may launch no more than one external shuffle service on each host, please set spark.shuffle.service.enabled to false or set SPARK_WORKER_INSTANCES to 1 to resolve the conflict.
      localhost:   	at scala.Predef$.require(Predef.scala:224)
      localhost:   	at org.apache.spark.deploy.worker.Worker$.main(Worker.scala:752)
      localhost:   	at org.apache.spark.deploy.worker.Worker.main(Worker.scala)
      localhost: full log in /Users/xxx/workspace/spark/logs/spark-xxx-org.apache.spark.deploy.worker.Worker-2-xxx.local.out
      localhost: starting org.apache.spark.deploy.worker.Worker, logging to /Users/xxx/workspace/spark/logs/spark-xxx-org.apache.spark.deploy.worker.Worker-3-xxx.local.out
      localhost: failed to launch: nice -n 0 /Users/xxx/workspace/spark/bin/spark-class org.apache.spark.deploy.worker.Worker --webui-port 8083 spark://xxx.local:7077
      localhost:   17/06/13 23:24:59 INFO SecurityManager: Changing view acls to: xxx
      localhost:   17/06/13 23:24:59 INFO SecurityManager: Changing modify acls to: xxx
      localhost:   17/06/13 23:24:59 INFO SecurityManager: Changing view acls groups to:
      localhost:   17/06/13 23:24:59 INFO SecurityManager: Changing modify acls groups to:
      localhost:   17/06/13 23:24:59 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(xxx); groups with view permissions: Set(); users  with modify permissions: Set(xxx); groups with modify permissions: Set()
      localhost:   17/06/13 23:24:59 INFO Utils: Successfully started service 'sparkWorker' on port 63360.
      localhost:   Exception in thread "main" java.lang.IllegalArgumentException: requirement failed: Start multiple worker on one host failed because we may launch no more than one external shuffle service on each host, please set spark.shuffle.service.enabled to false or set SPARK_WORKER_INSTANCES to 1 to resolve the conflict.
      localhost:   	at scala.Predef$.require(Predef.scala:224)
      localhost:   	at org.apache.spark.deploy.worker.Worker$.main(Worker.scala:752)
      localhost:   	at org.apache.spark.deploy.worker.Worker.main(Worker.scala)
      localhost: full log in /Users/xxx/workspace/spark/logs/spark-xxx-org.apache.spark.deploy.worker.Worker-3-xxx.local.out
      ```
      
      Author: Xingbo Jiang <xingbo.jiang@databricks.com>
      
      Closes #18290 from jiangxb1987/start-slave.
      ef162289
  11. Jun 19, 2017
    • Yuming Wang's avatar
      [SPARK-21133][CORE] Fix HighlyCompressedMapStatus#writeExternal throws NPE · 9b57cd8d
      Yuming Wang authored
      ## What changes were proposed in this pull request?
      
      Fix HighlyCompressedMapStatus#writeExternal NPE:
      ```
      17/06/18 15:00:27 ERROR Utils: Exception encountered
      java.lang.NullPointerException
              at org.apache.spark.scheduler.HighlyCompressedMapStatus$$anonfun$writeExternal$2.apply$mcV$sp(MapStatus.scala:171)
              at org.apache.spark.scheduler.HighlyCompressedMapStatus$$anonfun$writeExternal$2.apply(MapStatus.scala:167)
              at org.apache.spark.scheduler.HighlyCompressedMapStatus$$anonfun$writeExternal$2.apply(MapStatus.scala:167)
              at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1303)
              at org.apache.spark.scheduler.HighlyCompressedMapStatus.writeExternal(MapStatus.scala:167)
              at java.io.ObjectOutputStream.writeExternalData(ObjectOutputStream.java:1459)
              at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1430)
              at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
              at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378)
              at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
              at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
              at org.apache.spark.MapOutputTracker$$anonfun$serializeMapStatuses$1.apply$mcV$sp(MapOutputTracker.scala:617)
              at org.apache.spark.MapOutputTracker$$anonfun$serializeMapStatuses$1.apply(MapOutputTracker.scala:616)
              at org.apache.spark.MapOutputTracker$$anonfun$serializeMapStatuses$1.apply(MapOutputTracker.scala:616)
              at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1337)
              at org.apache.spark.MapOutputTracker$.serializeMapStatuses(MapOutputTracker.scala:619)
              at org.apache.spark.MapOutputTrackerMaster.getSerializedMapOutputStatuses(MapOutputTracker.scala:562)
              at org.apache.spark.MapOutputTrackerMaster$MessageLoop.run(MapOutputTracker.scala:351)
              at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
              at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
              at java.lang.Thread.run(Thread.java:745)
      17/06/18 15:00:27 ERROR MapOutputTrackerMaster: java.lang.NullPointerException
      java.io.IOException: java.lang.NullPointerException
              at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1310)
              at org.apache.spark.scheduler.HighlyCompressedMapStatus.writeExternal(MapStatus.scala:167)
              at java.io.ObjectOutputStream.writeExternalData(ObjectOutputStream.java:1459)
              at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1430)
              at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
              at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378)
              at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
              at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
              at org.apache.spark.MapOutputTracker$$anonfun$serializeMapStatuses$1.apply$mcV$sp(MapOutputTracker.scala:617)
              at org.apache.spark.MapOutputTracker$$anonfun$serializeMapStatuses$1.apply(MapOutputTracker.scala:616)
              at org.apache.spark.MapOutputTracker$$anonfun$serializeMapStatuses$1.apply(MapOutputTracker.scala:616)
              at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1337)
              at org.apache.spark.MapOutputTracker$.serializeMapStatuses(MapOutputTracker.scala:619)
              at org.apache.spark.MapOutputTrackerMaster.getSerializedMapOutputStatuses(MapOutputTracker.scala:562)
              at org.apache.spark.MapOutputTrackerMaster$MessageLoop.run(MapOutputTracker.scala:351)
              at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
              at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
              at java.lang.Thread.run(Thread.java:745)
      Caused by: java.lang.NullPointerException
              at org.apache.spark.scheduler.HighlyCompressedMapStatus$$anonfun$writeExternal$2.apply$mcV$sp(MapStatus.scala:171)
              at org.apache.spark.scheduler.HighlyCompressedMapStatus$$anonfun$writeExternal$2.apply(MapStatus.scala:167)
              at org.apache.spark.scheduler.HighlyCompressedMapStatus$$anonfun$writeExternal$2.apply(MapStatus.scala:167)
              at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1303)
              ... 17 more
      17/06/18 15:00:27 INFO MapOutputTrackerMasterEndpoint: Asked to send map output locations for shuffle 0 to 10.17.47.20:50188
      17/06/18 15:00:27 ERROR Utils: Exception encountered
      java.lang.NullPointerException
              at org.apache.spark.scheduler.HighlyCompressedMapStatus$$anonfun$writeExternal$2.apply$mcV$sp(MapStatus.scala:171)
              at org.apache.spark.scheduler.HighlyCompressedMapStatus$$anonfun$writeExternal$2.apply(MapStatus.scala:167)
              at org.apache.spark.scheduler.HighlyCompressedMapStatus$$anonfun$writeExternal$2.apply(MapStatus.scala:167)
              at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1303)
              at org.apache.spark.scheduler.HighlyCompressedMapStatus.writeExternal(MapStatus.scala:167)
              at java.io.ObjectOutputStream.writeExternalData(ObjectOutputStream.java:1459)
              at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1430)
              at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
              at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378)
              at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
              at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
              at org.apache.spark.MapOutputTracker$$anonfun$serializeMapStatuses$1.apply$mcV$sp(MapOutputTracker.scala:617)
              at org.apache.spark.MapOutputTracker$$anonfun$serializeMapStatuses$1.apply(MapOutputTracker.scala:616)
              at org.apache.spark.MapOutputTracker$$anonfun$serializeMapStatuses$1.apply(MapOutputTracker.scala:616)
              at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1337)
              at org.apache.spark.MapOutputTracker$.serializeMapStatuses(MapOutputTracker.scala:619)
              at org.apache.spark.MapOutputTrackerMaster.getSerializedMapOutputStatuses(MapOutputTracker.scala:562)
              at org.apache.spark.MapOutputTrackerMaster$MessageLoop.run(MapOutputTracker.scala:351)
              at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
              at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
              at java.lang.Thread.run(Thread.java:745)
      ```
      
      ## How was this patch tested?
      
      manual tests
      
      Author: Yuming Wang <wgyumg@gmail.com>
      
      Closes #18343 from wangyum/SPARK-21133.
      9b57cd8d
    • Marcelo Vanzin's avatar
      [SPARK-21124][UI] Show correct application user in UI. · 581565dd
      Marcelo Vanzin authored
      The jobs page currently shows the application user, but it assumes
      the OS user is the same as the user running the application, which
      may not be true in all scenarios (e.g., kerberos). While it might be
      useful to show both in the UI, this change just chooses the application
      user over the OS user, since the latter can be found in the environment
      page if needed.
      
      Tested in live application and in history server.
      
      Author: Marcelo Vanzin <vanzin@cloudera.com>
      
      Closes #18331 from vanzin/SPARK-21124.
      581565dd
    • Dongjoon Hyun's avatar
      [MINOR][BUILD] Fix Java linter errors · ecc56313
      Dongjoon Hyun authored
      ## What changes were proposed in this pull request?
      
      This PR cleans up a few Java linter errors for Apache Spark 2.2 release.
      
      ## How was this patch tested?
      
      ```bash
      $ dev/lint-java
      Using `mvn` from path: /usr/local/bin/mvn
      Checkstyle checks passed.
      ```
      
      We can check the result at Travis CI, [here](https://travis-ci.org/dongjoon-hyun/spark/builds/244297894).
      
      Author: Dongjoon Hyun <dongjoon@apache.org>
      
      Closes #18345 from dongjoon-hyun/fix_lint_java_2.
      ecc56313
    • Xingbo Jiang's avatar
      [SPARK-19824][CORE] Update JsonProtocol to keep consistent with the UI · ea542d29
      Xingbo Jiang authored
      ## What changes were proposed in this pull request?
      
      Fix the parts of JsonProtocol that are inconsistent with the UI.
      This PR also contains the modifications in #17181
      
      ## How was this patch tested?
      
      Updated JsonProtocolSuite.
      
      Before this change, localhost:8080/json shows:
      ```
      {
        "url" : "spark://xingbos-MBP.local:7077",
        "workers" : [ {
          "id" : "worker-20170615172946-192.168.0.101-49450",
          "host" : "192.168.0.101",
          "port" : 49450,
          "webuiaddress" : "http://192.168.0.101:8081",
          "cores" : 8,
          "coresused" : 8,
          "coresfree" : 0,
          "memory" : 15360,
          "memoryused" : 1024,
          "memoryfree" : 14336,
          "state" : "ALIVE",
          "lastheartbeat" : 1497519481722
        }, {
          "id" : "worker-20170615172948-192.168.0.101-49452",
          "host" : "192.168.0.101",
          "port" : 49452,
          "webuiaddress" : "http://192.168.0.101:8082",
          "cores" : 8,
          "coresused" : 8,
          "coresfree" : 0,
          "memory" : 15360,
          "memoryused" : 1024,
          "memoryfree" : 14336,
          "state" : "ALIVE",
          "lastheartbeat" : 1497519484160
        }, {
          "id" : "worker-20170615172951-192.168.0.101-49469",
          "host" : "192.168.0.101",
          "port" : 49469,
          "webuiaddress" : "http://192.168.0.101:8083",
          "cores" : 8,
          "coresused" : 8,
          "coresfree" : 0,
          "memory" : 15360,
          "memoryused" : 1024,
          "memoryfree" : 14336,
          "state" : "ALIVE",
          "lastheartbeat" : 1497519486905
        } ],
        "cores" : 24,
        "coresused" : 24,
        "memory" : 46080,
        "memoryused" : 3072,
        "activeapps" : [ {
          "starttime" : 1497519426990,
          "id" : "app-20170615173706-0001",
          "name" : "Spark shell",
          "user" : "xingbojiang",
          "memoryperslave" : 1024,
          "submitdate" : "Thu Jun 15 17:37:06 CST 2017",
          "state" : "RUNNING",
          "duration" : 65362
        } ],
        "completedapps" : [ {
          "starttime" : 1497519250893,
          "id" : "app-20170615173410-0000",
          "name" : "Spark shell",
          "user" : "xingbojiang",
          "memoryperslave" : 1024,
          "submitdate" : "Thu Jun 15 17:34:10 CST 2017",
          "state" : "FINISHED",
          "duration" : 116895
        } ],
        "activedrivers" : [ ],
        "status" : "ALIVE"
      }
      ```
      
      After the change:
      ```
      {
        "url" : "spark://xingbos-MBP.local:7077",
        "workers" : [ {
          "id" : "worker-20170615175032-192.168.0.101-49951",
          "host" : "192.168.0.101",
          "port" : 49951,
          "webuiaddress" : "http://192.168.0.101:8081",
          "cores" : 8,
          "coresused" : 8,
          "coresfree" : 0,
          "memory" : 15360,
          "memoryused" : 1024,
          "memoryfree" : 14336,
          "state" : "ALIVE",
          "lastheartbeat" : 1497520292900
        }, {
          "id" : "worker-20170615175034-192.168.0.101-49953",
          "host" : "192.168.0.101",
          "port" : 49953,
          "webuiaddress" : "http://192.168.0.101:8082",
          "cores" : 8,
          "coresused" : 8,
          "coresfree" : 0,
          "memory" : 15360,
          "memoryused" : 1024,
          "memoryfree" : 14336,
          "state" : "ALIVE",
          "lastheartbeat" : 1497520280301
        }, {
          "id" : "worker-20170615175037-192.168.0.101-49955",
          "host" : "192.168.0.101",
          "port" : 49955,
          "webuiaddress" : "http://192.168.0.101:8083",
          "cores" : 8,
          "coresused" : 8,
          "coresfree" : 0,
          "memory" : 15360,
          "memoryused" : 1024,
          "memoryfree" : 14336,
          "state" : "ALIVE",
          "lastheartbeat" : 1497520282884
        } ],
        "aliveworkers" : 3,
        "cores" : 24,
        "coresused" : 24,
        "memory" : 46080,
        "memoryused" : 3072,
        "activeapps" : [ {
          "id" : "app-20170615175122-0001",
          "starttime" : 1497520282115,
          "name" : "Spark shell",
          "cores" : 24,
          "user" : "xingbojiang",
          "memoryperslave" : 1024,
          "submitdate" : "Thu Jun 15 17:51:22 CST 2017",
          "state" : "RUNNING",
          "duration" : 10805
        } ],
        "completedapps" : [ {
          "id" : "app-20170615175058-0000",
          "starttime" : 1497520258766,
          "name" : "Spark shell",
          "cores" : 24,
          "user" : "xingbojiang",
          "memoryperslave" : 1024,
          "submitdate" : "Thu Jun 15 17:50:58 CST 2017",
          "state" : "FINISHED",
          "duration" : 9876
        } ],
        "activedrivers" : [ ],
        "completeddrivers" : [ ],
        "status" : "ALIVE"
      }
      ```
      
      Author: Xingbo Jiang <xingbo.jiang@databricks.com>
      
      Closes #18303 from jiangxb1987/json-protocol.
      ea542d29
  12. Jun 18, 2017
    • liuxian's avatar
      [SPARK-21090][CORE] Optimize the unified memory manager code · 112bd9bf
      liuxian authored
      ## What changes were proposed in this pull request?
      1. In `acquireStorageMemory`, when the memory mode is OFF_HEAP, `maxOffHeapMemory` should be changed to `maxOffHeapStorageMemory`; after this PR it behaves the same as the ON_HEAP memory mode.
      When the requested amount is between `maxOffHeapStorageMemory` and `maxOffHeapMemory` the acquisition is sure to fail, so if the requested amount is greater than `maxOffHeapStorageMemory` (even though it is not greater than `maxOffHeapMemory`), we should fail fast.
      2. When borrowing memory from execution, `numBytes` should be changed to `numBytes - storagePool.memoryFree`, which is more reasonable.
      Because we only need to acquire `(numBytes - storagePool.memoryFree)`, borrowing the full `numBytes` from execution is unnecessary. A minimal sketch of both points follows.
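      The sketch below assumes a much-simplified manager (not the actual UnifiedMemoryManager; `storageCapacity` plays the role of `maxOffHeapStorageMemory`).
      
      ```
      class SimpleStorageManager(storageCapacity: Long,
                                 private var storageFree: Long,
                                 private var executionFree: Long) {
      
        def acquireStorageMemory(numBytes: Long): Boolean = synchronized {
          if (numBytes > storageCapacity) {
            false  // point 1: the request can never fit in storage, so fail fast
          } else {
            if (numBytes > storageFree) {
              // point 2: borrow only (numBytes - storageFree) from execution, not the full numBytes
              val toBorrow = math.min(executionFree, numBytes - storageFree)
              executionFree -= toBorrow
              storageFree += toBorrow
            }
            if (numBytes <= storageFree) { storageFree -= numBytes; true } else false
          }
        }
      }
      ```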
      
      ## How was this patch tested?
      Added a unit test case.
      
      Author: liuxian <liu.xian3@zte.com.cn>
      
      Closes #18296 from 10110346/wip-lx-0614.
      112bd9bf
  13. Jun 16, 2017
  14. Jun 15, 2017
    • Michael Gummelt's avatar
      [SPARK-20434][YARN][CORE] Move Hadoop delegation token code from yarn to core · a18d6371
      Michael Gummelt authored
      ## What changes were proposed in this pull request?
      
      Move Hadoop delegation token code from `spark-yarn` to `spark-core`, so that other schedulers (such as Mesos) may use it.  In order to avoid exposing Hadoop interfaces in spark-core, the new Hadoop delegation token classes are kept private.  In order to provide backward compatibility, and to allow YARN users to continue to load their own delegation token providers via Java service loading, the old YARN interfaces, as well as the client code that uses them, have been retained.
      
      Summary:
      - Move registered `yarn.security.ServiceCredentialProvider` classes from `spark-yarn` to `spark-core`.  Moved them into a new, private hierarchy under `HadoopDelegationTokenProvider`.  Client code in `HadoopDelegationTokenManager` now loads credentials from a whitelist of three providers (`HadoopFSDelegationTokenProvider`, `HiveDelegationTokenProvider`, `HBaseDelegationTokenProvider`), instead of service loading, which means that users are not able to implement their own delegation token providers, as they are in the `spark-yarn` module.
      
      - The `yarn.security.ServiceCredentialProvider` interface has been kept for backwards compatibility, and to continue to allow YARN users to implement their own delegation token provider implementations.  Client code in YARN now fetches tokens via the new `YARNHadoopDelegationTokenManager` class, which fetches tokens from the core providers through `HadoopDelegationTokenManager`, as well as service loads them from `yarn.security.ServiceCredentialProvider`.
      
      Old Hierarchy:
      
      ```
      yarn.security.ServiceCredentialProvider (service loaded)
        HadoopFSCredentialProvider
        HiveCredentialProvider
        HBaseCredentialProvider
      yarn.security.ConfigurableCredentialManager
      ```
      
      New Hierarchy:
      
      ```
      HadoopDelegationTokenManager
      HadoopDelegationTokenProvider (not service loaded)
        HadoopFSDelegationTokenProvider
        HiveDelegationTokenProvider
        HBaseDelegationTokenProvider
      
      yarn.security.ServiceCredentialProvider (service loaded)
      yarn.security.YARNHadoopDelegationTokenManager
      ```
      ## How was this patch tested?
      
      unit tests
      
      Author: Michael Gummelt <mgummelt@mesosphere.io>
      Author: Dr. Stefan Schimanski <sttts@mesosphere.io>
      
      Closes #17723 from mgummelt/SPARK-20434-refactor-kerberos.
      a18d6371
    • Xingbo Jiang's avatar
      [SPARK-16251][SPARK-20200][CORE][TEST] Flaky test:... · 7dc3e697
      Xingbo Jiang authored
      [SPARK-16251][SPARK-20200][CORE][TEST] Flaky test: org.apache.spark.rdd.LocalCheckpointSuite.missing checkpoint block fails with informative message
      
      ## What changes were proposed in this pull request?
      
      Currently we don't wait to confirm that the block has been removed from the slave's BlockManager; if the removal takes too long, the assertion in this test case fails.
      The failure can easily be reproduced by sleeping for a while before removing the block in BlockManagerSlaveEndpoint.receiveAndReply(). An illustrative polling pattern is sketched below.
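      Illustrative pattern only (the real fix lives in LocalCheckpointSuite and uses Spark's own test helpers; `blockIsGone` is a hypothetical stand-in for querying the BlockManager): assert after waiting for the removal rather than immediately after requesting it.
      
      ```
      import org.scalatest.concurrent.Eventually._
      import org.scalatest.time.SpanSugar._
      
      def blockIsGone(blockId: String): Boolean = true  // stand-in for the real BlockManager query
      
      eventually(timeout(10.seconds), interval(100.millis)) {
        assert(blockIsGone("rdd_0_0"), "block rdd_0_0 should have been removed by now")
      }
      ```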
      
      ## How was this patch tested?
      N/A
      
      Author: Xingbo Jiang <xingbo.jiang@databricks.com>
      
      Closes #18314 from jiangxb1987/LocalCheckpointSuite.
      7dc3e697
  15. Jun 14, 2017
  16. Jun 13, 2017
  17. Jun 11, 2017
    • Josh Rosen's avatar
      [SPARK-20715] Store MapStatuses only in MapOutputTracker, not ShuffleMapStage · 3476390c
      Josh Rosen authored
      ## What changes were proposed in this pull request?
      
      This PR refactors `ShuffleMapStage` and `MapOutputTracker` in order to simplify the management of `MapStatuses`, reduce driver memory consumption, and remove a potential source of scheduler correctness bugs.
      
      ### Background
      
      In Spark there are currently two places where MapStatuses are tracked:
      
      - The `MapOutputTracker` maintains an `Array[MapStatus]` storing a single location for each map output. This mapping is used by the `DAGScheduler` for determining reduce-task locality preferences (when locality-aware reduce task scheduling is enabled) and is also used to serve map output locations to executors / tasks.
      - Each `ShuffleMapStage` also contains a mapping of `Array[List[MapStatus]]` which holds the complete set of locations where each map output could be available. This mapping is used to determine which map tasks need to be run when constructing `TaskSets` for the stage.
      
      This duplication adds complexity and creates the potential for certain types of correctness bugs.  Bad things can happen if these two copies of the map output locations get out of sync. For instance, if the `MapOutputTracker` is missing locations for a map output but `ShuffleMapStage` believes that locations are available then tasks will fail with `MetadataFetchFailedException` but `ShuffleMapStage` will not be updated to reflect the missing map outputs, leading to situations where the stage will be reattempted (because downstream stages experienced fetch failures) but no task sets will be launched (because `ShuffleMapStage` thinks all maps are available).
      
      I observed this behavior in a real-world deployment. I'm still not quite sure how the state got out of sync in the first place, but we can completely avoid this class of bug if we eliminate the duplicate state.
      
      ### Why we only need to track a single location for each map output
      
      I think that storing an `Array[List[MapStatus]]` in `ShuffleMapStage` is unnecessary.
      
      First, note that this adds memory/object bloat to the driver: we need one extra `List` per task. If you have millions of tasks across all stages then this can add up to be a significant amount of resources.
      
      Secondly, I believe that it's extremely uncommon that these lists will ever contain more than one entry. It's not impossible, but is very unlikely given the conditions which must occur for that to happen:
      
      - In normal operation (no task failures) we'll only run each task once and thus will have at most one output.
      - If speculation is enabled then it's possible that we'll have multiple attempts of a task. The TaskSetManager will [kill duplicate attempts of a task](https://github.com/apache/spark/blob/04901dd03a3f8062fd39ea38d585935ff71a9248/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala#L717) after a task finishes successfully, reducing the likelihood that both the original and speculated task will successfully register map outputs.
      - There is a [comment in `TaskSetManager`](https://github.com/apache/spark/blob/04901dd03a3f8062fd39ea38d585935ff71a9248/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala#L113) which suggests that running tasks are not killed if a task set becomes a zombie. However:
        - If the task set becomes a zombie due to the job being cancelled then it doesn't matter whether we record map outputs.
        - If the task set became a zombie because of a stage failure (e.g. the map stage itself had a fetch failure from an upstream map stage) then I believe that the "failedEpoch" will be updated which may cause map outputs from still-running tasks to [be ignored](https://github.com/apache/spark/blob/04901dd03a3f8062fd39ea38d585935ff71a9248/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1213). (I'm not 100% sure on this point, though).
      - Even if you _do_ manage to record multiple map outputs for a stage, only a single map output is reported to / tracked by the MapOutputTracker. The only situation where the additional output locations could actually be read or used would be if a task experienced a `FetchFailure` exception. The most likely cause of a `FetchFailure` exception is an executor lost, which will have most likely caused the loss of several map tasks' output, so saving on potential re-execution of a single map task isn't a huge win if we're going to have to recompute several other lost map outputs from other tasks which ran on that lost executor. Also note that the re-population of MapOutputTracker state from state in the ShuffleMapTask only happens after the reduce stage has failed; the additional location doesn't help to prevent FetchFailures but, instead, can only reduce the amount of work when recomputing missing parent stages.
      
      Given this, this patch chooses to do away with tracking multiple locations for map outputs and instead stores only a single location. This change removes the main distinction between the `ShuffleMapStage` and `MapOutputTracker`'s copies of this state, paving the way for storing it only in the `MapOutputTracker`.
      
      ### Overview of other changes
      
      - Significantly simplified the cache / lock management inside of the `MapOutputTrackerMaster`:
        - The old code had several parallel `HashMap`s which had to be guarded by maps of `Object`s which were used as locks. This code was somewhat complicated to follow.
        - The new code uses a new `ShuffleStatus` class to group together all of the state associated with a particular shuffle, including cached serialized map statuses, significantly simplifying the logic (a minimal sketch of this grouping follows the list).
      - Moved more code out of the shared `MapOutputTracker` abstract base class and into the `MapOutputTrackerMaster` and `MapOutputTrackerWorker` subclasses. This makes it easier to reason about which functionality needs to be supported only on the driver or executor.
      - Removed a bunch of code from the `DAGScheduler` which was used to synchronize information from the `MapOutputTracker` to `ShuffleMapStage`.
      - Added comments to clarify the role of `MapOutputTrackerMaster`'s `epoch` in invalidating executor-side shuffle map output caches.
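      A minimal sketch of the grouping idea referenced above (not Spark's actual ShuffleStatus; `MapStatus` is a type parameter standing in for the real class): all per-shuffle state, including the cached serialized form, lives behind a single lock.
      
      ```
      class ShuffleStatusSketch[MapStatus](numPartitions: Int) {
        // All fields below are guarded by `this`.
        private val mapStatuses = Array.fill[Option[MapStatus]](numPartitions)(None)
        private var cachedSerializedStatuses: Option[Array[Byte]] = None
      
        def addMapOutput(mapId: Int, status: MapStatus): Unit = synchronized {
          mapStatuses(mapId) = Some(status)
          cachedSerializedStatuses = None  // any change invalidates the cached serialized form
        }
      
        def removeMapOutput(mapId: Int): Unit = synchronized {
          mapStatuses(mapId) = None
          cachedSerializedStatuses = None
        }
      
        def numAvailableOutputs: Int = synchronized { mapStatuses.count(_.isDefined) }
      }
      ```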
      
      I will comment on these changes via inline GitHub review comments.
      
      /cc hvanhovell and rxin (whom I discussed this with offline), tgravescs (who recently worked on caching of serialized MapOutputStatuses), and kayousterhout and markhamstra (for scheduler changes).
      
      ## How was this patch tested?
      
      Existing tests. I purposely avoided making interface / API which would require significant updates or modifications to test code.
      
      Author: Josh Rosen <joshrosen@databricks.com>
      
      Closes #17955 from JoshRosen/map-output-tracker-rewrite.
      3476390c