From 574ef6c987c636210828e96d2f797d8f10aff05e Mon Sep 17 00:00:00 2001 From: zhoukang <zhoukang199191@gmail.com> Date: Fri, 25 Aug 2017 22:59:31 +0800 Subject: [PATCH] [SPARK-21527][CORE] Use buffer limit in order to use JAVA NIO Util's buffercache ## What changes were proposed in this pull request? Right now, ChunkedByteBuffer#writeFully do not slice bytes first.We observe code in java nio Util#getTemporaryDirectBuffer below: BufferCache cache = bufferCache.get(); ByteBuffer buf = cache.get(size); if (buf != null) { return buf; } else { // No suitable buffer in the cache so we need to allocate a new // one. To avoid the cache growing then we remove the first // buffer from the cache and free it. if (!cache.isEmpty()) { buf = cache.removeFirst(); free(buf); } return ByteBuffer.allocateDirect(size); } If we slice first with a fixed size, we can use buffer cache and only need to allocate at the first write call. Since we allocate new buffer, we can not control the free time of this buffer.This once cause memory issue in our production cluster. In this patch, i supply a new api which will slice with fixed size for buffer writing. ## How was this patch tested? Unit test and test in production. Author: zhoukang <zhoukang199191@gmail.com> Author: zhoukang <zhoukang@xiaomi.com> Closes #18730 from caneGuy/zhoukang/improve-chunkwrite. --- .../org/apache/spark/internal/config/package.scala | 9 +++++++++ .../org/apache/spark/util/io/ChunkedByteBuffer.scala | 11 ++++++++++- 2 files changed, 19 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index 9495cd2835..0457a66af8 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -293,6 +293,15 @@ package object config { .booleanConf .createWithDefault(false) + private[spark] val BUFFER_WRITE_CHUNK_SIZE = + ConfigBuilder("spark.buffer.write.chunkSize") + .internal() + .doc("The chunk size during writing out the bytes of ChunkedByteBuffer.") + .bytesConf(ByteUnit.BYTE) + .checkValue(_ <= Int.MaxValue, "The chunk size during writing out the bytes of" + + " ChunkedByteBuffer should not larger than Int.MaxValue.") + .createWithDefault(64 * 1024 * 1024) + private[spark] val CHECKPOINT_COMPRESS = ConfigBuilder("spark.checkpoint.compress") .doc("Whether to compress RDD checkpoints. Generally a good idea. Compression will use " + diff --git a/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala b/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala index f48bfd5c25..c28570fb24 100644 --- a/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala +++ b/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala @@ -24,6 +24,8 @@ import java.nio.channels.WritableByteChannel import com.google.common.primitives.UnsignedBytes import io.netty.buffer.{ByteBuf, Unpooled} +import org.apache.spark.SparkEnv +import org.apache.spark.internal.config import org.apache.spark.network.util.ByteArrayWritableChannel import org.apache.spark.storage.StorageUtils @@ -40,6 +42,11 @@ private[spark] class ChunkedByteBuffer(var chunks: Array[ByteBuffer]) { require(chunks != null, "chunks must not be null") require(chunks.forall(_.position() == 0), "chunks' positions must be 0") + // Chunk size in bytes + private val bufferWriteChunkSize = + Option(SparkEnv.get).map(_.conf.get(config.BUFFER_WRITE_CHUNK_SIZE)) + .getOrElse(config.BUFFER_WRITE_CHUNK_SIZE.defaultValue.get).toInt + private[this] var disposed: Boolean = false /** @@ -56,7 +63,9 @@ private[spark] class ChunkedByteBuffer(var chunks: Array[ByteBuffer]) { */ def writeFully(channel: WritableByteChannel): Unit = { for (bytes <- getChunks()) { - while (bytes.remaining > 0) { + while (bytes.remaining() > 0) { + val ioSize = Math.min(bytes.remaining(), bufferWriteChunkSize) + bytes.limit(bytes.position + ioSize) channel.write(bytes) } } -- GitLab