From a11db942aaf4c470a85f8a1b180f034f7a584254 Mon Sep 17 00:00:00 2001
From: Xianyang Liu <xianyang.liu@intel.com>
Date: Tue, 19 Sep 2017 14:51:27 +0800
Subject: [PATCH] [SPARK-21923][CORE] Avoid calling reserveUnrollMemoryForThisTask for every record

## What changes were proposed in this pull request?

When Spark persists data to unsafe (off-heap) memory, it calls `MemoryStore.putIteratorAsBytes`, which synchronizes on the `memoryManager` for every record written. This per-record synchronization is unnecessary: we can request more memory at a time and re-check the reservation only periodically, which reduces contention on the memory manager.

## How was this patch tested?

Test case (with 1 executor, 20 cores):

```scala
import org.apache.spark.storage.StorageLevel

val start = System.currentTimeMillis()
val data = sc.parallelize(0 until Integer.MAX_VALUE, 100)
  .persist(StorageLevel.OFF_HEAP)
  .count()
println(System.currentTimeMillis() - start)
```

Test result (wall-clock time in milliseconds, five runs each):

|        | run 1 | run 2 | run 3 | run 4 | run 5 |
| ------ | ----- | ----- | ----- | ----- | ----- |
| before | 27647 | 29108 | 28591 | 28264 | 27232 |
| after  | 26868 | 26358 | 27767 | 26653 | 26693 |
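The pattern behind the fix, as a minimal standalone sketch: reserve memory only every `memoryCheckPeriod` records, and over-request by `memoryGrowthFactor` so that most checks need no reservation at all. The names `reserve` and `bytesUsed` and the record sizes below are illustrative stand-ins, not the actual `MemoryStore` internals:

```scala
// Illustrative sketch only: reserve() stands in for the synchronized
// reserveUnrollMemoryForThisTask call and always succeeds here.
object PeriodicReservationSketch {
  def main(args: Array[String]): Unit = {
    val memoryCheckPeriod = 16L   // re-check every 16 records (patch default)
    val memoryGrowthFactor = 1.5  // over-request to amortize future growth

    var reserved = 0L             // bytes reserved so far for this block
    var bytesUsed = 0L            // bytes actually written so far
    var elementsUnrolled = 0L
    var reserveCalls = 0          // counts the (formerly per-record) calls

    def reserve(bytes: Long): Boolean = {
      reserveCalls += 1
      reserved += bytes
      true
    }

    def reserveAdditionalMemoryIfNecessary(): Unit = {
      if (bytesUsed > reserved) {
        // Request growthFactor times the current size, minus what we already
        // hold, so the next records fit without another synchronized call.
        val amountToRequest = (bytesUsed * memoryGrowthFactor - reserved).toLong
        reserve(amountToRequest)
      }
    }

    // Simulate unrolling 1000 records of ~100 bytes each.
    (1 to 1000).foreach { _ =>
      bytesUsed += 100L
      elementsUnrolled += 1
      if (elementsUnrolled % memoryCheckPeriod == 0) {
        reserveAdditionalMemoryIfNecessary()
      }
    }
    // Final check so the block is fully covered before it is stored.
    reserveAdditionalMemoryIfNecessary()

    println(s"reserve() calls: $reserveCalls instead of 1000 with per-record checks")
  }
}
```

With the defaults introduced here (period 16, growth factor 1.5), the synchronized reservation runs only a handful of times per block instead of once per record.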
Author: Xianyang Liu <xianyang.liu@intel.com>

Closes #19135 from ConeyLiu/memorystore.
---
 .../apache/spark/internal/config/package.scala |       15 +++++++++++++++
 .../spark/storage/memory/MemoryStore.scala     |       18 ++++++++++++++----
 2 files changed, 29 insertions(+), 4 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index 0d3769a735..e0f696080e 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -385,4 +385,19 @@ package object config {
       .checkValue(v => v > 0 && v <= Int.MaxValue,
         s"The buffer size must be greater than 0 and less than ${Int.MaxValue}.")
       .createWithDefault(1024 * 1024)
+
+  private[spark] val UNROLL_MEMORY_CHECK_PERIOD =
+    ConfigBuilder("spark.storage.unrollMemoryCheckPeriod")
+      .internal()
+      .doc("The memory check period is used to determine how often we should check whether " +
+        "there is a need to request more memory when we try to unroll the given block in memory.")
+      .longConf
+      .createWithDefault(16)
+
+  private[spark] val UNROLL_MEMORY_GROWTH_FACTOR =
+    ConfigBuilder("spark.storage.unrollMemoryGrowthFactor")
+      .internal()
+      .doc("Memory to request as a multiple of the size used to unroll the block.")
+      .doubleConf
+      .createWithDefault(1.5)
 }
diff --git a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
index 90e3af2d0e..eb2201d142 100644
--- a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
@@ -29,6 +29,7 @@ import com.google.common.io.ByteStreams
 
 import org.apache.spark.{SparkConf, TaskContext}
 import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.{UNROLL_MEMORY_CHECK_PERIOD, UNROLL_MEMORY_GROWTH_FACTOR}
 import org.apache.spark.memory.{MemoryManager, MemoryMode}
 import org.apache.spark.serializer.{SerializationStream, SerializerManager}
 import org.apache.spark.storage.{BlockId, BlockInfoManager, StorageLevel, StreamBlockId}
@@ -190,11 +191,11 @@ private[spark] class MemoryStore(
     // Initial per-task memory to request for unrolling blocks (bytes).
     val initialMemoryThreshold = unrollMemoryThreshold
     // How often to check whether we need to request more memory
-    val memoryCheckPeriod = 16
+    val memoryCheckPeriod = conf.get(UNROLL_MEMORY_CHECK_PERIOD)
     // Memory currently reserved by this task for this particular unrolling operation
     var memoryThreshold = initialMemoryThreshold
     // Memory to request as a multiple of current vector size
-    val memoryGrowthFactor = 1.5
+    val memoryGrowthFactor = conf.get(UNROLL_MEMORY_GROWTH_FACTOR)
     // Keep track of unroll memory used by this particular block / putIterator() operation
     var unrollMemoryUsedByThisBlock = 0L
     // Underlying vector for unrolling the block
@@ -325,6 +326,12 @@ private[spark] class MemoryStore(
     // Whether there is still enough memory for us to continue unrolling this block
     var keepUnrolling = true
+    // Number of elements unrolled so far
+    var elementsUnrolled = 0L
+    // How often to check whether we need to request more memory
+    val memoryCheckPeriod = conf.get(UNROLL_MEMORY_CHECK_PERIOD)
+    // Memory to request as a multiple of current bbos size
+    val memoryGrowthFactor = conf.get(UNROLL_MEMORY_GROWTH_FACTOR)
     // Initial per-task memory to request for unrolling blocks (bytes).
     val initialMemoryThreshold = unrollMemoryThreshold
     // Keep track of unroll memory used by this particular block / putIterator() operation
     var unrollMemoryUsedByThisBlock = 0L
@@ -359,7 +366,7 @@ private[spark] class MemoryStore(
 
     def reserveAdditionalMemoryIfNecessary(): Unit = {
       if (bbos.size > unrollMemoryUsedByThisBlock) {
-        val amountToRequest = bbos.size - unrollMemoryUsedByThisBlock
+        val amountToRequest = (bbos.size * memoryGrowthFactor - unrollMemoryUsedByThisBlock).toLong
         keepUnrolling = reserveUnrollMemoryForThisTask(blockId, amountToRequest, memoryMode)
         if (keepUnrolling) {
           unrollMemoryUsedByThisBlock += amountToRequest
@@ -370,7 +377,10 @@ private[spark] class MemoryStore(
     // Unroll this block safely, checking whether we have exceeded our threshold
     while (values.hasNext && keepUnrolling) {
       serializationStream.writeObject(values.next())(classTag)
-      reserveAdditionalMemoryIfNecessary()
+      elementsUnrolled += 1
+      if (elementsUnrolled % memoryCheckPeriod == 0) {
+        reserveAdditionalMemoryIfNecessary()
+      }
     }
 
     // Make sure that we have enough memory to store the block. By this point, it is possible that
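Both new settings are marked `.internal()`, so they are not part of the documented configuration surface. For experimentation only, a sketch of overriding the defaults (the values 32 and 2.0 are arbitrary examples, not recommendations):

```scala
import org.apache.spark.SparkConf

// Hypothetical tuning values; the defaults added by this patch are 16 and 1.5.
val conf = new SparkConf()
  .set("spark.storage.unrollMemoryCheckPeriod", "32")   // re-check every 32 records
  .set("spark.storage.unrollMemoryGrowthFactor", "2.0") // request 2x the current size
```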