Commit 1742c3ab authored by Sean Zhong, committed by Josh Rosen

[SPARK-17503][CORE] Fix memory leak in Memory store when unable to cache the whole RDD in memory

## What changes were proposed in this pull request?

MemoryStore may throw `OutOfMemoryError` when trying to cache a very large RDD that cannot fit in memory:
   ```
   scala> sc.parallelize(1 to 1000000000, 100).map(x => new Array[Long](1000)).cache().count()

   java.lang.OutOfMemoryError: Java heap space
	at $line14.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.apply(<console>:24)
	at $line14.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.apply(<console>:23)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
	at scala.collection.Iterator$JoinIterator.next(Iterator.scala:232)
	at org.apache.spark.storage.memory.PartiallyUnrolledIterator.next(MemoryStore.scala:683)
	at org.apache.spark.InterruptibleIterator.next(InterruptibleIterator.scala:43)
	at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1684)
	at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1134)
	at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1134)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1915)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1915)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
	at org.apache.spark.scheduler.Task.run(Task.scala:86)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
   ```

Spark's MemoryStore uses a SizeTrackingVector as a temporary unrolling buffer, holding all input values it has read so far before transferring them to the storage memory cache. The problem is that when the input RDD is too big to cache in memory, this temporary unrolling buffer is not garbage collected in time. Since the SizeTrackingVector can occupy all available storage memory, holding on to it can quickly drive the executor JVM out of memory.

More info can be found at https://issues.apache.org/jira/browse/SPARK-17503
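To illustrate the fix at a glance, here is a minimal, hypothetical sketch of the pattern (simplified names, not Spark's actual classes; the real change is in the diff below): once the buffered portion is exhausted, the only strong reference to it is dropped, so the buffer can be garbage collected while the remaining input is still being consumed.

```scala
// Hypothetical, simplified illustration of the fix pattern (not Spark code).
// `buffered` stands in for the SizeTrackingVector-backed unrolled iterator;
// `rest` stands in for the values that never fit into the unroll buffer.
class DrainThenContinue[T](
    private[this] var buffered: Iterator[T],
    rest: Iterator[T])
  extends Iterator[T] {

  override def hasNext: Boolean = {
    if (buffered == null) {
      rest.hasNext
    } else if (buffered.hasNext) {
      true
    } else {
      // Drop the only strong reference to the buffer the moment it is
      // exhausted, so it becomes eligible for garbage collection even though
      // this iterator stays alive until `rest` is fully consumed.
      buffered = null
      rest.hasNext
    }
  }

  override def next(): T =
    if (buffered == null) rest.next() else buffered.next()
}

// Example: the first three values come from the buffer, the rest from `rest`.
val it = new DrainThenContinue(Iterator(1, 2, 3), Iterator(4, 5, 6))
assert(it.toList == List(1, 2, 3, 4, 5, 6))
```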

## How was this patch tested?

Unit test and manual test.

### Before change

Heap memory consumption
<img width="702" alt="screen shot 2016-09-12 at 4 16 15 pm" src="https://cloud.githubusercontent.com/assets/2595532/18429524/60d73a26-7906-11e6-9768-6f286f5c58c8.png">

Heap dump
<img width="1402" alt="screen shot 2016-09-12 at 4 34 19 pm" src="https://cloud.githubusercontent.com/assets/2595532/18429577/cbc1ef20-7906-11e6-847b-b5903f450b3b.png">

### After change

Heap memory consumption
<img width="706" alt="screen shot 2016-09-12 at 4 29 10 pm" src="https://cloud.githubusercontent.com/assets/2595532/18429503/4abe9342-7906-11e6-844a-b2f815072624.png">

Author: Sean Zhong <seanzhong@databricks.com>

Closes #15056 from clockfly/memory_store_leak.
Parent commit: 8087ecf8
core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala

```diff
@@ -663,31 +663,43 @@ private[spark] class MemoryStore(
 private[storage] class PartiallyUnrolledIterator[T](
     memoryStore: MemoryStore,
     unrollMemory: Long,
-    unrolled: Iterator[T],
+    private[this] var unrolled: Iterator[T],
     rest: Iterator[T])
   extends Iterator[T] {

-  private[this] var unrolledIteratorIsConsumed: Boolean = false
-  private[this] var iter: Iterator[T] = {
-    val completionIterator = CompletionIterator[T, Iterator[T]](unrolled, {
-      unrolledIteratorIsConsumed = true
-      memoryStore.releaseUnrollMemoryForThisTask(MemoryMode.ON_HEAP, unrollMemory)
-    })
-    completionIterator ++ rest
+  private def releaseUnrollMemory(): Unit = {
+    memoryStore.releaseUnrollMemoryForThisTask(MemoryMode.ON_HEAP, unrollMemory)
+    // SPARK-17503: Garbage collects the unrolling memory before the life end of
+    // PartiallyUnrolledIterator.
+    unrolled = null
   }

-  override def hasNext: Boolean = iter.hasNext
-  override def next(): T = iter.next()
+  override def hasNext: Boolean = {
+    if (unrolled == null) {
+      rest.hasNext
+    } else if (!unrolled.hasNext) {
+      releaseUnrollMemory()
+      rest.hasNext
+    } else {
+      true
+    }
+  }
+
+  override def next(): T = {
+    if (unrolled == null) {
+      rest.next()
+    } else {
+      unrolled.next()
+    }
+  }

   /**
    * Called to dispose of this iterator and free its memory.
    */
   def close(): Unit = {
-    if (!unrolledIteratorIsConsumed) {
-      memoryStore.releaseUnrollMemoryForThisTask(MemoryMode.ON_HEAP, unrollMemory)
-      unrolledIteratorIsConsumed = true
+    if (unrolled != null) {
+      releaseUnrollMemory()
     }
-    iter = null
   }
 }
```
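Why the restructuring: the old implementation chained `unrolled ++ rest` into a single `iter` field, and that field kept the consumed `unrolled` iterator (and the SizeTrackingVector backing it) strongly reachable until `close()` nulled it out, even after the CompletionIterator callback had released the unroll memory accounting. The new version dispatches between the two iterators by hand, so the `unrolled` reference can be dropped the moment it is exhausted.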
core/src/test/scala/org/apache/spark/storage/PartiallyUnrolledIteratorSuite.scala (new file)
```scala
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.storage

import org.mockito.Matchers
import org.mockito.Mockito._
import org.scalatest.mock.MockitoSugar

import org.apache.spark.SparkFunSuite
import org.apache.spark.memory.MemoryMode.ON_HEAP
import org.apache.spark.storage.memory.{MemoryStore, PartiallyUnrolledIterator}

class PartiallyUnrolledIteratorSuite extends SparkFunSuite with MockitoSugar {
  test("join two iterators") {
    val unrollSize = 1000
    val unroll = (0 until unrollSize).iterator
    val restSize = 500
    val rest = (unrollSize until restSize + unrollSize).iterator

    val memoryStore = mock[MemoryStore]
    val joinIterator =
      new PartiallyUnrolledIterator(memoryStore, unrollSize, unroll, rest)

    // First, iterate over the unrolled part. hasNext is called twice per
    // element to check that it is idempotent.
    (0 until unrollSize).foreach { value =>
      assert(joinIterator.hasNext)
      assert(joinIterator.hasNext)
      assert(joinIterator.next() == value)
    }

    // Once the unrolled iterator is drained, hasNext should release the
    // unroll memory exactly once, no matter how often it is called.
    joinIterator.hasNext
    joinIterator.hasNext
    verify(memoryStore, times(1))
      .releaseUnrollMemoryForThisTask(Matchers.eq(ON_HEAP), Matchers.eq(unrollSize.toLong))

    // Second, iterate over the rest iterator.
    (unrollSize until unrollSize + restSize).foreach { value =>
      assert(joinIterator.hasNext)
      assert(joinIterator.hasNext)
      assert(joinIterator.next() == value)
    }

    joinIterator.close()
    // releaseUnrollMemoryForThisTask must have been called only once in total.
    verifyNoMoreInteractions(memoryStore)
  }
}
```