diff --git a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala index be37cad4a0b50c9727d1b47438b9d238e7c21b1c..d80fec75bc1ef5be0801283d4ddcd30ed4f0f86f 100644 --- a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala +++ b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala @@ -459,26 +459,18 @@ private[spark] class MemoryStore( // (because of getValue or getBytes) while traversing the iterator, as that // can lead to exceptions. entries.synchronized { - // TODO: add our logic here - - - - - - - - val iterator = entries.entrySet().iterator() + val iterator = blockIdAndSizeSet.iterator() while (freedMemory < space && iterator.hasNext) { - val pair = iterator.next() - val blockId = pair.getKey - val entry = pair.getValue + val idAndSize = iterator.next() + val blockId = idAndSize.blockId + val entry = entries.get(blockId) if (blockIsEvictable(blockId, entry)) { // We don't want to evict blocks which are currently being read, so we need to obtain // an exclusive write lock on blocks which are candidates for eviction. We perform a // non-blocking "tryLock" here in order to ignore blocks which are locked for reading: if (blockInfoManager.lockForWriting(blockId, blocking = false).isDefined) { selectedBlocks += blockId - freedMemory += pair.getValue.size + freedMemory += idAndSize.size } } }