From 267c7be3b62174ec13db4568e72722ec61dfb23c Mon Sep 17 00:00:00 2001
From: Andrew Or <andrewor14@gmail.com>
Date: Tue, 7 Oct 2014 12:52:10 -0700
Subject: [PATCH] [SPARK-3825] Log more detail when unrolling a block fails

Before:
```
14/10/06 16:45:42 WARN CacheManager: Not enough space to cache partition rdd_0_2
in memory! Free memory is 481861527 bytes.
```
After:
```
14/10/07 11:08:24 WARN MemoryStore: Not enough space to cache rdd_2_0 in memory!
(computed 68.8 MB so far)
14/10/07 11:08:24 INFO MemoryStore: Memory use = 1088.0 B (blocks) + 445.1 MB
(scratch space shared across 8 thread(s)) = 445.1 MB. Storage limit = 459.5 MB.
```

Author: Andrew Or <andrewor14@gmail.com>

Closes #2688 from andrewor14/cache-log-message and squashes the following commits:

28e33d6 [Andrew Or] Shy away from "unrolling"
5638c49 [Andrew Or] Grammar
39a0c28 [Andrew Or] Log more detail when unrolling a block fails

(cherry picked from commit 553737c6e6d5ffa3b52a9888444f4beece5c5b1a)
Signed-off-by: Andrew Or <andrewor14@gmail.com>
---
 .../scala/org/apache/spark/CacheManager.scala |  2 -
 .../apache/spark/storage/MemoryStore.scala    | 45 ++++++++++++++++---
 2 files changed, 39 insertions(+), 8 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/CacheManager.scala b/core/src/main/scala/org/apache/spark/CacheManager.scala
index f8584b90ca..d89bb50076 100644
--- a/core/src/main/scala/org/apache/spark/CacheManager.scala
+++ b/core/src/main/scala/org/apache/spark/CacheManager.scala
@@ -168,8 +168,6 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
           arr.iterator.asInstanceOf[Iterator[T]]
         case Right(it) =>
           // There is not enough space to cache this partition in memory
-          logWarning(s"Not enough space to cache partition $key in memory! " +
-            s"Free memory is ${blockManager.memoryStore.freeMemory} bytes.")
           val returnValues = it.asInstanceOf[Iterator[T]]
           if (putLevel.useDisk) {
             logWarning(s"Persisting partition $key to disk instead.")
diff --git a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
index 0a09c24d61..edbc729c17 100644
--- a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
@@ -132,8 +132,6 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
         PutResult(res.size, res.data, droppedBlocks)
       case Right(iteratorValues) =>
         // Not enough space to unroll this block; drop to disk if applicable
-        logWarning(s"Not enough space to store block $blockId in memory! " +
-          s"Free memory is $freeMemory bytes.")
         if (level.useDisk && allowPersistToDisk) {
           logWarning(s"Persisting block $blockId to disk instead.")
           val res = blockManager.diskStore.putIterator(blockId, iteratorValues, level, returnValues)
@@ -265,6 +263,7 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
         Left(vector.toArray)
       } else {
         // We ran out of space while unrolling the values for this block
+        logUnrollFailureMessage(blockId, vector.estimateSize())
         Right(vector.iterator ++ values)
       }
 
@@ -424,7 +423,7 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
    * Reserve additional memory for unrolling blocks used by this thread.
    * Return whether the request is granted.
    */
-  private[spark] def reserveUnrollMemoryForThisThread(memory: Long): Boolean = {
+  def reserveUnrollMemoryForThisThread(memory: Long): Boolean = {
     accountingLock.synchronized {
       val granted = freeMemory > currentUnrollMemory + memory
       if (granted) {
@@ -439,7 +438,7 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
    * Release memory used by this thread for unrolling blocks.
    * If the amount is not specified, remove the current thread's allocation altogether.
    */
-  private[spark] def releaseUnrollMemoryForThisThread(memory: Long = -1L): Unit = {
+  def releaseUnrollMemoryForThisThread(memory: Long = -1L): Unit = {
     val threadId = Thread.currentThread().getId
     accountingLock.synchronized {
       if (memory < 0) {
@@ -457,16 +456,50 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
   /**
    * Return the amount of memory currently occupied for unrolling blocks across all threads.
    */
-  private[spark] def currentUnrollMemory: Long = accountingLock.synchronized {
+  def currentUnrollMemory: Long = accountingLock.synchronized {
     unrollMemoryMap.values.sum
   }
 
   /**
    * Return the amount of memory currently occupied for unrolling blocks by this thread.
    */
-  private[spark] def currentUnrollMemoryForThisThread: Long = accountingLock.synchronized {
+  def currentUnrollMemoryForThisThread: Long = accountingLock.synchronized {
     unrollMemoryMap.getOrElse(Thread.currentThread().getId, 0L)
   }
+
+  /**
+   * Return the number of threads currently unrolling blocks.
+   */
+  def numThreadsUnrolling: Int = accountingLock.synchronized { unrollMemoryMap.keys.size }
+
+  /**
+   * Log information about current memory usage.
+   */
+  def logMemoryUsage(): Unit = {
+    val blocksMemory = currentMemory
+    val unrollMemory = currentUnrollMemory
+    val totalMemory = blocksMemory + unrollMemory
+    logInfo(
+      s"Memory use = ${Utils.bytesToString(blocksMemory)} (blocks) + " +
+      s"${Utils.bytesToString(unrollMemory)} (scratch space shared across " +
+      s"$numThreadsUnrolling thread(s)) = ${Utils.bytesToString(totalMemory)}. " +
+      s"Storage limit = ${Utils.bytesToString(maxMemory)}."
+    )
+  }
+
+  /**
+   * Log a warning for failing to unroll a block.
+   *
+   * @param blockId ID of the block we are trying to unroll.
+   * @param finalVectorSize Final size of the vector before unrolling failed.
+   */
+  def logUnrollFailureMessage(blockId: BlockId, finalVectorSize: Long): Unit = {
+    logWarning(
+      s"Not enough space to cache $blockId in memory! " +
+      s"(computed ${Utils.bytesToString(finalVectorSize)} so far)"
+    )
+    logMemoryUsage()
+  }
 }
 
 private[spark] case class ResultWithDroppedBlocks(
-- 
GitLab