diff --git a/core/src/main/scala/org/apache/spark/CacheManager.scala b/core/src/main/scala/org/apache/spark/CacheManager.scala index f8584b90cabe60b46fc243350f70aefa96c494e0..d89bb50076c9a0d65f41b3bb58ca885899363ae3 100644 --- a/core/src/main/scala/org/apache/spark/CacheManager.scala +++ b/core/src/main/scala/org/apache/spark/CacheManager.scala @@ -168,8 +168,6 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging { arr.iterator.asInstanceOf[Iterator[T]] case Right(it) => // There is not enough space to cache this partition in memory - logWarning(s"Not enough space to cache partition $key in memory! " + - s"Free memory is ${blockManager.memoryStore.freeMemory} bytes.") val returnValues = it.asInstanceOf[Iterator[T]] if (putLevel.useDisk) { logWarning(s"Persisting partition $key to disk instead.") diff --git a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala index 0a09c24d618790d0b2416603efb28bf57a9fbddc..edbc729c17adeb0dd23b831029f5536d19bb6aaf 100644 --- a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala +++ b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala @@ -132,8 +132,6 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long) PutResult(res.size, res.data, droppedBlocks) case Right(iteratorValues) => // Not enough space to unroll this block; drop to disk if applicable - logWarning(s"Not enough space to store block $blockId in memory! " + - s"Free memory is $freeMemory bytes.") if (level.useDisk && allowPersistToDisk) { logWarning(s"Persisting block $blockId to disk instead.") val res = blockManager.diskStore.putIterator(blockId, iteratorValues, level, returnValues) @@ -265,6 +263,7 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long) Left(vector.toArray) } else { // We ran out of space while unrolling the values for this block + logUnrollFailureMessage(blockId, vector.estimateSize()) Right(vector.iterator ++ values) } @@ -424,7 +423,7 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long) * Reserve additional memory for unrolling blocks used by this thread. * Return whether the request is granted. */ - private[spark] def reserveUnrollMemoryForThisThread(memory: Long): Boolean = { + def reserveUnrollMemoryForThisThread(memory: Long): Boolean = { accountingLock.synchronized { val granted = freeMemory > currentUnrollMemory + memory if (granted) { @@ -439,7 +438,7 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long) * Release memory used by this thread for unrolling blocks. * If the amount is not specified, remove the current thread's allocation altogether. */ - private[spark] def releaseUnrollMemoryForThisThread(memory: Long = -1L): Unit = { + def releaseUnrollMemoryForThisThread(memory: Long = -1L): Unit = { val threadId = Thread.currentThread().getId accountingLock.synchronized { if (memory < 0) { @@ -457,16 +456,50 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long) /** * Return the amount of memory currently occupied for unrolling blocks across all threads. */ - private[spark] def currentUnrollMemory: Long = accountingLock.synchronized { + def currentUnrollMemory: Long = accountingLock.synchronized { unrollMemoryMap.values.sum } /** * Return the amount of memory currently occupied for unrolling blocks by this thread. */ - private[spark] def currentUnrollMemoryForThisThread: Long = accountingLock.synchronized { + def currentUnrollMemoryForThisThread: Long = accountingLock.synchronized { unrollMemoryMap.getOrElse(Thread.currentThread().getId, 0L) } + + /** + * Return the number of threads currently unrolling blocks. + */ + def numThreadsUnrolling: Int = accountingLock.synchronized { unrollMemoryMap.keys.size } + + /** + * Log information about current memory usage. + */ + def logMemoryUsage(): Unit = { + val blocksMemory = currentMemory + val unrollMemory = currentUnrollMemory + val totalMemory = blocksMemory + unrollMemory + logInfo( + s"Memory use = ${Utils.bytesToString(blocksMemory)} (blocks) + " + + s"${Utils.bytesToString(unrollMemory)} (scratch space shared across " + + s"$numThreadsUnrolling thread(s)) = ${Utils.bytesToString(totalMemory)}. " + + s"Storage limit = ${Utils.bytesToString(maxMemory)}." + ) + } + + /** + * Log a warning for failing to unroll a block. + * + * @param blockId ID of the block we are trying to unroll. + * @param finalVectorSize Final size of the vector before unrolling failed. + */ + def logUnrollFailureMessage(blockId: BlockId, finalVectorSize: Long): Unit = { + logWarning( + s"Not enough space to cache $blockId in memory! " + + s"(computed ${Utils.bytesToString(finalVectorSize)} so far)" + ) + logMemoryUsage() + } } private[spark] case class ResultWithDroppedBlocks(