  1. Nov 19, 2016
    • hyukjinkwon's avatar
      [SPARK-18445][BUILD][DOCS] Fix the markdown for `Note:`/`NOTE:`/`Note... · 4b396a65
      hyukjinkwon authored
      [SPARK-18445][BUILD][DOCS] Fix the markdown for `Note:`/`NOTE:`/`Note that`/`'''Note:'''` across Scala/Java API documentation
      
      In the Scala/Java API documentation, notes are currently written in several different styles:
      
      - `Note:`
      - `NOTE:`
      - `Note that`
      - `'''Note:'''`
      - `note`
      
      This PR proposes to change all of them to `note` for consistency.
      
      **Before**
      
      - Scala
        ![2016-11-17 6 16 39](https://cloud.githubusercontent.com/assets/6477701/20383180/1a7aed8c-acf2-11e6-9611-5eaf6d52c2e0.png)
      
      - Java
        ![2016-11-17 6 14 41](https://cloud.githubusercontent.com/assets/6477701/20383096/c8ffc680-acf1-11e6-914a-33460bf1401d.png)
      
      **After**
      
      - Scala
        ![2016-11-17 6 16 44](https://cloud.githubusercontent.com/assets/6477701/20383167/09940490-acf2-11e6-937a-0d5e1dc2cadf.png)
      
      - Java
        ![2016-11-17 6 13 39](https://cloud.githubusercontent.com/assets/6477701/20383132/e7c2a57e-acf1-11e6-9c47-b849674d4d88.png)
      
      The notes were found via
      
      ```bash
      # Searched patterns: Note:|NOTE:|Note that|'''Note:'''
      grep -r "NOTE: " . |
        grep -v "// NOTE: " |      # lines starting with // do not appear in API documentation
        grep -E '.scala|.java' |   # java/scala files
        grep -v Suite |            # exclude tests
        grep -v Test |             # exclude tests
        # Packages that appear in API documentation. Each -e is a regular expression,
        # so actual matches were mostly `org/apache/spark/api/java/functions ...`
        grep -e 'org.apache.spark.api.java' \
             -e 'org.apache.spark.api.java.function' \
             -e 'org.apache.spark.api.r' \
        ...
      ```
      
      ```bash
      # Searched patterns: Note:|NOTE:|Note that|'''Note:'''
      grep -r "Note that " . |
        grep -v "// Note that " |  # lines starting with // do not appear in API documentation
        grep -E '.scala|.java' |   # java/scala files
        grep -v Suite |            # exclude tests
        grep -v Test |             # exclude tests
        # Packages that appear in API documentation.
        grep -e 'org.apache.spark.api.java' \
             -e 'org.apache.spark.api.java.function' \
             -e 'org.apache.spark.api.r' \
        ...
      ```
      
      ```bash
      # Searched patterns: Note:|NOTE:|Note that|'''Note:'''
      grep -r "Note: " . |
        grep -v "// Note: " |      # lines starting with // do not appear in API documentation
        grep -E '.scala|.java' |   # java/scala files
        grep -v Suite |            # exclude tests
        grep -v Test |             # exclude tests
        # Packages that appear in API documentation.
        grep -e 'org.apache.spark.api.java' \
             -e 'org.apache.spark.api.java.function' \
             -e 'org.apache.spark.api.r' \
        ...
      ```
      
      ```bash
      # Searched patterns: Note:|NOTE:|Note that|'''Note:'''
      grep -r "'''Note:'''" . |
        grep -v "// '''Note:''' " |  # lines starting with // do not appear in API documentation
        grep -E '.scala|.java' |     # java/scala files
        grep -v Suite |              # exclude tests
        grep -v Test |               # exclude tests
        # Packages that appear in API documentation.
        grep -e 'org.apache.spark.api.java' \
             -e 'org.apache.spark.api.java.function' \
             -e 'org.apache.spark.api.r' \
        ...
      ```
      
      Each match was then fixed one by one, after checking it against the API documentation and access modifiers.
      
      After that, manually tested via `jekyll build`.
      
      Author: hyukjinkwon <gurwls223@gmail.com>
      
      Closes #15889 from HyukjinKwon/SPARK-18437.
      
      (cherry picked from commit d5b1d5fc)
      Signed-off-by: Sean Owen <sowen@cloudera.com>
      4b396a65
  2. Nov 15, 2016
  3. Nov 02, 2016
  4. Sep 22, 2016
    • Shixiong Zhu's avatar
      [SPARK-17638][STREAMING] Stop JVM StreamingContext when the Python process is dead · 3cdae0ff
      Shixiong Zhu authored
      ## What changes were proposed in this pull request?
      
      When the Python process is dead, the JVM StreamingContext is still running. Hence we will see a lot of Py4jException before the JVM process exits. It's better to stop the JVM StreamingContext to avoid those annoying logs.
      
      ## How was this patch tested?
      
      Jenkins
      
      Author: Shixiong Zhu <shixiong@databricks.com>
      
      Closes #15201 from zsxwing/stop-jvm-ssc.
      3cdae0ff
    • Dhruve Ashar's avatar
      [SPARK-17365][CORE] Remove/Kill multiple executors together to reduce RPC call time. · 17b72d31
      Dhruve Ashar authored
      ## What changes were proposed in this pull request?
      We now kill multiple executors together instead of iterating over expensive RPC calls that each kill a single executor.
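      
      As a rough illustration of why batching helps, the sketch below counts the calls made against a stand-in client. `CountingClient` and both helper methods are hypothetical names invented for this example; they are not Spark's actual API.
      
      ```scala
      object BatchedKill {
        // Hypothetical stand-in for a remote endpoint; every call models one RPC round trip.
        final class CountingClient {
          var rpcCalls = 0
          def killExecutors(ids: Seq[String]): Seq[String] = {
            rpcCalls += 1
            ids
          }
        }
      
        // Before: one RPC per executor.
        def killOneByOne(client: CountingClient, ids: Seq[String]): Unit =
          ids.foreach(id => client.killExecutors(Seq(id)))
      
        // After: a single RPC carrying all executor IDs.
        def killTogether(client: CountingClient, ids: Seq[String]): Unit = {
          client.killExecutors(ids)
          ()
        }
      }
      ```
      
      With three executor IDs, the first variant issues three calls and the second issues one.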
      
      ## How was this patch tested?
      Executed a sample Spark job to observe executors being killed/removed with dynamic allocation enabled.
      
      Author: Dhruve Ashar <dashar@yahoo-inc.com>
      Author: Dhruve Ashar <dhruveashar@gmail.com>
      
      Closes #15152 from dhruve/impr/SPARK-17365.
      17b72d31
  5. Sep 21, 2016
    • Marcelo Vanzin's avatar
      [SPARK-4563][CORE] Allow driver to advertise a different network address. · 2cd1bfa4
      Marcelo Vanzin authored
      The goal of this feature is to allow the Spark driver to run in an
      isolated environment, such as a docker container, and be able to use
      the host's port forwarding mechanism to be able to accept connections
      from the outside world.
      
      The change is restricted to the driver: there is no support for achieving
      the same thing on executors (or the YARN AM for that matter). Those still
      need full access to the outside world so that, for example, connections
      can be made to an executor's block manager.
      
      The core of the change is simple: add a new configuration that tells what's
      the address the driver should bind to, which can be different than the address
      it advertises to executors (spark.driver.host). Everything else is plumbing
      the new configuration where it's needed.
      
      To use the feature, the host starting the container needs to set up the
      driver's port range to fall into a range that is being forwarded; this
      required the block manager port to need a special configuration just for
      the driver, which falls back to the existing spark.blockManager.port when
      not set. This way, users can modify the driver settings without affecting
      the executors; it would theoretically be nice to also have different
      retry counts for driver and executors, but given that docker (at least)
      allows forwarding port ranges, we can probably live without that for now.
      
      Because of the nature of the feature it's kinda hard to add unit tests;
      I just added a simple one to make sure the configuration works.
      
      This was tested with a docker image running spark-shell with the following
      command:
      
       docker blah blah blah \
         -p 38000-38100:38000-38100 \
         [image] \
         spark-shell \
           --num-executors 3 \
           --conf spark.shuffle.service.enabled=false \
           --conf spark.dynamicAllocation.enabled=false \
           --conf spark.driver.host=[host's address] \
           --conf spark.driver.port=38000 \
           --conf spark.driver.blockManager.port=38020 \
           --conf spark.ui.port=38040
      
      Running on YARN; verified the driver works, executors start up and listen
      on ephemeral ports (instead of using the driver's config), and that caching
      and shuffling (without the shuffle service) works. Clicked through the UI
      to make sure all pages (including executor thread dumps) worked. Also tested
      apps without docker, and ran unit tests.
      
      Author: Marcelo Vanzin <vanzin@cloudera.com>
      
      Closes #15120 from vanzin/SPARK-4563.
      2cd1bfa4
  6. Sep 07, 2016
    • Liwei Lin's avatar
      [SPARK-17359][SQL][MLLIB] Use ArrayBuffer.+=(A) instead of... · 3ce3a282
      Liwei Lin authored
      [SPARK-17359][SQL][MLLIB] Use ArrayBuffer.+=(A) instead of ArrayBuffer.append(A) in performance critical paths
      
      ## What changes were proposed in this pull request?
      
      We should generally use `ArrayBuffer.+=(A)` rather than `ArrayBuffer.append(A)`, because `append(A)` would involve extra boxing / unboxing.
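      
      A minimal sketch of the two call styles; `AppendVsPlusEq` and its method names are hypothetical. On the Scala versions in use at the time, `append` was declared with a varargs parameter (`append(elems: A*)`), which is presumably where the extra wrapping and boxing come from. Both variants produce the same buffer.
      
      ```scala
      import scala.collection.mutable.ArrayBuffer
      
      object AppendVsPlusEq {
        // Preferred in hot loops: `+=` takes exactly one element.
        def collectWithPlusEq(n: Int): ArrayBuffer[Int] = {
          val buf = new ArrayBuffer[Int]()
          var i = 0
          while (i < n) {
            buf += i
            i += 1
          }
          buf
        }
      
        // Same result; with a varargs `append`, each call wraps its argument in a sequence first.
        def collectWithAppend(n: Int): ArrayBuffer[Int] = {
          val buf = new ArrayBuffer[Int]()
          var i = 0
          while (i < n) {
            buf.append(i)
            i += 1
          }
          buf
        }
      }
      ```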
      
      ## How was this patch tested?
      
      N/A
      
      Author: Liwei Lin <lwlin7@gmail.com>
      
      Closes #14914 from lw-lin/append_to_plus_eq_v2.
      3ce3a282
  7. Sep 06, 2016
    • Josh Rosen's avatar
      [SPARK-17110] Fix StreamCorruptionException in BlockManager.getRemoteValues() · 29cfab3f
      Josh Rosen authored
      ## What changes were proposed in this pull request?
      
      This patch fixes a `java.io.StreamCorruptedException` error affecting remote reads of cached values when certain data types are used. The problem stems from #11801 / SPARK-13990, a patch to have Spark automatically pick the "best" serializer when caching RDDs. If PySpark cached a PythonRDD, then this would be cached as an `RDD[Array[Byte]]` and the automatic serializer selection would pick KryoSerializer for replication and block transfer. However, the `getRemoteValues()` / `getRemoteBytes()` code path did not pass proper class tags in order to enable the same serializer to be used during deserialization, causing Java to be inappropriately used instead of Kryo, leading to the StreamCorruptedException.
      
      We already fixed a similar bug in #14311, which dealt with similar issues in block replication. Prior to that patch, it seems that we had no tests to ensure that block replication actually succeeded. Similarly, prior to this bug fix patch it looks like we had no tests to perform remote reads of cached data, which is why this bug was able to remain latent for so long.
      
      This patch addresses the bug by modifying `BlockManager`'s `get()` and `getRemoteValues()` methods to accept ClassTags, allowing the proper class tag to be threaded through the `getOrElseUpdate` code path (which is used by `rdd.iterator`).
      
      ## How was this patch tested?
      
      Extended the caching tests in `DistributedSuite` to exercise the `getRemoteValues` path, plus manual testing to verify that the PySpark bug reproduction in SPARK-17110 is fixed.
      
      Author: Josh Rosen <joshrosen@databricks.com>
      
      Closes #14952 from JoshRosen/SPARK-17110.
      29cfab3f
  8. Sep 04, 2016
    • Shivansh's avatar
      [SPARK-17308] Improved the spark core code by replacing all pattern match on... · e75c162e
      Shivansh authored
      [SPARK-17308] Improved the spark core code by replacing all pattern match on boolean value by if/else block.
      
      ## What changes were proposed in this pull request?
      Improved the code quality of spark by replacing all pattern match on boolean value by if/else block.
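      
      A small before/after sketch of the kind of rewrite described; `BooleanBranching` and its methods are hypothetical names, not code taken from the patch.
      
      ```scala
      object BooleanBranching {
        // Before: pattern match on a Boolean value.
        def describeWithMatch(enabled: Boolean): String = enabled match {
          case true => "on"
          case false => "off"
        }
      
        // After: a plain if/else expresses the same branch more directly.
        def describeWithIf(enabled: Boolean): String =
          if (enabled) "on" else "off"
      }
      ```
      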
      
      ## How was this patch tested?
      
      By running the tests
      
      Author: Shivansh <shiv4nsh@gmail.com>
      
      Closes #14873 from shiv4nsh/SPARK-17308.
      e75c162e
  9. Aug 17, 2016
    • Xin Ren's avatar
      [SPARK-17038][STREAMING] fix metrics retrieval source of 'lastReceivedBatch' · e6bef7d5
      Xin Ren authored
      https://issues.apache.org/jira/browse/SPARK-17038
      
      ## What changes were proposed in this pull request?
      
      StreamingSource's lastReceivedBatch_submissionTime, lastReceivedBatch_processingTimeStart, and lastReceivedBatch_processingTimeEnd all use data from lastCompletedBatch instead of lastReceivedBatch.
      
      In particular, this makes it impossible to match lastReceivedBatch_records with a batchID/submission time.
      
      This is apparent when looking at StreamingSource.scala, lines 89-94.
      
      ## How was this patch tested?
      
      Manually running unit tests on local laptop
      
      Author: Xin Ren <iamshrek@126.com>
      
      Closes #14681 from keypointt/SPARK-17038.
      e6bef7d5
    • Steve Loughran's avatar
      [SPARK-16736][CORE][SQL] purge superfluous fs calls · cc97ea18
      Steve Loughran authored
      A review of the code, working back from Hadoop's `FileSystem.exists()` and `FileSystem.isDirectory()` code, then removing uses of the calls when superfluous.
      
      1. delete is harmless if called on a nonexistent path, so don't do any checks before deletes
      1. any `FileSystem.exists()`  check before `getFileStatus()` or `open()` is superfluous as the operation itself does the check. Instead the `FileNotFoundException` is caught and triggers the downgraded path. When a `FileNotFoundException` was thrown before, the code still creates a new FNFE with the error messages. Though now the inner exceptions are nested, for easier diagnostics.
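      
      The second point can be sketched as follows. This example uses `java.io.FileInputStream` as a stand-in for Hadoop's `FileSystem.open()`, and `OpenWithoutExists.readFirstByte` is a hypothetical helper; it only illustrates the pattern of opening directly and nesting the original exception.
      
      ```scala
      import java.io.{File, FileInputStream, FileNotFoundException}
      
      object OpenWithoutExists {
        // No exists() check up front: opening the file performs the check itself.
        def readFirstByte(path: String): Int = {
          try {
            val in = new FileInputStream(new File(path))
            try in.read() finally in.close()
          } catch {
            case e: FileNotFoundException =>
              // Create a new FNFE with our own message and nest the original for diagnostics.
              val wrapped = new FileNotFoundException(s"Could not read $path")
              wrapped.initCause(e)
              throw wrapped
          }
        }
      }
      ```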
      
      Initially, relying on Jenkins test runs.
      
      One troublespot here is that some of the codepaths are clearly error situations; it's not clear that they have coverage anyway. Trying to create the failure conditions in tests would be ideal, but it will also be hard.
      
      Author: Steve Loughran <stevel@apache.org>
      
      Closes #14371 from steveloughran/cloud/SPARK-16736-superfluous-fs-calls.
      cc97ea18
  10. Aug 08, 2016
    • Holden Karau's avatar
      [SPARK-16779][TRIVIAL] Avoid using postfix operators where they do not add... · 9216901d
      Holden Karau authored
      [SPARK-16779][TRIVIAL] Avoid using postfix operators where they do not add much and remove whitelisting
      
      ## What changes were proposed in this pull request?
      
      Avoid using postfix operators for command execution in SQLQuerySuite, where they weren't whitelisted, and audit the existing whitelistings, removing postfix operators from most places. Notable places where postfix operators remain are the XML parsing and time units (seconds, millis, etc.), where they arguably improve readability.
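      
      For reference, a minimal sketch of the time-unit case using the standard `scala.concurrent.duration` package; `PostfixExample` is a hypothetical name. The `scala.language.postfixOps` import is what enables the postfix form.
      
      ```scala
      import scala.concurrent.duration._
      import scala.language.postfixOps
      
      object PostfixExample {
        // Dotted method call: always allowed.
        val withDot: FiniteDuration = 5.seconds
      
        // Postfix form: requires the postfixOps language import above.
        val withPostfix: FiniteDuration = (5 seconds)
      }
      ```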
      
      ## How was this patch tested?
      
      Existing tests.
      
      Author: Holden Karau <holden@us.ibm.com>
      
      Closes #14407 from holdenk/SPARK-16779.
      9216901d
  11. Aug 01, 2016
  12. Jul 30, 2016
    • Sean Owen's avatar
      [SPARK-16694][CORE] Use for/foreach rather than map for Unit expressions whose... · 0dc4310b
      Sean Owen authored
      [SPARK-16694][CORE] Use for/foreach rather than map for Unit expressions whose side effects are required
      
      ## What changes were proposed in this pull request?
      
      Use foreach/for instead of map where the operation requires execution of the body rather than defining a transformation.
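      
      One case where the difference is observable is a lazy collection. The sketch below is an assumed illustration rather than code from the patch, and `ForeachVsMap` is a hypothetical name: `map` on an `Iterator` only describes a transformation, so its body never runs if the result is discarded, while `foreach` runs it immediately.
      
      ```scala
      import scala.collection.mutable.ArrayBuffer
      
      object ForeachVsMap {
        // `map` on an Iterator is lazy: the side effect never happens here.
        def recordWithMap(items: Seq[String]): ArrayBuffer[String] = {
          val seen = ArrayBuffer.empty[String]
          items.iterator.map(item => seen += item)
          seen
        }
      
        // `foreach` executes the body for every element.
        def recordWithForeach(items: Seq[String]): ArrayBuffer[String] = {
          val seen = ArrayBuffer.empty[String]
          items.iterator.foreach(item => seen += item)
          seen
        }
      }
      ```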
      
      ## How was this patch tested?
      
      Jenkins
      
      Author: Sean Owen <sowen@cloudera.com>
      
      Closes #14332 from srowen/SPARK-16694.
      0dc4310b
  13. Jul 26, 2016
    • Dhruve Ashar's avatar
      [SPARK-15703][SCHEDULER][CORE][WEBUI] Make ListenerBus event queue size configurable · 0b71d9ae
      Dhruve Ashar authored
      ## What changes were proposed in this pull request?
      This change adds a new configuration entry to specify the size of the Spark listener bus event queue. The config ("spark.scheduler.listenerbus.eventqueue.size") defaults to 10000.
      
      Note:
      I haven't currently documented the configuration entry. We can decide whether it would be appropriate to make it a public configuration or keep it as an undocumented one. Refer to the JIRA for more details.
      
      ## How was this patch tested?
      Ran existing jobs and verified the event queue size with debug logs and from the Spark WebUI Environment tab.
      
      Author: Dhruve Ashar <dhruveashar@gmail.com>
      
      Closes #14269 from dhruve/bug/SPARK-15703.
      0b71d9ae
  14. Jul 25, 2016
    • Shixiong Zhu's avatar
      [SPARK-16722][TESTS] Fix a StreamingContext leak in StreamingContextSuite when eventually fails · e164a04b
      Shixiong Zhu authored
      ## What changes were proposed in this pull request?
      
      This PR moves `ssc.stop()` into `finally` for `StreamingContextSuite.createValidCheckpoint` to avoid leaking a StreamingContext, since a leaked StreamingContext makes many other tests fail and makes the real failure hard to find.
      
      ## How was this patch tested?
      
      Jenkins unit tests
      
      Author: Shixiong Zhu <shixiong@databricks.com>
      
      Closes #14354 from zsxwing/ssc-leak.
      e164a04b
  15. Jul 24, 2016
    • Mikael Ståldal's avatar
      [SPARK-16416][CORE] force eager creation of loggers to avoid shutdown hook conflicts · 23e047f4
      Mikael Ståldal authored
      ## What changes were proposed in this pull request?
      
      Force eager creation of loggers to avoid shutdown hook conflicts.
      
      ## How was this patch tested?
      
      Manually tested with a project using Log4j 2, verified that the shutdown hook conflict issue was solved.
      
      Author: Mikael Ståldal <mikael.staldal@magine.com>
      
      Closes #14320 from mikaelstaldal/shutdown-hook-logging.
      23e047f4
  16. Jul 22, 2016
    • Ahmed Mahran's avatar
      [SPARK-16487][STREAMING] Fix some batches might not get marked as fully processed in JobGenerator · 2c72a443
      Ahmed Mahran authored
      ## What changes were proposed in this pull request?
      
      In `JobGenerator`, the code reads as though some batches might not get marked as fully processed. In the following flowchart, the batch should be marked as fully processed before endpoint C, but it is not. Currently, this does not actually cause an issue, as the condition `(time - zeroTime) is multiple of checkpoint duration?` always evaluates to `true`, because the `checkpoint duration` is always set equal to the `batch duration`.
      
      ![Flowchart](https://s31.postimg.org/udy9lti2j/spark_streaming_job_generator.png)
      
      This PR fixes this issue to improve code readability and to avoid any potential problem if a future change allows the checkpoint duration to differ from the batch duration.
      
      Author: Ahmed Mahran <ahmed.mahran@mashin.io>
      
      Closes #14145 from ahmed-mahran/b-mark-batch-fully-processed.
      2c72a443
  17. Jul 19, 2016
  18. Jul 11, 2016
    • Reynold Xin's avatar
      [SPARK-16477] Bump master version to 2.1.0-SNAPSHOT · ffcb6e05
      Reynold Xin authored
      ## What changes were proposed in this pull request?
      After SPARK-16476 (committed earlier today as #14128), we can finally bump the version number.
      
      ## How was this patch tested?
      N/A
      
      Author: Reynold Xin <rxin@databricks.com>
      
      Closes #14130 from rxin/SPARK-16477.
      ffcb6e05
  19. Jun 24, 2016
  20. Jun 22, 2016
    • Ahmed Mahran's avatar
      [SPARK-16120][STREAMING] getCurrentLogFiles in ReceiverSuite WAL generating... · c2cebdb7
      Ahmed Mahran authored
      [SPARK-16120][STREAMING] getCurrentLogFiles in ReceiverSuite WAL generating and cleaning case uses external variable instead of the passed parameter
      
      ## What changes were proposed in this pull request?
      
      In `ReceiverSuite.scala`, in the test case "write ahead log - generating and cleaning", the inner method `getCurrentLogFiles` uses external variable `logDirectory1` instead of the passed parameter `logDirectory`. This PR fixes this by using the passed method argument instead of variable from the outer scope.
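      
      The class of bug can be sketched like this. Apart from the names `logDirectory1` and `logDirectory`, everything here is hypothetical (including the `log-0` file name), and plain strings stand in for the real directories.
      
      ```scala
      object ParameterVsOuterVariable {
        val logDirectory1 = "dir1"
      
        // Before: the parameter is ignored and the outer variable is used instead.
        def buggyLogFile(logDirectory: String): String = s"$logDirectory1/log-0"
      
        // After: the passed argument is used.
        def fixedLogFile(logDirectory: String): String = s"$logDirectory/log-0"
      }
      ```
      
      Calling both with `"dir2"` shows the difference: the buggy version still returns a path under `dir1`.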
      
      ## How was this patch tested?
      
      The unit test was re-run and the output logs were checked for the correct paths used.
      
      tdas
      
      Author: Ahmed Mahran <ahmed.mahran@mashin.io>
      
      Closes #13825 from ahmed-mahran/b-receiver-suite-wal-gen-cln.
      c2cebdb7
  21. Jun 12, 2016
    • Sean Owen's avatar
      [SPARK-15086][CORE][STREAMING] Deprecate old Java accumulator API · f51dfe61
      Sean Owen authored
      ## What changes were proposed in this pull request?
      
      - Deprecate old Java accumulator API; should use Scala now
      - Update Java tests and examples
      - Don't bother testing old accumulator API in Java 8 (too)
      - (fix a misspelling too)
      
      ## How was this patch tested?
      
      Jenkins tests
      
      Author: Sean Owen <sowen@cloudera.com>
      
      Closes #13606 from srowen/SPARK-15086.
      f51dfe61
  22. Jun 10, 2016
  23. Jun 06, 2016
    • Zheng RuiFeng's avatar
      [MINOR] Fix Typos 'an -> a' · fd8af397
      Zheng RuiFeng authored
      ## What changes were proposed in this pull request?
      
      `an -> a`
      
      Use cmds like `find . -name '*.R' | xargs -i sh -c "grep -in ' an [^aeiou]' {} && echo {}"` to generate candidates, and review them one by one.
      
      ## How was this patch tested?
      manual tests
      
      Author: Zheng RuiFeng <ruifengz@foxmail.com>
      
      Closes #13515 from zhengruifeng/an_a.
      fd8af397
  24. Jun 05, 2016
    • Josh Rosen's avatar
      [SPARK-15748][SQL] Replace inefficient foldLeft() call with flatMap() in PartitionStatistics · 26c1089c
      Josh Rosen authored
      `PartitionStatistics` uses `foldLeft` and list concatenation (`++`) to flatten an iterator of lists, but this is extremely inefficient compared to simply doing `flatMap`/`flatten` because it performs many unnecessary object allocations. Simply replacing this `foldLeft` by a `flatMap` results in decent performance gains when constructing PartitionStatistics instances for tables with many columns.
      
      This patch fixes this and also makes two similar changes in MLlib and streaming to try to fix all known occurrences of this pattern.
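      
      A minimal sketch of the pattern, using plain lists of integers rather than the actual `PartitionStatistics` types; `FlattenColumns` and its methods are hypothetical names.
      
      ```scala
      object FlattenColumns {
        // Before: every `++` copies the accumulated list, allocating many intermediate lists.
        def flattenWithFoldLeft(groups: Iterator[List[Int]]): List[Int] =
          groups.foldLeft(List.empty[Int])((acc, group) => acc ++ group)
      
        // After: flatMap walks each inner list once.
        def flattenWithFlatMap(groups: Iterator[List[Int]]): List[Int] =
          groups.flatMap(group => group).toList
      }
      ```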
      
      Author: Josh Rosen <joshrosen@databricks.com>
      
      Closes #13491 from JoshRosen/foldleft-to-flatmap.
      26c1089c
  25. May 30, 2016
  26. May 27, 2016
    • Zheng RuiFeng's avatar
      [MINOR] Fix Typos 'a -> an' · 6b1a6180
      Zheng RuiFeng authored
      ## What changes were proposed in this pull request?
      
      `a` -> `an`
      
      I use regex to generate potential error lines:
      `grep -in ' a [aeiou]' mllib/src/main/scala/org/apache/spark/ml/*/*scala`
      and review them line by line.
      
      ## How was this patch tested?
      
      local build
      `lint-java` checking
      
      Author: Zheng RuiFeng <ruifengz@foxmail.com>
      
      Closes #13317 from zhengruifeng/a_an.
      6b1a6180
  27. May 25, 2016
    • lfzCarlosC's avatar
      [MINOR][MLLIB][STREAMING][SQL] Fix typos · 02c8072e
      lfzCarlosC authored
      fixed typos for source code for components [mllib] [streaming] and [SQL]
      
      None and obvious.
      
      Author: lfzCarlosC <lfz.carlos@gmail.com>
      
      Closes #13298 from lfzCarlosC/master.
      02c8072e
  28. May 17, 2016
  29. May 15, 2016
    • Sean Owen's avatar
      [SPARK-12972][CORE] Update org.apache.httpcomponents.httpclient · f5576a05
      Sean Owen authored
      ## What changes were proposed in this pull request?
      
      (Retry of https://github.com/apache/spark/pull/13049)
      
      - update to httpclient 4.5 / httpcore 4.4
      - remove some defunct exclusions
      - manage httpmime version to match
      - update selenium / httpunit to support 4.5 (possible now that Jetty 9 is used)
      
      ## How was this patch tested?
      
      Jenkins tests. Also, locally running the same test command of one Jenkins profile that failed: `mvn -Phadoop-2.6 -Pyarn -Phive -Phive-thriftserver -Pkinesis-asl ...`
      
      Author: Sean Owen <sowen@cloudera.com>
      
      Closes #13117 from srowen/SPARK-12972.2.
      f5576a05
  30. May 12, 2016
    • bomeng's avatar
      [SPARK-14897][SQL] upgrade to jetty 9.2.16 · 81bf8708
      bomeng authored
      ## What changes were proposed in this pull request?
      
      Since Jetty 8 is EOL (end of life) and has a critical security issue [http://www.securityweek.com/critical-vulnerability-found-jetty-web-server], I think upgrading to 9 is necessary. I am using the latest 9.2 since 9.3 requires Java 8+.
      
      `javax.servlet` and `derby` were also upgraded since Jetty 9.2 needs the corresponding versions.
      
      ## How was this patch tested?
      
      Manual test and current test cases should cover it.
      
      Author: bomeng <bmeng@us.ibm.com>
      
      Closes #12916 from bomeng/SPARK-14897.
      81bf8708
  31. May 11, 2016
    • mwws's avatar
      [SPARK-14976][STREAMING] make StreamingContext.textFileStream support wildcard · 33597810
      mwws authored
      ## What changes were proposed in this pull request?
      make StreamingContext.textFileStream support wildcard
      like /home/user/*/file
      
      ## How was this patch tested?
      I did manual test and added a new unit test case
      
      Author: mwws <wei.mao@intel.com>
      Author: unknown <maowei@maowei-MOBL.ccr.corp.intel.com>
      
      Closes #12752 from mwws/SPARK_FileStream.
      33597810
  32. May 09, 2016
    • mwws's avatar
      [MINOR][TEST][STREAMING] make "testDir" able to be cleaned after test. · 16a503cf
      mwws authored
      It's a minor bug in the test case. `val testDir = null` will always remain `null` because it's immutable, so nothing gets cleaned up in the finally block. The other `testDir` variable, created in the try block, is only visible inside the try block.
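      
      A sketch of the corrected shape, assuming a temporary directory created with `java.nio.file.Files`; `TestDirCleanup` and the `spark-test-dir` prefix are hypothetical. Declaring `testDir` as a `var` outside the `try` makes the assignment visible to `finally`.
      
      ```scala
      import java.io.File
      import java.nio.file.Files
      
      object TestDirCleanup {
        // Returns whether the directory still exists after cleanup.
        def runAndCleanUp(): Boolean = {
          var testDir: File = null // `var`, so the value assigned in `try` is visible in `finally`
          try {
            testDir = Files.createTempDirectory("spark-test-dir").toFile
            // ... test body would go here ...
          } finally {
            if (testDir != null) testDir.delete()
          }
          testDir.exists()
        }
      }
      ```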
      
      ## How was this patch tested?
      Run existing test case and passed.
      
      Author: mwws <wei.mao@intel.com>
      
      Closes #12999 from mwws/SPARK_MINOR.
      16a503cf
  33. May 06, 2016
    • Thomas Graves's avatar
      [SPARK-1239] Improve fetching of map output statuses · cc95f1ed
      Thomas Graves authored
      The main issue we are trying to solve is the memory bloat of the Driver when tasks request the map output statuses.  This means that with a large number of tasks you either need a huge amount of memory on the Driver or you have to repartition to a smaller number.  This makes it really difficult to run over, say, 50000 tasks.
      
      The main issues that cause the memory bloat are:
      1) There is no flow control on sending the map output status responses.  We serialize the map output statuses and then hand them off to netty to send.  netty sends asynchronously and can't send them fast enough to keep up with incoming requests, so we end up with lots of copies of the serialized map output statuses sitting there, which causes huge bloat when you have tens of thousands of tasks and the map output status is in the tens of MB.
      2) When the initial reduce tasks are started up, they all request the map output statuses from the Driver. These requests are handled by multiple threads in parallel, so even though we check to see if we have a cached version, initially when we don't have a cached version yet, many of the initial requests can all end up serializing the exact same map output statuses.
      
      This patch does a couple of things:
      - When the map output status size is over a threshold (default 512K), it uses broadcast to send the map statuses.  This means we no longer serialize a large map output status and thus we don't have issues with memory bloat.  The message sizes are now in the 300-400 byte range and the map output statuses are broadcast. If it's under the threshold it sends it as before; the message now contains the DIRECT indicator.
      - Synchronize the incoming requests to allow one thread to cache the serialized output and broadcast the map output status, which can then be used by everyone else.  This ensures we don't create multiple broadcast variables when we don't need to.  To ensure this happens I added a second thread pool which the Dispatcher hands the requests to, so that those threads can block without blocking the main dispatcher threads (which would cause things like heartbeats not to come through).
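      
      The two ideas above can be sketched roughly as follows. This is not the patch's code: `MapStatusSketch`, `chooseTransport`, `SerializedStatusCache`, and the `"BROADCAST"` label are hypothetical, and reading "512K" as 512 * 1024 bytes is an assumption.
      
      ```scala
      object MapStatusSketch {
        // Assumed interpretation of the "512K" default.
        val DefaultThresholdBytes: Long = 512L * 1024L
      
        // Over the threshold: broadcast; otherwise send directly as before.
        def chooseTransport(sizeBytes: Long, thresholdBytes: Long = DefaultThresholdBytes): String =
          if (sizeBytes > thresholdBytes) "BROADCAST" else "DIRECT"
      
        // Serializes at most once; concurrent callers wait on the lock and reuse the cached bytes.
        final class SerializedStatusCache(serialize: () => Array[Byte]) {
          private var cached: Array[Byte] = null
          private var count = 0
      
          def get(): Array[Byte] = synchronized {
            if (cached == null) {
              count += 1
              cached = serialize()
            }
            cached
          }
      
          // Number of times the serialize function actually ran.
          def serializations: Int = synchronized(count)
        }
      }
      ```
      
      Because `get()` blocks while another caller is serializing, a real implementation would want those callers on a separate thread pool, as the description above notes.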
      
      Note that some of the design and code was contributed by mridulm
      
      ## How was this patch tested?
      
      Unit tests and a lot of manual testing.
      Ran with akka and netty rpc. Ran with both dynamic allocation on and off.
      
      One of the large jobs I used to test this was a join of 15TB of data.  It had 200,000 map tasks and 20,000 reduce tasks. Executors ranged from 200 to 2000.  This job ran successfully with 5GB of memory on the driver with these changes. Without these changes I was using 20GB and only had 500 reduce tasks.  The job has 50MB of serialized map output statuses and took roughly the same amount of time for the executors to get the map output statuses as before.
      
      Ran a variety of other jobs, from large wordcounts to small ones not using broadcasts.
      
      Author: Thomas Graves <tgraves@staydecay.corp.gq1.yahoo.com>
      
      Closes #12113 from tgravescs/SPARK-1239.
      cc95f1ed
  34. May 05, 2016