From 90f73fcc47c7bf881f808653d46a9936f37c3c31 Mon Sep 17 00:00:00 2001
From: Aaron Davidson <aaron@databricks.com>
Date: Fri, 10 Oct 2014 01:44:36 -0700
Subject: [PATCH] [SPARK-3889] Attempt to avoid SIGBUS by not mmapping files in
 ConnectionManager

In general, individual shuffle blocks are frequently small, so mmapping them often creates a lot of waste. It may not be bad to mmap the larger ones, but it is pretty inconvenient to get configuration into ManagedBuffer, and besides it is unlikely to help all that much.

Author: Aaron Davidson <aaron@databricks.com>

Closes #2742 from aarondav/mmap and squashes the following commits:

a152065 [Aaron Davidson] Add other pathway back
52b6cd2 [Aaron Davidson] [SPARK-3889] Attempt to avoid SIGBUS by not mmapping files in ConnectionManager
---
 .../org/apache/spark/network/ManagedBuffer.scala | 16 +++++++++++++++-
 1 file changed, 15 insertions(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/network/ManagedBuffer.scala b/core/src/main/scala/org/apache/spark/network/ManagedBuffer.scala
index a4409181ec..4c9ca97a2a 100644
--- a/core/src/main/scala/org/apache/spark/network/ManagedBuffer.scala
+++ b/core/src/main/scala/org/apache/spark/network/ManagedBuffer.scala
@@ -66,13 +66,27 @@ sealed abstract class ManagedBuffer {
 final class FileSegmentManagedBuffer(val file: File, val offset: Long, val length: Long)
   extends ManagedBuffer {
 
+  /**
+   * Memory mapping is expensive and can destabilize the JVM (SPARK-1145, SPARK-3889).
+   * Avoid unless there's a good reason not to.
+   */
+  private val MIN_MEMORY_MAP_BYTES = 2 * 1024 * 1024;
+
   override def size: Long = length
 
   override def nioByteBuffer(): ByteBuffer = {
     var channel: FileChannel = null
     try {
       channel = new RandomAccessFile(file, "r").getChannel
-      channel.map(MapMode.READ_ONLY, offset, length)
+      // Just copy the buffer if it's sufficiently small, as memory mapping has a high overhead.
+      if (length < MIN_MEMORY_MAP_BYTES) {
+        val buf = ByteBuffer.allocate(length.toInt)
+        channel.read(buf, offset)
+        buf.flip()
+        buf
+      } else {
+        channel.map(MapMode.READ_ONLY, offset, length)
+      }
     } catch {
       case e: IOException =>
         Try(channel.size).toOption match {
-- 
GitLab