[SPARK-10342] [SPARK-10309] [SPARK-10474] [SPARK-10929] [SQL] Cooperative memory management
This PR introduces a mechanism to call spill() on those SQL operators that support spilling (for example, BytesToBytesMap, UnsafeExternalSorter and ShuffleExternalSorter) if there is not enough memory for execution. The preserved first page is not needed anymore, so it is removed.

Other Spillable objects in Spark core (ExternalSorter and AppendOnlyMap) are not included in this PR, but they could benefit from this (by triggering others' spilling).

PrepareRDD may not be needed anymore and could be removed in a follow-up PR.

The following script fails with OOM before this PR; with this PR it finishes in 150 seconds with a 2G heap (it also works in the 1.5 branch, with similar duration).

```python
# Run from a PySpark shell, where sqlContext is already defined.
sqlContext.setConf("spark.sql.shuffle.partitions", "1")
df = sqlContext.range(1<<25).selectExpr("id", "repeat(id, 2) as s")
df2 = df.select(df.id.alias('id2'), df.s.alias('s2'))
j = df.join(df2, df.id==df2.id2).groupBy(df.id).max("id", "id2")
j.explain()
print(j.count())
```

For thread-safety, here is what I've got:

1) Without calling spill(), the operators should only be used by a single thread, so there are no safety problems.

2) spill() can be triggered in two cases: by the operator itself, or by other operators. We can check trigger == this in spill(); in that case it's still in the same thread, so there are no safety problems.

3) If it's triggered by other operators (right now cache will not trigger spill()), we only spill the data to disk when the operator is in the scanning stage (building is finished), so the in-memory sorter or memory pages are read-only; we only need to synchronize the iterator and change it.

4) During scanning, the iterator only uses one record in one page. We can't free this page, because the downstream is currently using it (via UnsafeRow or other objects). In BytesToBytesMap, we just skip the current page and dump all others to disk. In UnsafeExternalSorter, we keep the page that is used by the current record (having the same baseObject) and free it when loading the next record. In ShuffleExternalSorter, spill() will not be triggered during scanning.

5) In order to avoid deadlock, we don't call acquireMemory during spill (so we reuse the pointer array in InMemorySorter).

Author: Davies Liu <davies@databricks.com>

Closes #9241 from davies/force_spill.
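The cooperative spilling described in the thread-safety notes can be illustrated with a small Python toy model. This is only a sketch under stated assumptions, not Spark's implementation: ToyMemoryManager, ToyConsumer, insert() and all byte counts are hypothetical, and asking other consumers to spill before the requester spills itself is an assumption of this sketch. It models points 2, 3 and 5: spill() receives the trigger, a consumer triggered by someone else only spills once it is scanning, and spill() never acquires memory.

```python
class ToyMemoryManager:
    """Hypothetical per-task memory pool shared by several consumers."""

    def __init__(self, capacity):
        self.capacity = capacity
        self.consumers = []

    def free_bytes(self):
        return self.capacity - sum(c.held for c in self.consumers)

    def acquire(self, required, requester):
        if requester not in self.consumers:
            self.consumers.append(requester)
        # Assumption of this sketch: ask the other consumers to spill first.
        for other in self.consumers:
            if self.free_bytes() >= required:
                break
            if other is not requester and other.held > 0:
                other.spill(required - self.free_bytes(), trigger=requester)
        # If that was not enough, the requester spills its own data.
        if self.free_bytes() < required:
            requester.spill(required - self.free_bytes(), trigger=requester)
        granted = min(required, self.free_bytes())
        requester.held += granted
        return granted


class ToyConsumer:
    """Hypothetical spillable operator tracked by ToyMemoryManager."""

    def __init__(self, name, manager):
        self.name = name
        self.manager = manager
        self.held = 0          # bytes currently held in memory
        self.spilled = 0       # bytes written to "disk"
        self.scanning = False  # True once building is finished

    def insert(self, size):
        granted = self.manager.acquire(size, self)
        if granted < size:
            self.held -= granted  # hand back the partial grant
            raise MemoryError(f"{self.name}: needed {size}, got {granted}")

    def spill(self, size, trigger):
        # Triggered by another consumer while still building: refuse.
        if trigger is not self and not self.scanning:
            return 0
        # Spill everything; note that no memory is acquired here.
        released = self.held
        self.spilled += released
        self.held = 0
        return released


manager = ToyMemoryManager(capacity=100)
sorter = ToyConsumer("sorter", manager)
hash_map = ToyConsumer("map", manager)

sorter.insert(80)
sorter.scanning = True  # building finished, so others may force a spill
hash_map.insert(50)     # only 20 bytes free, so the sorter is asked to spill
print(sorter.held, sorter.spilled, hash_map.held)
```

If sorter.scanning were left False, the second insert would raise MemoryError in this toy model, because the sorter would refuse to spill on behalf of another consumer while still building.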
- core/src/main/java/org/apache/spark/memory/MemoryConsumer.java: 128 additions, 0 deletions
- core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java: 108 additions, 30 deletions
- core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java: 71 additions, 139 deletions
- core/src/main/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorter.java: 34 additions, 16 deletions
- core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java: 0 additions, 6 deletions
- core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java: 260 additions, 170 deletions
- core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java: 223 additions, 203 deletions
- core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java: 40 additions, 20 deletions
- core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java: 3 additions, 3 deletions
- core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillWriter.java: 1 addition, 1 deletion
- core/src/main/scala/org/apache/spark/memory/MemoryManager.scala: 7 additions, 2 deletions
- core/src/main/scala/org/apache/spark/util/collection/Spillable.scala: 2 additions, 2 deletions
- core/src/test/java/org/apache/spark/memory/TaskMemoryManagerSuite.java: 71 additions, 6 deletions
- core/src/test/java/org/apache/spark/shuffle/sort/PackedRecordPointerSuite.java: 17 additions, 13 deletions
- core/src/test/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorterSuite.java: 3 additions, 3 deletions
- core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java: 13 additions, 25 deletions
- core/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java: 128 additions, 21 deletions
- core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java: 69 additions, 28 deletions
- core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorterSuite.java: 11 additions, 9 deletions
- core/src/test/scala/org/apache/spark/FailureSuite.scala: 2 additions, 2 deletions