diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java b/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java
index 2fde5c300f072934b4659c3c893d658311ca8410..857ec8a4dadd23d7fe92977ff4ef39d8a7c7f94c 100644
--- a/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java
+++ b/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java
@@ -40,6 +40,7 @@ import org.apache.spark.annotation.Private;
 import org.apache.spark.executor.ShuffleWriteMetrics;
 import org.apache.spark.io.CompressionCodec;
 import org.apache.spark.io.CompressionCodec$;
+import org.apache.spark.io.NioBufferedFileInputStream;
 import org.apache.commons.io.output.CloseShieldOutputStream;
 import org.apache.commons.io.output.CountingOutputStream;
 import org.apache.spark.memory.TaskMemoryManager;
@@ -98,6 +99,18 @@ public class UnsafeShuffleWriter<K, V> extends ShuffleWriter<K, V> {
    */
   private boolean stopping = false;
 
+  private class CloseAndFlushShieldOutputStream extends CloseShieldOutputStream {
+
+    CloseAndFlushShieldOutputStream(OutputStream outputStream) {
+      super(outputStream);
+    }
+
+    @Override
+    public void flush() {
+      // do nothing
+    }
+  }
+
   public UnsafeShuffleWriter(
       BlockManager blockManager,
       IndexShuffleBlockResolver shuffleBlockResolver,
@@ -321,11 +334,15 @@ public class UnsafeShuffleWriter<K, V> extends ShuffleWriter<K, V> {
   }
 
   /**
-   * Merges spill files using Java FileStreams. This code path is slower than the NIO-based merge,
-   * {@link UnsafeShuffleWriter#mergeSpillsWithTransferTo(SpillInfo[], File)}, so it's only used in
-   * cases where the IO compression codec does not support concatenation of compressed data, when
-   * encryption is enabled, or when users have explicitly disabled use of {@code transferTo} in
-   * order to work around kernel bugs.
+   * Merges spill files using Java FileStreams. This code path is typically slower than
+   * the NIO-based merge, {@link UnsafeShuffleWriter#mergeSpillsWithTransferTo(SpillInfo[],
+   * File)}, and it's mostly used in cases where the IO compression codec does not support
+   * concatenation of compressed data, when encryption is enabled, or when users have
+   * explicitly disabled use of {@code transferTo} in order to work around kernel bugs.
+   * This code path might also be faster in cases where the individual partition size in a
+   * spill is small, since UnsafeShuffleWriter#mergeSpillsWithTransferTo would then perform
+   * many small, inefficient disk IOs. In those cases, using large buffers for the input and
+   * output files helps reduce the number of disk IOs, making the file merging faster.
    *
    * @param spills the spills to merge.
    * @param outputFile the file to write the merged data to.
@@ -339,23 +356,28 @@ public class UnsafeShuffleWriter<K, V> extends ShuffleWriter<K, V> {
     assert (spills.length >= 2);
     final int numPartitions = partitioner.numPartitions();
     final long[] partitionLengths = new long[numPartitions];
-    final InputStream[] spillInputStreams = new FileInputStream[spills.length];
+    final InputStream[] spillInputStreams = new InputStream[spills.length];
+    final OutputStream bos = new BufferedOutputStream(
+      new FileOutputStream(outputFile),
+      (int) sparkConf.getSizeAsKb("spark.shuffle.unsafe.file.output.buffer", "32k") * 1024);
 
     // Use a counting output stream to avoid having to close the underlying file and ask
     // the file system for its size after each partition is written.
-    final CountingOutputStream mergedFileOutputStream = new CountingOutputStream(
-      new FileOutputStream(outputFile));
+    final CountingOutputStream mergedFileOutputStream = new CountingOutputStream(bos);
+    final int inputBufferSizeInBytes = (int) sparkConf.getSizeAsKb("spark.shuffle.file.buffer", "32k") * 1024;
 
     boolean threwException = true;
     try {
       for (int i = 0; i < spills.length; i++) {
-        spillInputStreams[i] = new FileInputStream(spills[i].file);
+        spillInputStreams[i] = new NioBufferedFileInputStream(
+          spills[i].file,
+          inputBufferSizeInBytes);
       }
       for (int partition = 0; partition < numPartitions; partition++) {
         final long initialFileLength = mergedFileOutputStream.getByteCount();
-        // Shield the underlying output stream from close() calls, so that we can close the higher
+        // Shield the underlying output stream from close() and flush() calls, so that we can close the higher
         // level streams to make sure all data is really flushed and internal state is cleaned.
-        OutputStream partitionOutput = new CloseShieldOutputStream(
+        OutputStream partitionOutput = new CloseAndFlushShieldOutputStream(
           new TimeTrackingOutputStream(writeMetrics, mergedFileOutputStream));
         partitionOutput = blockManager.serializerManager().wrapForEncryption(partitionOutput);
         if (compressionCodec != null) {