diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 53a7512edd852406f104032b79763029a206710d..0aeff6455b3fe266dd2ca590d0da115b7d918014 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -269,23 +269,44 @@ private[spark] object Utils extends Logging {
     dir
   }
 
-  /** Copy all data from an InputStream to an OutputStream */
+  /** Copy all data from an InputStream to an OutputStream. NIO way of file stream to file stream
+    * copying is disabled by default unless explicitly set transferToEnabled as true,
+    * the parameter transferToEnabled should be configured by spark.file.transferTo = [true|false].
+    */
   def copyStream(in: InputStream,
                  out: OutputStream,
-                 closeStreams: Boolean = false): Long =
+                 closeStreams: Boolean = false,
+                 transferToEnabled: Boolean = false): Long =
   {
     var count = 0L
     try {
-      if (in.isInstanceOf[FileInputStream] && out.isInstanceOf[FileOutputStream]) {
+      if (in.isInstanceOf[FileInputStream] && out.isInstanceOf[FileOutputStream]
+        && transferToEnabled) {
         // When both streams are File stream, use transferTo to improve copy performance.
         val inChannel = in.asInstanceOf[FileInputStream].getChannel()
         val outChannel = out.asInstanceOf[FileOutputStream].getChannel()
+        val initialPos = outChannel.position()
         val size = inChannel.size()
 
         // In case transferTo method transferred less data than we have required.
         while (count < size) {
           count += inChannel.transferTo(count, size - count, outChannel)
         }
+
+        // Check the position after transferTo loop to see if it is in the right position and
+        // give user information if not.
+        // Position will not be increased to the expected length after calling transferTo in
+        // kernel version 2.6.32, this issue can be seen in
+        // https://bugs.openjdk.java.net/browse/JDK-7052359
+        // This will lead to stream corruption issue when using sort-based shuffle (SPARK-3948).
+        val finalPos = outChannel.position()
+        assert(finalPos == initialPos + size,
+          s"""
+             |Current position $finalPos do not equal to expected position ${initialPos + size}
+             |after transferTo, please check your kernel version to see if it is 2.6.32,
+             |this is a kernel bug which will lead to unexpected behavior when using transferTo.
+             |You can set spark.file.transferTo = false to disable this NIO feature.
+           """.stripMargin)
       } else {
         val buf = new Array[Byte](8192)
         var n = 0
@@ -727,7 +748,7 @@ private[spark] object Utils extends Logging {
 
   /**
    * Determines if a directory contains any files newer than cutoff seconds.
-   * 
+   *
    * @param dir must be the path to a directory, or IllegalArgumentException is thrown
    * @param cutoff measured in seconds. Returns true if there are any files or directories in the
    *               given directory whose last modified time is later than this many seconds ago
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
index 644fa368186478caddc0c03445acca01a1812191..d1b06d14acbd2353f1af78185ffa72cd31998881 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
@@ -93,6 +93,7 @@ private[spark] class ExternalSorter[K, V, C](
   private val conf = SparkEnv.get.conf
   private val spillingEnabled = conf.getBoolean("spark.shuffle.spill", true)
   private val fileBufferSize = conf.getInt("spark.shuffle.file.buffer.kb", 32) * 1024
+  private val transferToEnabled = conf.getBoolean("spark.file.transferTo", true)
 
   // Size of object batches when reading/writing from serializers.
   //
@@ -705,10 +706,10 @@ private[spark] class ExternalSorter[K, V, C](
       var out: FileOutputStream = null
       var in: FileInputStream = null
       try {
-        out = new FileOutputStream(outputFile)
+        out = new FileOutputStream(outputFile, true)
         for (i <- 0 until numPartitions) {
           in = new FileInputStream(partitionWriters(i).fileSegment().file)
-          val size = org.apache.spark.util.Utils.copyStream(in, out, false)
+          val size = org.apache.spark.util.Utils.copyStream(in, out, false, transferToEnabled)
           in.close()
           in = null
           lengths(i) = size