  1. Sep 13, 2017
    • [SPARK-21970][CORE] Fix Redundant Throws Declarations in Java Codebase · b6ef1f57
      Armin authored
      ## What changes were proposed in this pull request?
      
      1. Removing all redundant throws declarations from Java codebase.
      2. Removing dead code made visible by this from `ShuffleExternalSorter#closeAndGetSpills`
      
      ## How was this patch tested?
      
      Build still passes.
      
      Author: Armin <me@obrown.io>
      
      Closes #19182 from original-brownbear/SPARK-21970.
  2. Sep 05, 2017
    • [SPARK-9104][CORE] Expose Netty memory metrics in Spark · 445f1790
      jerryshao authored
      ## What changes were proposed in this pull request?
      
      This PR exposes Netty memory usage for Spark's `TransportClientFactory` and `TransportServer`, including the details of each direct arena and heap arena as well as aggregated metrics. The purpose of adding the Netty metrics is to better understand Netty's memory usage in Spark shuffle, RPC and other network communication, and to guide better configuration of executor memory sizes.
      
      This PR doesn't expose these metrics to any sink; to leverage this feature, one still needs to either connect them to the MetricsSystem or collect them back to the driver for display.
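      
      As a rough illustration of reading such metrics (a minimal sketch assuming Netty 4.1's `PooledByteBufAllocatorMetric` is available, not the PR's actual code):
      
      ```java
      import io.netty.buffer.PooledByteBufAllocator;
      import io.netty.buffer.PooledByteBufAllocatorMetric;
      
      public final class NettyMemoryProbe {
        public static void main(String[] args) {
          // Spark's transport layer allocates from a PooledByteBufAllocator;
          // the shared default instance is used here purely for illustration.
          PooledByteBufAllocatorMetric m = PooledByteBufAllocator.DEFAULT.metric();
          // Aggregated usage across all arenas.
          System.out.println("used direct memory: " + m.usedDirectMemory());
          System.out.println("used heap memory:   " + m.usedHeapMemory());
          // Per-arena detail is available via m.directArenas()/m.heapArenas().
          System.out.println("direct arenas: " + m.numDirectArenas());
          System.out.println("heap arenas:   " + m.numHeapArenas());
        }
      }
      ```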
      
      ## How was this patch tested?
      
      Added a unit test to verify it; also manually verified in a real cluster.
      
      Author: jerryshao <sshao@hortonworks.com>
      
      Closes #18935 from jerryshao/SPARK-9104.
  3. Aug 30, 2017
    • [SPARK-17321][YARN] Avoid writing shuffle metadata to disk if NM recovery is disabled · 4482ff23
      jerryshao authored
      In the current code, if NM recovery is not enabled, `YarnShuffleService` writes shuffle metadata to the NM's first local dir; if that dir is on a bad disk, `YarnShuffleService` fails to start. To solve this, on the Spark side, if NM recovery is not enabled, Spark will not persist data into LevelDB. In that case the YARN shuffle service can still serve blocks, but loses the ability to recover (which is fine, because an NM failure kills the containers as well as the applications).
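      
      A minimal sketch of that decision with hypothetical names (the real service derives the flag from the NM's `yarn.nodemanager.recovery.enabled` setting):
      
      ```java
      import java.io.File;
      
      public class ShuffleRecoverySketch {
        private File registeredExecutorFile; // null => nothing is persisted
      
        public void init(boolean nmRecoveryEnabled, File recoveryDir) {
          if (nmRecoveryEnabled) {
            // Only write shuffle metadata to disk when NM recovery can use it.
            registeredExecutorFile = new File(recoveryDir, "registeredExecutors.ldb");
          }
          // With a null file, the service still serves blocks; it merely loses
          // the ability to recover registered executors after an NM restart.
        }
      }
      ```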
      
      Tested in a local cluster with NM recovery off and on to see whether the folder is created. A MiniCluster UT isn't added because in MiniCluster the NM always sets the port to 0, but NM recovery requires a non-ephemeral port.
      
      Author: jerryshao <sshao@hortonworks.com>
      
      Closes #19032 from jerryshao/SPARK-17321.
      
      Change-Id: I8f2fe73d175e2ad2c4e380caede3873e0192d027
    • [MINOR][TEST] Off-heap memory leaks for unit tests · d4895c9d
      liuxian authored
      ## What changes were proposed in this pull request?
      Free off-heap memory.
      I have checked all the unit tests.
      
      ## How was this patch tested?
      N/A
      
      Author: liuxian <liu.xian3@zte.com.cn>
      
      Closes #19075 from 10110346/memleak.
  4. Aug 25, 2017
    • [MINOR][BUILD] Fix build warnings and Java lint errors · de7af295
      Sean Owen authored
      ## What changes were proposed in this pull request?
      
      Fix build warnings and Java lint errors. This just helps a bit in evaluating (new) warnings in another PR I have open.
      
      ## How was this patch tested?
      
      Existing tests
      
      Author: Sean Owen <sowen@cloudera.com>
      
      Closes #19051 from srowen/JavaWarnings.
  5. Aug 24, 2017
    • [SPARK-21701][CORE] Enable RPC client to use `SO_RCVBUF` and `SO_SNDBUF` in SparkConf. · 763b83ee
      xu.zhang authored
      ## What changes were proposed in this pull request?
      
      TCP parameters like `SO_RCVBUF` and `SO_SNDBUF` can be set in SparkConf, and `org.apache.spark.network.server.TransportServer` can use those parameters to build the server by leveraging Netty. But for `TransportClientFactory` there is no way to set those parameters from SparkConf, which can make the server and client sides inconsistent when people set these parameters in SparkConf. So this PR enables the RPC client to use those TCP parameters as well.
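      
      For reference, setting these options on a Netty client bootstrap looks like the sketch below (an illustration, not the PR's code; values of zero or less leave the OS defaults untouched):
      
      ```java
      import io.netty.bootstrap.Bootstrap;
      import io.netty.channel.ChannelOption;
      
      public final class ClientBufferOptions {
        public static void apply(Bootstrap bootstrap, int receiveBuf, int sendBuf) {
          if (receiveBuf > 0) {
            bootstrap.option(ChannelOption.SO_RCVBUF, receiveBuf);
          }
          if (sendBuf > 0) {
            bootstrap.option(ChannelOption.SO_SNDBUF, sendBuf);
          }
        }
      }
      ```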
      
      ## How was this patch tested?
      
      Existing tests.
      
      Author: xu.zhang <xu.zhang@hulu.com>
      
      Closes #18964 from neoremind/add_client_param.
  6. Aug 23, 2017
    • [SPARK-21501] Change CacheLoader to limit entries based on memory footprint · 1662e931
      Sanket Chintapalli authored
      Right now the Spark shuffle service has a cache for index files. It is bounded by the number of files cached (`spark.shuffle.service.index.cache.entries`). This can cause issues for jobs with many reducers, because the size of each entry fluctuates with the number of reducers.
      We saw an issue with a job that had 170,000 reducers: it caused the NM hosting the Spark shuffle service to use 700-800 MB of memory by itself.
      We should change this cache to be memory-based, allowing only a certain amount of memory to be used; that is, the cache should have a limit of, say, 100 MB.
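      
      A minimal sketch of such a memory-bounded cache using Guava's weigher; `ShuffleIndexInfo` here is a hypothetical stand-in for the cached index entry:
      
      ```java
      import com.google.common.cache.Cache;
      import com.google.common.cache.CacheBuilder;
      import com.google.common.cache.Weigher;
      import java.io.File;
      
      public final class IndexCacheSketch {
        static final class ShuffleIndexInfo {
          final byte[] offsets;
          ShuffleIndexInfo(byte[] offsets) { this.offsets = offsets; }
          int retainedBytes() { return offsets.length; }
        }
      
        public static Cache<File, ShuffleIndexInfo> create(long maxBytes) {
          return CacheBuilder.newBuilder()
              // Bound the cache by total bytes retained, not by entry count.
              .maximumWeight(maxBytes)
              .weigher((Weigher<File, ShuffleIndexInfo>)
                  (file, info) -> info.retainedBytes())
              .build();
        }
      }
      ```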
      
      https://issues.apache.org/jira/browse/SPARK-21501
      
      Manual testing with 170,000 reducers has been performed, with the cache loaded up to the 100 MB default limit and each shuffle index file of size 1.3 MB. Eviction takes place as soon as the total cache size reaches the 100 MB limit, and the evicted objects become eligible for garbage collection, thereby preventing the NM from crashing. No notable difference in runtime has been observed.
      
      Author: Sanket Chintapalli <schintap@yahoo-inc.com>
      
      Closes #18940 from redsanket/SPARK-21501.
  7. Aug 08, 2017
  8. Aug 07, 2017
    • [SPARK-21544][DEPLOY][TEST-MAVEN] Tests jar of some module should not upload twice · 8b69b17f
      zhoukang authored
      ## What changes were proposed in this pull request?
      
      **For the modules below:**
      common/network-common
      streaming
      sql/core
      sql/catalyst
      **the tests.jar will be installed or deployed twice, like:**
      `[DEBUG] Installing org.apache.spark:spark-streaming_2.11/maven-metadata.xml to /home/mi/.m2/repository/org/apache/spark/spark-streaming_2.11/maven-metadata-local.xml
      [INFO] Installing /home/mi/Work/Spark/scala2.11/spark/streaming/target/spark-streaming_2.11-2.1.0-mdh2.1.0.1-SNAPSHOT-tests.jar to /home/mi/.m2/repository/org/apache/spark/spark-streaming_2.11/2.1.0-mdh2.1.0.1-SNAPSHOT/spark-streaming_2.11-2.1.0-mdh2.1.0.1-SNAPSHOT-tests.jar
      [DEBUG] Skipped re-installing /home/mi/Work/Spark/scala2.11/spark/streaming/target/spark-streaming_2.11-2.1.0-mdh2.1.0.1-SNAPSHOT-tests.jar to /home/mi/.m2/repository/org/apache/spark/spark-streaming_2.11/2.1.0-mdh2.1.0.1-SNAPSHOT/spark-streaming_2.11-2.1.0-mdh2.1.0.1-SNAPSHOT-tests.jar, seems unchanged`
      **The reason is below:**
      `[DEBUG]   (f) artifact = org.apache.spark:spark-streaming_2.11:jar:2.1.0-mdh2.1.0.1-SNAPSHOT
      [DEBUG]   (f) attachedArtifacts = [org.apache.spark:spark-streaming_2.11:test-jar:tests:2.1.0-mdh2.1.0.1-SNAPSHOT, org.apache.spark:spark-streaming_2.11:jar:tests:2.1.0-mdh2.1.0.1-SNAPSHOT, org.apache.spark:spark
      -streaming_2.11:java-source:sources:2.1.0-mdh2.1.0.1-SNAPSHOT, org.apache.spark:spark-streaming_2.11:java-source:test-sources:2.1.0-mdh2.1.0.1-SNAPSHOT, org.apache.spark:spark-streaming_2.11:javadoc:javadoc:2.1.0
      -mdh2.1.0.1-SNAPSHOT]`
      
      When executing 'mvn deploy' to Nexus during a release, the build will fail, since artifacts in a release Nexus repository cannot be overwritten.
      
      ## How was this patch tested?
      Execute 'mvn clean install -Pyarn -Phadoop-2.6 -Phadoop-provided -DskipTests'
      
      Author: zhoukang <zhoukang199191@gmail.com>
      
      Closes #18745 from caneGuy/zhoukang/fix-installtwice.
  9. Aug 01, 2017
    • [SPARK-21592][BUILD] Skip maven-compiler-plugin main and test compilations in Maven build · 74cda94c
      Grzegorz Slowikowski authored
      `scala-maven-plugin` in `incremental` mode compiles `Scala` and `Java` classes. There is no need to execute `maven-compiler-plugin` goals to compile (in fact recompile) `Java`.
      
      This change reduces compilation time (over 10% on my machine).
      
      Author: Grzegorz Slowikowski <gslowikowski@gmail.com>
      
      Closes #18750 from gslowikowski/remove-redundant-compilation-from-maven.
    • [SPARK-21475][CORE] Use NIO's Files API to replace... · 5fd0294f
      jerryshao authored
      [SPARK-21475][CORE] Use NIO's Files API to replace FileInputStream/FileOutputStream in some critical paths
      
      ## What changes were proposed in this pull request?
      
      Java's `FileInputStream` and `FileOutputStream` override `finalize()`; even when such a stream is closed correctly and promptly, it still leaves a memory footprint that is only cleaned up in a full GC. This introduces two side effects:
      
      1. Lots of `Finalizer` objects are kept in memory, which increases memory overhead. In our use case of the external shuffle service, a busy shuffle service accumulates a bunch of these objects and can potentially hit OOM.
      2. The finalizers are only invoked during a full GC, which increases the overhead of full GC and leads to long GC pauses.
      
      https://bugs.openjdk.java.net/browse/JDK-8080225
      
      https://www.cloudbees.com/blog/fileinputstream-fileoutputstream-considered-harmful
      
      So to fix this potential issue, this PR proposes to use NIO's `Files#newInputStream`/`newOutputStream` instead in some critical paths like shuffle.
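      
      The pattern in a nutshell, as a Java 8-compatible sketch (streams obtained from `java.nio.file.Files` register no per-instance `Finalizer`, unlike `FileInputStream`/`FileOutputStream`):
      
      ```java
      import java.io.IOException;
      import java.io.InputStream;
      import java.io.OutputStream;
      import java.nio.file.Files;
      import java.nio.file.Paths;
      
      public final class NioStreamCopy {
        public static void copy(String from, String to) throws IOException {
          try (InputStream in = Files.newInputStream(Paths.get(from));
               OutputStream out = Files.newOutputStream(Paths.get(to))) {
            byte[] buf = new byte[8192];
            int n;
            while ((n = in.read(buf)) != -1) {
              out.write(buf, 0, n);
            }
          }
        }
      }
      ```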
      
      `FileInputStream` usages left unchanged in core, which I think are not so critical:
      
      ```
      ./core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala:467:    val file = new DataInputStream(new FileInputStream(filename))
      ./core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala:942:    val in = new FileInputStream(new File(path))
      ./core/src/main/scala/org/apache/spark/deploy/master/FileSystemPersistenceEngine.scala:76:    val fileIn = new FileInputStream(file)
      ./core/src/main/scala/org/apache/spark/deploy/RPackageUtils.scala:248:        val fis = new FileInputStream(file)
      ./core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala:910:                input = new FileInputStream(new File(t))
      ./core/src/main/scala/org/apache/spark/metrics/MetricsConfig.scala:20:import java.io.{FileInputStream, InputStream}
      ./core/src/main/scala/org/apache/spark/metrics/MetricsConfig.scala:132:        case Some(f) => new FileInputStream(f)
      ./core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala:20:import java.io.{FileInputStream, InputStream}
      ./core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala:77:        val fis = new FileInputStream(f)
      ./core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala:27:import org.apache.spark.io.NioBufferedFileInputStream
      ./core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala:94:      new DataInputStream(new NioBufferedFileInputStream(index))
      ./core/src/main/scala/org/apache/spark/storage/DiskStore.scala:111:        val channel = new FileInputStream(file).getChannel()
      ./core/src/main/scala/org/apache/spark/storage/DiskStore.scala:219:    val channel = new FileInputStream(file).getChannel()
      ./core/src/main/scala/org/apache/spark/TestUtils.scala:20:import java.io.{ByteArrayInputStream, File, FileInputStream, FileOutputStream}
      ./core/src/main/scala/org/apache/spark/TestUtils.scala:106:      val in = new FileInputStream(file)
      ./core/src/main/scala/org/apache/spark/util/logging/RollingFileAppender.scala:89:        inputStream = new FileInputStream(activeFile)
      ./core/src/main/scala/org/apache/spark/util/Utils.scala:329:      if (in.isInstanceOf[FileInputStream] && out.isInstanceOf[FileOutputStream]
      ./core/src/main/scala/org/apache/spark/util/Utils.scala:332:        val inChannel = in.asInstanceOf[FileInputStream].getChannel()
      ./core/src/main/scala/org/apache/spark/util/Utils.scala:1533:      gzInputStream = new GZIPInputStream(new FileInputStream(file))
      ./core/src/main/scala/org/apache/spark/util/Utils.scala:1560:      new GZIPInputStream(new FileInputStream(file))
      ./core/src/main/scala/org/apache/spark/util/Utils.scala:1562:      new FileInputStream(file)
      ./core/src/main/scala/org/apache/spark/util/Utils.scala:2090:    val inReader = new InputStreamReader(new FileInputStream(file), StandardCharsets.UTF_8)
      ```
      
      `FileOutputStream` usages left unchanged in core:
      
      ```
      ./core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala:957:    val out = new FileOutputStream(file)
      ./core/src/main/scala/org/apache/spark/api/r/RBackend.scala:20:import java.io.{DataOutputStream, File, FileOutputStream, IOException}
      ./core/src/main/scala/org/apache/spark/api/r/RBackend.scala:131:      val dos = new DataOutputStream(new FileOutputStream(f))
      ./core/src/main/scala/org/apache/spark/deploy/master/FileSystemPersistenceEngine.scala:62:    val fileOut = new FileOutputStream(file)
      ./core/src/main/scala/org/apache/spark/deploy/RPackageUtils.scala:160:          val outStream = new FileOutputStream(outPath)
      ./core/src/main/scala/org/apache/spark/deploy/RPackageUtils.scala:239:    val zipOutputStream = new ZipOutputStream(new FileOutputStream(zipFile, false))
      ./core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala:949:        val out = new FileOutputStream(tempFile)
      ./core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala:20:import java.io.{File, FileOutputStream, InputStream, IOException}
      ./core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala:106:    val out = new FileOutputStream(file, true)
      ./core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala:109:     * Therefore, for local files, use FileOutputStream instead. */
      ./core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala:112:        new FileOutputStream(uri.getPath)
      ./core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala:20:import java.io.{BufferedOutputStream, File, FileOutputStream, OutputStream}
      ./core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala:71:  private var fos: FileOutputStream = null
      ./core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala:102:    fos = new FileOutputStream(file, true)
      ./core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala:213:      var truncateStream: FileOutputStream = null
      ./core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala:215:        truncateStream = new FileOutputStream(file, true)
      ./core/src/main/scala/org/apache/spark/storage/DiskStore.scala:153:    val out = new FileOutputStream(file).getChannel()
      ./core/src/main/scala/org/apache/spark/TestUtils.scala:20:import java.io.{ByteArrayInputStream, File, FileInputStream, FileOutputStream}
      ./core/src/main/scala/org/apache/spark/TestUtils.scala:81:    val jarStream = new JarOutputStream(new FileOutputStream(jarFile))
      ./core/src/main/scala/org/apache/spark/TestUtils.scala:96:    val jarFileStream = new FileOutputStream(jarFile)
      ./core/src/main/scala/org/apache/spark/util/logging/FileAppender.scala:20:import java.io.{File, FileOutputStream, InputStream, IOException}
      ./core/src/main/scala/org/apache/spark/util/logging/FileAppender.scala:31:  volatile private var outputStream: FileOutputStream = null
      ./core/src/main/scala/org/apache/spark/util/logging/FileAppender.scala:97:    outputStream = new FileOutputStream(file, true)
      ./core/src/main/scala/org/apache/spark/util/logging/RollingFileAppender.scala:90:        gzOutputStream = new GZIPOutputStream(new FileOutputStream(gzFile))
      ./core/src/main/scala/org/apache/spark/util/Utils.scala:329:      if (in.isInstanceOf[FileInputStream] && out.isInstanceOf[FileOutputStream]
      ./core/src/main/scala/org/apache/spark/util/Utils.scala:333:        val outChannel = out.asInstanceOf[FileOutputStream].getChannel()
      ./core/src/main/scala/org/apache/spark/util/Utils.scala:527:      val out = new FileOutputStream(tempFile)
      ```
      
      `DiskBlockObjectWriter` uses a `FileDescriptor`, so it is not easy to change it to the NIO Files API.
      
      I changed all the `FileInputStream` and `FileOutputStream` usages in common/shuffle*.
      
      ## How was this patch tested?
      
      Existing tests and manual verification.
      
      Author: jerryshao <sshao@hortonworks.com>
      
      Closes #18684 from jerryshao/SPARK-21475.
  10. Jul 28, 2017
    • [MINOR][BUILD] Fix current lint-java failures · 63d168cb
      Sean Owen authored
      ## What changes were proposed in this pull request?
      
      Fixes current failures in dev/lint-java
      
      ## How was this patch tested?
      
      Existing linter, tests.
      
      Author: Sean Owen <sowen@cloudera.com>
      
      Closes #18757 from srowen/LintJava.
  11. Jul 26, 2017
    • [SPARK-21530] Update description of spark.shuffle.maxChunksBeingTransferred. · cfb25b27
      jinxing authored
      ## What changes were proposed in this pull request?
      
      Update the description of `spark.shuffle.maxChunksBeingTransferred` to note that newly incoming connections will be closed when the max is hit, and that the client should have a retry mechanism.
      
      Author: jinxing <jinxing6042@126.com>
      
      Closes #18735 from jinxing64/SPARK-21530.
  12. Jul 25, 2017
    • [SPARK-21494][NETWORK] Use correct app id when authenticating to external service. · 300807c6
      Marcelo Vanzin authored
      There was some code based on the old SASL handler in the new auth client that
      was incorrectly using the SASL user as the user to authenticate against the
      external shuffle service. This caused the external service to not be able to
      find the correct secret to authenticate the connection, failing the connection.
      
      In the course of debugging, I found that some log messages from the YARN shuffle
      service were a little noisy, so I silenced some of them, and also added a couple
      of new ones that helped find this issue. On top of that, I found that a check
      in the code that records app secrets was wrong, causing more log spam and also
      using an O(n) operation instead of an O(1) call.
      
      Also added a new integration suite for the YARN shuffle service with auth on,
      and verified it failed before, and passes now.
      
      Author: Marcelo Vanzin <vanzin@cloudera.com>
      
      Closes #18706 from vanzin/SPARK-21494.
    • [SPARK-21175] Reject OpenBlocks when memory shortage on shuffle service. · 799e1316
      jinxing authored
      ## What changes were proposed in this pull request?
      
      A shuffle service serves blocks from multiple apps/tasks, so it can suffer high memory usage when lots of shuffle-reads happen at the same time. In my cluster, OOM always happens on the shuffle service. Analyzing a heap dump, the memory used by Netty (`ChannelOutboundBuffer$Entry`) can be up to 2~3 GB. It makes sense to reject "open blocks" requests when memory usage is high on the shuffle service.
      
      https://github.com/apache/spark/commit/93dd0c518d040155b04e5ab258c5835aec7776fc and https://github.com/apache/spark/commit/85c6ce61930490e2247fb4b0e22dfebbb8b6a1ee tried to alleviate the memory pressure on the shuffle service, but they do not address the root cause. This PR proposes to control the concurrency of shuffle reads.
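      
      A minimal sketch of the gating idea, with hypothetical names (the actual change wires this into the transport layer):
      
      ```java
      import java.util.function.LongSupplier;
      
      public final class OpenBlocksGate {
        private final long maxBytes;
        private final LongSupplier usedBytes; // e.g. backed by allocator metrics
      
        public OpenBlocksGate(long maxBytes, LongSupplier usedBytes) {
          this.maxBytes = maxBytes;
          this.usedBytes = usedBytes;
        }
      
        /** Fails an OpenBlocks request up front instead of buffering more chunks. */
        public void checkBeforeOpen() {
          if (usedBytes.getAsLong() > maxBytes) {
            throw new IllegalStateException(
                "Rejecting OpenBlocks: shuffle service memory usage is too high");
          }
        }
      }
      ```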
      
      ## How was this patch tested?
      Added unit test.
      
      Author: jinxing <jinxing6042@126.com>
      
      Closes #18388 from jinxing64/SPARK-21175.
  13. Jul 17, 2017
    • [SPARK-21445] Make IntWrapper and LongWrapper in UTF8String Serializable · 26cd2ca0
      Burak Yavuz authored
      ## What changes were proposed in this pull request?
      
      Making those two classes Serializable will avoid serialization issues like the one below:
      ```
      Caused by: java.io.NotSerializableException: org.apache.spark.unsafe.types.UTF8String$IntWrapper
      Serialization stack:
          - object not serializable (class: org.apache.spark.unsafe.types.UTF8String$IntWrapper, value: org.apache.spark.unsafe.types.UTF8String$IntWrapper326450e)
          - field (class: org.apache.spark.sql.catalyst.expressions.Cast$$anonfun$castToInt$1, name: result$2, type: class org.apache.spark.unsafe.types.UTF8String$IntWrapper)
          - object (class org.apache.spark.sql.catalyst.expressions.Cast$$anonfun$castToInt$1, <function1>)
      ```
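      
      The shape of the fix, as a sketch: the wrapper types captured by generated cast closures must implement `java.io.Serializable` so those closures can be shipped to executors (the `transient` scratch slot is an assumption of this sketch, since only the class's serializability matters):
      
      ```java
      import java.io.Serializable;
      
      public final class WrapperSketch {
        // Mirrors the idea behind UTF8String.IntWrapper/LongWrapper: the value
        // slot only carries a parse result, not state worth serializing.
        public static class IntWrapper implements Serializable {
          public transient int value = 0;
        }
      
        public static class LongWrapper implements Serializable {
          public transient long value = 0L;
        }
      }
      ```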
      
      ## How was this patch tested?
      
      - [x] Manual testing
      - [ ] Unit test
      
      Author: Burak Yavuz <brkyvz@gmail.com>
      
      Closes #18660 from brkyvz/serializableutf8.
  14. Jul 14, 2017
    • [SPARK-21344][SQL] BinaryType comparison does signed byte array comparison · ac5d5d79
      Kazuaki Ishizaki authored
      ## What changes were proposed in this pull request?
      
      This PR fixes a wrong comparison for `BinaryType`. It enables unsigned comparison and unsigned prefix generation for arrays of `BinaryType`; the previous implementation used signed operations.
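      
      Why signedness matters here, in a small self-contained example: Java bytes are signed, so `(byte) 0x80` (-128) naively sorts before `(byte) 0x7f` (127), whereas unsigned (lexicographic) order puts it after:
      
      ```java
      public final class BinaryCompareDemo {
        /** Unsigned lexicographic comparison, the ordering this PR establishes. */
        static int compareUnsigned(byte[] a, byte[] b) {
          int len = Math.min(a.length, b.length);
          for (int i = 0; i < len; i++) {
            int cmp = (a[i] & 0xff) - (b[i] & 0xff); // mask to 0..255 first
            if (cmp != 0) {
              return cmp;
            }
          }
          return a.length - b.length;
        }
      
        public static void main(String[] args) {
          byte[] x = {(byte) 0x7f};
          byte[] y = {(byte) 0x80};
          System.out.println(compareUnsigned(x, y) < 0); // true
          System.out.println(x[0] - y[0] < 0);           // false: signed math flips it
        }
      }
      ```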
      
      ## How was this patch tested?
      
      Added a test suite in `OrderingSuite`.
      
      Author: Kazuaki Ishizaki <ishizaki@jp.ibm.com>
      
      Closes #18571 from kiszk/SPARK-21344.
  15. Jul 10, 2017
    • [SPARK-21369][CORE] Don't use Scala Tuple2 in common/network-* · 833eab2c
      Shixiong Zhu authored
      ## What changes were proposed in this pull request?
      
      Remove all usages of Scala Tuple2 from the common/network-* projects; otherwise, YARN users cannot use `spark.reducer.maxReqSizeShuffleToMem`.
      
      ## How was this patch tested?
      
      Jenkins.
      
      Author: Shixiong Zhu <shixiong@databricks.com>
      
      Closes #18593 from zsxwing/SPARK-21369.
    • [SPARK-21342] Fix DownloadCallback to work well with RetryingBlockFetcher. · 6a06c4b0
      jinxing authored
      ## What changes were proposed in this pull request?
      
      When `RetryingBlockFetcher` retries fetching blocks, there can be two `DownloadCallback`s downloading the same content to the same target file, which can cause `ShuffleBlockFetcherIterator` to read a partial result.
      
      This PR proposes to create and delete the tmp files in `OneForOneBlockFetcher`.
      
      Author: jinxing <jinxing6042@126.com>
      Author: Shixiong Zhu <zsxwing@gmail.com>
      
      Closes #18565 from jinxing64/SPARK-21342.
  16. Jun 30, 2017
    • [SPARK-17528][SQL] data should be copied properly before saving into InternalRow · 4eb41879
      Wenchen Fan authored
      ## What changes were proposed in this pull request?
      
      For performance reasons, `UnsafeRow.getString`, `getStruct`, etc. return a "pointer" that points to a memory region of this unsafe row. This makes the unsafe projection a little dangerous, because all of its output rows share one instance.
      
      When we implement SQL operators, we should be careful not to cache the input rows, because they may be produced by an unsafe projection from a child operator and thus their content may change over time.
      
      However, when updating values of an InternalRow (e.g. in mutable projection and safe projection), we only copy UTF8String; we should also copy InternalRow, ArrayData and MapData. This PR fixes that, and also fixes the copy of various InternalRow, ArrayData and MapData implementations.
      
      ## How was this patch tested?
      
      new regression tests
      
      Author: Wenchen Fan <wenchen@databricks.com>
      
      Closes #18483 from cloud-fan/fix-copy.
  17. Jun 29, 2017
    • [SPARK-21253][CORE][HOTFIX] Fix Scala 2.10 build · cfc696f4
      Shixiong Zhu authored
      ## What changes were proposed in this pull request?
      
      A follow-up PR to fix the Scala 2.10 build for #18472.
      
      ## How was this patch tested?
      
      Jenkins
      
      Author: Shixiong Zhu <shixiong@databricks.com>
      
      Closes #18478 from zsxwing/SPARK-21253-2.
    • [SPARK-21253][CORE] Fix a bug that StreamCallback may not be notified if network errors happen · 4996c539
      Shixiong Zhu authored
      ## What changes were proposed in this pull request?
      
      If a network error happens before processing StreamResponse/StreamFailure events, StreamCallback.onFailure won't be called.
      
      This PR fixes `failOutstandingRequests` to also notify outstanding StreamCallbacks.
      
      ## How was this patch tested?
      
      The new unit tests.
      
      Author: Shixiong Zhu <shixiong@databricks.com>
      
      Closes #18472 from zsxwing/fix-stream-2.
  18. Jun 23, 2017
    • [SPARK-21181] Release byteBuffers to suppress netty error messages · 1ebe7ffe
      Dhruve Ashar authored
      ## What changes were proposed in this pull request?
      We explicitly call release on the `ByteBuf`s used to encode the string to Base64, to suppress the memory-leak error messages reported by Netty. This makes the logs less confusing for the user.
      
      ### Changes proposed in this fix
      By explicitly invoking release on the `ByteBuf`s, we decrement the internal reference counts of the wrapped `ByteBuf`s. When GC kicks in, they are reclaimed as before; Netty just no longer reports memory-leak error messages, since the internal ref counts are now 0.
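      
      The pattern as a minimal sketch (not the PR's exact code): release both the wrapped source buffer and the Base64-encoded buffer once the String has been extracted:
      
      ```java
      import io.netty.buffer.ByteBuf;
      import io.netty.buffer.Unpooled;
      import io.netty.handler.codec.base64.Base64;
      import java.nio.charset.StandardCharsets;
      
      public final class Base64EncodeSketch {
        public static String encode(String s) {
          ByteBuf raw = Unpooled.wrappedBuffer(s.getBytes(StandardCharsets.UTF_8));
          ByteBuf encoded = Base64.encode(raw);
          try {
            return encoded.toString(StandardCharsets.UTF_8);
          } finally {
            // Drop the ref counts to 0 ourselves so the buffers are reclaimed
            // without Netty's leak detector having to complain.
            encoded.release();
            raw.release();
          }
        }
      }
      ```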
      
      ## How was this patch tested?
      Ran a few spark-applications and examined the logs. The error message no longer appears.
      
      Original PR was opened against branch-2.1 => https://github.com/apache/spark/pull/18392
      
      Author: Dhruve Ashar <dhruveashar@gmail.com>
      
      Closes #18407 from dhruve/master.
  19. Jun 21, 2017
    • [SPARK-20640][CORE] Make rpc timeout and retry for shuffle registration configurable. · d107b3b9
      Li Yichao authored
      ## What changes were proposed in this pull request?
      
      Currently the shuffle service registration timeout and retry count are hardcoded. This works well for small workloads, but under heavy workload, when the shuffle service is busy transferring large amounts of data, we see significant delays in responding to registration requests; as a result, executors often fail to register with the shuffle service, eventually failing the job. We need to make these two parameters configurable.
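      
      Usage sketch: the configuration keys below are the ones named in SPARK-20640, but check the docs of the Spark version in use before relying on them:
      
      ```java
      import org.apache.spark.SparkConf;
      
      public final class ShuffleRegistrationConf {
        public static SparkConf apply(SparkConf conf) {
          return conf
              // Give a busy external shuffle service more time per attempt...
              .set("spark.shuffle.registration.timeout", "10000")
              // ...and more attempts before the executor gives up.
              .set("spark.shuffle.registration.maxAttempts", "5");
        }
      }
      ```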
      
      ## How was this patch tested?
      
      * Updated `BlockManagerSuite` to test that the registration timeout and max attempts configuration actually works.
      
      cc sitalkedia
      
      Author: Li Yichao <lyc@zhihu.com>
      
      Closes #18092 from liyichao/SPARK-20640.
  20. Jun 19, 2017
    • [MINOR][BUILD] Fix Java linter errors · ecc56313
      Dongjoon Hyun authored
      ## What changes were proposed in this pull request?
      
      This PR cleans up a few Java linter errors for Apache Spark 2.2 release.
      
      ## How was this patch tested?
      
      ```bash
      $ dev/lint-java
      Using `mvn` from path: /usr/local/bin/mvn
      Checkstyle checks passed.
      ```
      
      We can check the result at Travis CI, [here](https://travis-ci.org/dongjoon-hyun/spark/builds/244297894).
      
      Author: Dongjoon Hyun <dongjoon@apache.org>
      
      Closes #18345 from dongjoon-hyun/fix_lint_java_2.
  21. Jun 16, 2017
    • [SPARK-20994] Remove redundant characters in OpenBlocks to save memory for shuffle service. · 93dd0c51
      jinxing authored
      ## What changes were proposed in this pull request?
      
      In the current code, blockIds in `OpenBlocks` are stored in an iterator on the shuffle service.
      There are redundant characters in each blockId (`"shuffle_" + shuffleId + "_" + mapId + "_" + reduceId`). This PR removes them to shrink the footprint and alleviate memory pressure on the shuffle service.
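      
      The saving in a nutshell, as a sketch: parse each blockId string once and keep only the integers, instead of holding every full String alive in the iterator:
      
      ```java
      public final class BlockIdCompact {
        /** Parses "shuffle_<shuffleId>_<mapId>_<reduceId>" into three ints. */
        static int[] parse(String blockId) {
          String[] parts = blockId.split("_");
          if (parts.length != 4 || !"shuffle".equals(parts[0])) {
            throw new IllegalArgumentException("Unexpected block id: " + blockId);
          }
          return new int[] {
              Integer.parseInt(parts[1]),  // shuffleId
              Integer.parseInt(parts[2]),  // mapId
              Integer.parseInt(parts[3])   // reduceId
          };
        }
      }
      ```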
      
      Author: jinxing <jinxing6042@126.com>
      
      Closes #18231 from jinxing64/SPARK-20994-v2.
  22. Jun 06, 2017
    • [SPARK-20641][CORE] Add key-value store abstraction and LevelDB implementation. · 0cba4951
      Marcelo Vanzin authored
      This change adds an abstraction and LevelDB implementation for a key-value
      store that will be used to store UI and SHS data.
      
      The interface is described in KVStore.java (see javadoc). Specifics
      of the LevelDB implementation are discussed in the javadocs of both
      LevelDB.java and LevelDBTypeInfo.java.
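      
      For orientation, a hypothetical minimal shape for such an abstraction (illustrative names only; the authoritative interface is KVStore.java):
      
      ```java
      import java.io.Closeable;
      
      public interface KeyValueStoreSketch extends Closeable {
        /** Reads the object of the given type with the given natural key. */
        <T> T read(Class<T> klass, Object naturalKey) throws Exception;
      
        /** Writes (inserts or updates) an object, indexed by its natural key. */
        void write(Object value) throws Exception;
      
        /** Deletes the object of the given type with the given natural key. */
        void delete(Class<?> type, Object naturalKey) throws Exception;
      
        /** Iterates over all stored objects of the given type. */
        <T> Iterable<T> view(Class<T> type) throws Exception;
      }
      ```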
      
      Included also are a few small benchmarks just to get some idea of
      latency. Because they're too slow for regular unit test runs, they're
      disabled by default.
      
      Tested with the included unit tests, and also as part of the overall feature
      implementation (including running SHS with hundreds of apps).
      
      Author: Marcelo Vanzin <vanzin@cloudera.com>
      
      Closes #17902 from vanzin/shs-ng/M1.
  23. May 29, 2017
    • [SPARK-20750][SQL] Built-in SQL Function Support - REPLACE · ef9fd920
      Kazuaki Ishizaki authored
      ## What changes were proposed in this pull request?
      
      This PR adds the built-in SQL function `REPLACE(<string_expression>, <search_string> [, <replacement_string>])`.
      
      `REPLACE()` returns the string with all occurrences of the search string replaced by the replacement string; if no replacement is given, occurrences of the search string are removed. For example, `REPLACE('ABCabc', 'abc', 'DEF')` returns `'ABCDEF'`.
      
      ## How was this patch tested?
      
      added new test suites
      
      Author: Kazuaki Ishizaki <ishizaki@jp.ibm.com>
      
      Closes #18047 from kiszk/SPARK-20750.
  24. May 25, 2017
    • [SPARK-19659] Fetch big blocks to disk when shuffle-read. · 3f94e64a
      jinxing authored
      ## What changes were proposed in this pull request?
      
      Currently a whole block is fetched into memory (off-heap by default) on shuffle-read. A block is defined by (shuffleId, mapId, reduceId), so it can be large in skew situations. If OOM happens during shuffle-read, the job is killed and users are advised to "Consider boosting spark.yarn.executor.memoryOverhead". Adjusting the parameter and allocating more memory can resolve the OOM; however, this approach is not well suited to production environments, especially data warehouses.
      Using Spark SQL as the data engine in a warehouse, users want a unified parameter (e.g. memory) with less resource waste (resources allocated but not used). This wish is especially strong when migrating a data engine to Spark from another one (e.g. Hive), since tuning the parameter for thousands of SQL queries one by one is very time consuming.
      It's not always easy to predict skew situations; when they happen, it makes sense to fetch remote blocks to disk for shuffle-read rather than kill the job because of OOM.
      
      In this pr, I propose to fetch big blocks to disk(which is also mentioned in SPARK-3019):
      
      1. Track average size and also the outliers(which are larger than 2*avgSize) in MapStatus;
      2. Request memory from `MemoryManager` before fetch blocks and release the memory to `MemoryManager` when `ManagedBuffer` is released.
      3. Fetch remote blocks to disk when failing acquiring memory from `MemoryManager`, otherwise fetch to memory.
      
      This is an improvement in memory control when shuffling blocks, and helps avoid OOM in scenarios like the following (a decision sketch follows the list):
      1. A single huge block;
      2. The sizes of many blocks are underestimated in `MapStatus`, so the actual footprint of the blocks is much larger than estimated.
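      
      A decision sketch for steps 2-3, with hypothetical names (the real code works through `MemoryManager` and `ShuffleBlockFetcherIterator`):
      
      ```java
      public final class FetchTargetSketch {
        interface MemoryReservation {
          boolean tryReserve(long bytes); // hypothetical; the real API differs
        }
      
        enum Target { MEMORY, DISK }
      
        static Target chooseTarget(MemoryReservation memory, long blockSize) {
          // Outlier blocks (tracked as > 2 * avgSize in MapStatus, per step 1)
          // tend to fail the reservation and therefore stream to disk.
          return memory.tryReserve(blockSize) ? Target.MEMORY : Target.DISK;
        }
      }
      ```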
      
      ## How was this patch tested?
      Added unit test in `MapStatusSuite` and `ShuffleBlockFetcherIteratorSuite`.
      
      Author: jinxing <jinxing6042@126.com>
      
      Closes #16989 from jinxing64/SPARK-19659.
  25. May 22, 2017
    • [SPARK-20756][YARN] yarn-shuffle jar references unshaded guava · 36309110
      Mark Grover authored
      and contains scala classes
      
      ## What changes were proposed in this pull request?
      This change ensures that all references to guava from within the yarn shuffle jar pointed to the shaded guava class already provided in the jar.
      
      Also, it explicitly excludes scala classes from being added to the jar.
      
      ## How was this patch tested?
      Ran unit tests on the module and they passed.
      javap now returns the expected result - a reference to the shaded guava under `org/spark_project` (previously this referred to `com.google...`):
      ```
      javap -cp common/network-yarn/target/scala-2.11/spark-2.3.0-SNAPSHOT-yarn-shuffle.jar -c org/apache/spark/network/yarn/YarnShuffleService | grep Lists
            57: invokestatic  #138                // Method org/spark_project/guava/collect/Lists.newArrayList:()Ljava/util/ArrayList;
      ```
      
      Guava is still shaded in the jar:
      ```
      jar -tf common/network-yarn/target/scala-2.11/spark-2.3.0-SNAPSHOT-yarn-shuffle.jar | grep guava | head
      META-INF/maven/com.google.guava/
      META-INF/maven/com.google.guava/guava/
      META-INF/maven/com.google.guava/guava/pom.properties
      META-INF/maven/com.google.guava/guava/pom.xml
      org/spark_project/guava/
      org/spark_project/guava/annotations/
      org/spark_project/guava/annotations/Beta.class
      org/spark_project/guava/annotations/GwtCompatible.class
      org/spark_project/guava/annotations/GwtIncompatible.class
      org/spark_project/guava/annotations/VisibleForTesting.class
      ```
      (not sure if the above META-INF/* is a problem or not)
      
      I took this jar, deployed it on a yarn cluster with shuffle service enabled, and made sure the YARN node managers came up. An application with a shuffle was run and it succeeded.
      
      Author: Mark Grover <mark@apache.org>
      
      Closes #17990 from markgrover/spark-20756.
  26. May 10, 2017
    • [MINOR][BUILD] Fix lint-java breaks. · fcb88f92
      Xianyang Liu authored
      ## What changes were proposed in this pull request?
      
      This PR proposes to fix the lint-breaks as below:
      ```
      [ERROR] src/main/java/org/apache/spark/unsafe/Platform.java:[51] (regexp) RegexpSingleline: No trailing whitespace allowed.
      [ERROR] src/main/scala/org/apache/spark/sql/streaming/Trigger.java:[45,25] (naming) MethodName: Method name 'ProcessingTime' must match pattern '^[a-z][a-z0-9][a-zA-Z0-9_]*$'.
      [ERROR] src/main/scala/org/apache/spark/sql/streaming/Trigger.java:[62,25] (naming) MethodName: Method name 'ProcessingTime' must match pattern '^[a-z][a-z0-9][a-zA-Z0-9_]*$'.
      [ERROR] src/main/scala/org/apache/spark/sql/streaming/Trigger.java:[78,25] (naming) MethodName: Method name 'ProcessingTime' must match pattern '^[a-z][a-z0-9][a-zA-Z0-9_]*$'.
      [ERROR] src/main/scala/org/apache/spark/sql/streaming/Trigger.java:[92,25] (naming) MethodName: Method name 'ProcessingTime' must match pattern '^[a-z][a-z0-9][a-zA-Z0-9_]*$'.
      [ERROR] src/main/scala/org/apache/spark/sql/streaming/Trigger.java:[102,25] (naming) MethodName: Method name 'Once' must match pattern '^[a-z][a-z0-9][a-zA-Z0-9_]*$'.
      [ERROR] src/test/java/org/apache/spark/streaming/kinesis/JavaKinesisInputDStreamBuilderSuite.java:[28,8] (imports) UnusedImports: Unused import - org.apache.spark.streaming.api.java.JavaDStream.
      ```
      
      after:
      ```
      dev/lint-java
      Checkstyle checks passed.
      ```
      [Test Result](https://travis-ci.org/ConeyLiu/spark/jobs/229666169)
      
      ## How was this patch tested?
      
      Travis CI
      
      Author: Xianyang Liu <xianyang.liu@intel.com>
      
      Closes #17890 from ConeyLiu/codestyle.
  27. May 03, 2017
    • [SPARK-20523][BUILD] Clean up build warnings for 2.2.0 release · 16fab6b0
      Sean Owen authored
      ## What changes were proposed in this pull request?
      
      Fix build warnings, primarily related to Breeze 0.13 operator changes and Java style problems.
      
      ## How was this patch tested?
      
      Existing tests
      
      Author: Sean Owen <sowen@cloudera.com>
      
      Closes #17803 from srowen/SPARK-20523.
  28. Apr 27, 2017
    • [SPARK-20426] Lazy initialization of FileSegmentManagedBuffer for shuffle service. · 85c6ce61
      jinxing authored
      ## What changes were proposed in this pull request?
      When an application contains a large number of shuffle blocks, the NodeManager requires lots of memory to keep the metadata (`FileSegmentManagedBuffer`) in `StreamManager`. When the number of shuffle blocks is big enough, the NodeManager can hit OOM. This PR proposes lazy initialization of `FileSegmentManagedBuffer` in the shuffle service.
      
      ## How was this patch tested?
      
      Manually tested.
      
      Author: jinxing <jinxing6042@126.com>
      
      Closes #17744 from jinxing64/SPARK-20426.
  29. Apr 26, 2017
    • [SPARK-19812] YARN shuffle service fails to relocate recovery DB acro… · 7fecf513
      Tom Graves authored
      …ss NFS directories
      
      ## What changes were proposed in this pull request?
      
      Change from using Java's Files.move to Hadoop filesystem operations to move the directories. Java's Files.move does not work when moving directories across NFS mounts, and in fact its documentation says that if the directory has entries you should do a recursive move. We are already using the Hadoop filesystem here, so just use the local filesystem from there, as it handles this properly.
      
      Note that the DB here is actually a directory of files and not just a single file, hence the change in the name of the local var.
      
      ## How was this patch tested?
      
      Ran YarnShuffleServiceSuite unit tests. Unfortunately I couldn't easily add one here, since it involves NFS.
      Ran manual tests to verify that the DB directories were properly moved across NFS mounted directories. Have been running this internally for weeks.
      
      Author: Tom Graves <tgraves@apache.org>
      
      Closes #17748 from tgravescs/SPARK-19812.
  30. Apr 24, 2017
  31. Apr 10, 2017
    • [SPARK-17564][TESTS] Fix flaky RequestTimeoutIntegrationSuite.furtherRequestsDelay · 734dfbfc
      Shixiong Zhu authored
      ## What changes were proposed in this pull request?
      
      This PR fixes the following failure:
      ```
      sbt.ForkMain$ForkError: java.lang.AssertionError: null
      	at org.junit.Assert.fail(Assert.java:86)
      	at org.junit.Assert.assertTrue(Assert.java:41)
      	at org.junit.Assert.assertTrue(Assert.java:52)
      	at org.apache.spark.network.RequestTimeoutIntegrationSuite.furtherRequestsDelay(RequestTimeoutIntegrationSuite.java:230)
      	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
      	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
      	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
      	at java.lang.reflect.Method.invoke(Method.java:497)
      	at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
      	at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
      	at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
      	at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
      	at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
      	at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
      	at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
      	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
      	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
      	at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
      	at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
      	at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
      	at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
      	at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
      	at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
      	at org.junit.runners.Suite.runChild(Suite.java:128)
      	at org.junit.runners.Suite.runChild(Suite.java:27)
      	at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
      	at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
      	at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
      	at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
      	at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
      	at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
      	at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
      	at org.junit.runner.JUnitCore.run(JUnitCore.java:115)
      	at com.novocode.junit.JUnitRunner$1.execute(JUnitRunner.java:132)
      	at sbt.ForkMain$Run$2.call(ForkMain.java:296)
      	at sbt.ForkMain$Run$2.call(ForkMain.java:286)
      	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
      	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
      	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
      	at java.lang.Thread.run(Thread.java:745)
      ```
      
      It happens several times per month on [Jenkins](http://spark-tests.appspot.com/test-details?suite_name=org.apache.spark.network.RequestTimeoutIntegrationSuite&test_name=furtherRequestsDelay). The failure is because `callback1` may not be called before `assertTrue(callback1.failure instanceof IOException);`. It's pretty easy to reproduce this error by adding a sleep before this line: https://github.com/apache/spark/blob/379b0b0bbdbba2278ce3bcf471bd75f6ffd9cf0d/common/network-common/src/test/java/org/apache/spark/network/RequestTimeoutIntegrationSuite.java#L267
      
      The fix is straightforward: just use the latch to wait until `callback1` is called.
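      
      The shape of that fix, as a runnable sketch with hypothetical names:
      
      ```java
      import java.io.IOException;
      import java.util.concurrent.CountDownLatch;
      import java.util.concurrent.TimeUnit;
      
      public final class LatchedCallbackDemo {
        static final class TestCallback {
          final CountDownLatch done = new CountDownLatch(1);
          volatile Throwable failure;
      
          void onFailure(Throwable t) {
            failure = t;
            done.countDown(); // signal the test thread that the callback ran
          }
        }
      
        public static void main(String[] args) throws InterruptedException {
          TestCallback callback1 = new TestCallback();
          new Thread(() -> callback1.onFailure(new IOException("timeout"))).start();
          // Wait for the callback instead of racing it, then assert.
          boolean called = callback1.done.await(10, TimeUnit.SECONDS);
          System.out.println(called && callback1.failure instanceof IOException);
        }
      }
      ```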
      
      ## How was this patch tested?
      
      Jenkins
      
      Author: Shixiong Zhu <shixiong@databricks.com>
      
      Closes #17599 from zsxwing/SPARK-17564.
    • [SPARK-20156][CORE][SQL][STREAMING][MLLIB] Java String toLowerCase "Turkish... · a26e3ed5
      Sean Owen authored
      [SPARK-20156][CORE][SQL][STREAMING][MLLIB] Java String toLowerCase "Turkish locale bug" causes Spark problems
      
      ## What changes were proposed in this pull request?
      
      Add Locale.ROOT to internal calls to String `toLowerCase`, `toUpperCase`, to avoid inadvertent locale-sensitive variation in behavior (aka the "Turkish locale problem").
      
      The change looks large but it is just adding `Locale.ROOT` (the locale with no country or language specified) to every call to these methods.
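      
      The bug in two lines, as a self-contained demo: under a Turkish locale, lowercasing the ASCII letter `I` produces the dotless `ı` (U+0131), which breaks case-insensitive matching of keywords and option names:
      
      ```java
      import java.util.Locale;
      
      public final class TurkishLocaleDemo {
        public static void main(String[] args) {
          String s = "TITLE";
          System.out.println(s.toLowerCase(new Locale("tr"))); // "tıtle" (dotless ı)
          System.out.println(s.toLowerCase(Locale.ROOT));      // "title", everywhere
        }
      }
      ```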
      
      ## How was this patch tested?
      
      Existing tests.
      
      Author: Sean Owen <sowen@cloudera.com>
      
      Closes #17527 from srowen/SPARK-20156.
  32. Apr 09, 2017
  33. Mar 30, 2017