From c7aeecd08fd329085760fa89025ec0d9c04f5e3f Mon Sep 17 00:00:00 2001
From: jerryshao <saisai.shao@intel.com>
Date: Mon, 20 Oct 2014 10:20:21 -0700
Subject: [PATCH] [SPARK-3948][Shuffle]Fix stream corruption bug in sort-based
 shuffle

A bug in kernel 2.6.32 leads to unexpected behavior of transferTo in copyStream, which corrupts the shuffle output file in sort-based shuffle and can surface as PARSING_ERROR(2), deserialization errors, or offset-out-of-range errors. This patch fixes the problem by opening the output file with the append flag and adding position-checking code. Details can be seen in [SPARK-3948](https://issues.apache.org/jira/browse/SPARK-3948).
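A minimal sketch of the defensive pattern the patch applies (the method name and structure here are illustrative, not the exact Spark code):

```scala
import java.io.{FileInputStream, FileOutputStream}

def copyFileDefensively(src: String, dst: String): Long = {
  val inChannel = new FileInputStream(src).getChannel
  // Open with append = true so each copy starts at the current end of the
  // file; a stale channel position (the kernel 2.6.32 bug) then cannot
  // silently overwrite previously written partitions.
  val outChannel = new FileOutputStream(dst, true).getChannel
  try {
    val initialPos = outChannel.position()
    val size = inChannel.size()
    var count = 0L
    // transferTo may transfer fewer bytes than requested, so loop until done.
    while (count < size) {
      count += inChannel.transferTo(count, size - count, outChannel)
    }
    // On affected kernels the position may not advance even though the bytes
    // were reported as transferred; detect that instead of corrupting output.
    assert(outChannel.position() == initialPos + size,
      "transferTo did not advance the channel position (SPARK-3948)")
    count
  } finally {
    inChannel.close()
    outChannel.close()
  }
}
```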

Author: jerryshao <saisai.shao@intel.com>

Closes #2824 from jerryshao/SPARK-3948 and squashes the following commits:

be0533a [jerryshao] Address the comments
a82b184 [jerryshao] add configuration to control the NIO way of copying stream
e17ada2 [jerryshao] Fix kernel 2.6.32 bug that leads to unexpected behavior of transferTo
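
With the configuration in place, a user on an affected kernel could opt out of the NIO copy path; a hedged example using the standard SparkConf API (the app name and setup are illustrative):

```scala
import org.apache.spark.SparkConf

// Fall back to buffered stream copying instead of FileChannel.transferTo;
// "spark.file.transferTo" is the configuration key introduced by this patch.
val conf = new SparkConf()
  .setAppName("shuffle-example") // illustrative app name
  .set("spark.file.transferTo", "false")
```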
---
 .../scala/org/apache/spark/util/Utils.scala   | 29 ++++++++++++++++---
 .../util/collection/ExternalSorter.scala      |  5 ++--
 2 files changed, 28 insertions(+), 6 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 53a7512edd..0aeff6455b 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -269,23 +269,44 @@ private[spark] object Utils extends Logging {
     dir
   }
 
-  /** Copy all data from an InputStream to an OutputStream */
+  /** Copy all data from an InputStream to an OutputStream. The NIO way of copying between
+    * file streams is disabled by default unless transferToEnabled is explicitly set to true;
+    * the transferToEnabled parameter should be configured via spark.file.transferTo = [true|false].
+    */
   def copyStream(in: InputStream,
                  out: OutputStream,
-                 closeStreams: Boolean = false): Long =
+                 closeStreams: Boolean = false,
+                 transferToEnabled: Boolean = false): Long =
   {
     var count = 0L
     try {
-      if (in.isInstanceOf[FileInputStream] && out.isInstanceOf[FileOutputStream]) {
+      if (in.isInstanceOf[FileInputStream] && out.isInstanceOf[FileOutputStream]
+        && transferToEnabled) {
         // When both streams are File stream, use transferTo to improve copy performance.
         val inChannel = in.asInstanceOf[FileInputStream].getChannel()
         val outChannel = out.asInstanceOf[FileOutputStream].getChannel()
+        val initialPos = outChannel.position()
         val size = inChannel.size()
 
         // In case transferTo method transferred less data than we have required.
         while (count < size) {
           count += inChannel.transferTo(count, size - count, outChannel)
         }
+
+        // Check the position after the transferTo loop to verify that it advanced to the
+        // expected offset, and inform the user if it did not.
+        // The position is not advanced to the expected length after calling transferTo on
+        // kernel version 2.6.32; the issue is described in
+        // https://bugs.openjdk.java.net/browse/JDK-7052359
+        // and leads to stream corruption when using sort-based shuffle (SPARK-3948).
+        val finalPos = outChannel.position()
+        assert(finalPos == initialPos + size,
+          s"""
+             |Current position $finalPos does not equal the expected position ${initialPos + size}
+             |after transferTo. Please check whether your kernel version is 2.6.32, as it has
+             |a kernel bug that leads to unexpected behavior when using transferTo.
+             |You can set spark.file.transferTo = false to disable this NIO feature.
+           """.stripMargin)
       } else {
         val buf = new Array[Byte](8192)
         var n = 0
@@ -727,7 +748,7 @@ private[spark] object Utils extends Logging {
 
   /**
    * Determines if a directory contains any files newer than cutoff seconds.
-   * 
+   *
    * @param dir must be the path to a directory, or IllegalArgumentException is thrown
    * @param cutoff measured in seconds. Returns true if there are any files or directories in the
    *               given directory whose last modified time is later than this many seconds ago
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
index 644fa36818..d1b06d14ac 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
@@ -93,6 +93,7 @@ private[spark] class ExternalSorter[K, V, C](
   private val conf = SparkEnv.get.conf
   private val spillingEnabled = conf.getBoolean("spark.shuffle.spill", true)
   private val fileBufferSize = conf.getInt("spark.shuffle.file.buffer.kb", 32) * 1024
+  private val transferToEnabled = conf.getBoolean("spark.file.transferTo", true)
 
   // Size of object batches when reading/writing from serializers.
   //
@@ -705,10 +706,10 @@ private[spark] class ExternalSorter[K, V, C](
       var out: FileOutputStream = null
       var in: FileInputStream = null
       try {
-        out = new FileOutputStream(outputFile)
+        out = new FileOutputStream(outputFile, true)
         for (i <- 0 until numPartitions) {
           in = new FileInputStream(partitionWriters(i).fileSegment().file)
-          val size = org.apache.spark.util.Utils.copyStream(in, out, false)
+          val size = org.apache.spark.util.Utils.copyStream(in, out, false, transferToEnabled)
           in.close()
           in = null
           lengths(i) = size
-- 
GitLab