Commit c1937dd1 authored by Tejas Patil, committed by Reynold Xin

[SPARK-16862] Configurable buffer size in `UnsafeSorterSpillReader`

## What changes were proposed in this pull request?

Jira: https://issues.apache.org/jira/browse/SPARK-16862

`BufferedInputStream` used in `UnsafeSorterSpillReader` reads data off disk with the default 8 KB buffer. This PR makes the buffer size configurable so that disk reads can be tuned. The default is set to 1 MB, since that value showed improved performance in my measurements.
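For illustration, here is a minimal sketch of how the new setting could be supplied through `SparkConf`. Only the config key comes from this patch; the surrounding application setup is hypothetical. Size strings such as `"1m"` work because the patch reads the value with `SparkConf.getSizeAsBytes`:

```java
import org.apache.spark.SparkConf;

public class SpillReaderBufferExample {
  public static void main(String[] args) {
    // Config key added by this patch; "1m" is parsed to 1048576 bytes
    // by SparkConf's size parsing (getSizeAsBytes).
    SparkConf conf = new SparkConf()
        .setAppName("spill-reader-buffer-example")
        .set("spark.unsafe.sorter.spill.reader.buffer.size", "1m");
    System.out.println(
        conf.getSizeAsBytes("spark.unsafe.sorter.spill.reader.buffer.size", 8192L));
  }
}
```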

## How was this patch tested?

I am relying on the existing unit tests.

## Performance

After deploying this change to production and setting the config to 1 MB, we observed a 12% reduction in CPU time and a 19.5% reduction in CPU reservation time.

Author: Tejas Patil <tejasp@fb.com>

Closes #14726 from tejasapatil/spill_buffer_2.
parent bf8ff833
@@ -22,15 +22,21 @@ import java.io.*;
 
 import com.google.common.io.ByteStreams;
 import com.google.common.io.Closeables;
+import org.apache.spark.SparkEnv;
 import org.apache.spark.serializer.SerializerManager;
 import org.apache.spark.storage.BlockId;
 import org.apache.spark.unsafe.Platform;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Reads spill files written by {@link UnsafeSorterSpillWriter} (see that class for a description
  * of the file format).
  */
 public final class UnsafeSorterSpillReader extends UnsafeSorterIterator implements Closeable {
+  private static final Logger logger = LoggerFactory.getLogger(UnsafeSorterSpillReader.class);
+  private static final int DEFAULT_BUFFER_SIZE_BYTES = 1024 * 1024; // 1 MB
+  private static final int MAX_BUFFER_SIZE_BYTES = 16777216; // 16 MB
 
   private InputStream in;
   private DataInputStream din;
@@ -50,7 +56,21 @@ public final class UnsafeSorterSpillReader extends UnsafeSorterIterator implemen
       File file,
       BlockId blockId) throws IOException {
     assert (file.length() > 0);
-    final BufferedInputStream bs = new BufferedInputStream(new FileInputStream(file));
+    long bufferSizeBytes =
+        SparkEnv.get() == null ?
+            DEFAULT_BUFFER_SIZE_BYTES :
+            SparkEnv.get().conf().getSizeAsBytes("spark.unsafe.sorter.spill.reader.buffer.size",
+                                                 DEFAULT_BUFFER_SIZE_BYTES);
+    if (bufferSizeBytes > MAX_BUFFER_SIZE_BYTES || bufferSizeBytes < DEFAULT_BUFFER_SIZE_BYTES) {
+      // fall back to a sane default value
+      logger.warn("Value of config \"spark.unsafe.sorter.spill.reader.buffer.size\" = {} not in " +
+          "allowed range [{}, {}). Falling back to default value : {} bytes", bufferSizeBytes,
+          DEFAULT_BUFFER_SIZE_BYTES, MAX_BUFFER_SIZE_BYTES, DEFAULT_BUFFER_SIZE_BYTES);
+      bufferSizeBytes = DEFAULT_BUFFER_SIZE_BYTES;
+    }
+    final BufferedInputStream bs =
+        new BufferedInputStream(new FileInputStream(file), (int) bufferSizeBytes);
     try {
       this.in = serializerManager.wrapForCompression(blockId, bs);
       this.din = new DataInputStream(this.in);
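Note that the range check in the hunk above falls back to the default rather than failing. A minimal standalone sketch of that behavior, with the same constants but a hypothetical class and helper name that are not part of the patch:

```java
// Illustrative re-statement of the fallback logic above; not Spark code.
public final class BufferSizeCheck {
  static final long DEFAULT_BUFFER_SIZE_BYTES = 1024 * 1024;   // 1 MB, also the lower bound
  static final long MAX_BUFFER_SIZE_BYTES = 16 * 1024 * 1024;  // 16 MB upper bound

  // Values outside [DEFAULT_BUFFER_SIZE_BYTES, MAX_BUFFER_SIZE_BYTES]
  // fall back to the default instead of raising an error.
  static long checkedBufferSize(long requested) {
    if (requested > MAX_BUFFER_SIZE_BYTES || requested < DEFAULT_BUFFER_SIZE_BYTES) {
      return DEFAULT_BUFFER_SIZE_BYTES;
    }
    return requested;
  }

  public static void main(String[] args) {
    System.out.println(checkedBufferSize(512 * 1024));       // below range -> 1048576
    System.out.println(checkedBufferSize(4 * 1024 * 1024));  // in range    -> 4194304
    System.out.println(checkedBufferSize(64 * 1024 * 1024)); // above range -> 1048576
  }
}
```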