Commit d8ccf655 authored by Kay Ousterhout, committed by Andrew Or

[SPARK-3570] Include time to open files in shuffle write time.

Opening shuffle files can take a significant amount of time when the
disk is contended, especially when using ext3. While writing data to
a file can avoid hitting disk (and instead hit the buffer cache),
opening a file always involves writing some metadata about the file to
disk, so the open time can be a significant portion of the shuffle
write time. In one job I ran recently, the time to write shuffle data
to the file was only 4ms for each task, but the time to open the file
was about 100x as long (~400ms).
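
For illustration, here is a minimal, self-contained Scala sketch of the accounting pattern this commit applies (the shuffleWriteTimeNanos counter is a hypothetical stand-in for Spark's ShuffleWriteMetrics, not Spark code): the open is timed with System.nanoTime and folded into the same counter as the write itself.

import java.io.{File, FileOutputStream}

object OpenTimeSketch {
  def main(args: Array[String]): Unit = {
    var shuffleWriteTimeNanos = 0L  // hypothetical stand-in for ShuffleWriteMetrics

    // Time the open separately: even if the written data lands in the
    // buffer cache, creating the file forces metadata to disk.
    val openStart = System.nanoTime
    val out = new FileOutputStream(File.createTempFile("shuffle", ".data"))
    shuffleWriteTimeNanos += System.nanoTime - openStart

    // Time the write itself and fold it into the same counter.
    val writeStart = System.nanoTime
    out.write(Array.fill[Byte](4096)(0))
    out.close()
    shuffleWriteTimeNanos += System.nanoTime - writeStart

    println(s"shuffle write time: ${shuffleWriteTimeNanos / 1000} us")
  }
}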

When we add metrics about spilled data (#2504), we should ensure
that the file open time is also included there.

Author: Kay Ousterhout <kayousterhout@gmail.com>

Closes #4550 from kayousterhout/SPARK-3570 and squashes the following commits:

ea3a4ae [Kay Ousterhout] Added comment about excluded open time
fdc5185 [Kay Ousterhout] Improved comment
42b7e43 [Kay Ousterhout] Fixed parens for nanotime
2423555 [Kay Ousterhout] [SPARK-3570] Include time to open files in shuffle write time.
parent 6948ab6f
@@ -112,6 +112,7 @@ class FileShuffleBlockManager(conf: SparkConf)
       private val shuffleState = shuffleStates(shuffleId)
       private var fileGroup: ShuffleFileGroup = null
 
+      val openStartTime = System.nanoTime
       val writers: Array[BlockObjectWriter] = if (consolidateShuffleFiles) {
         fileGroup = getUnusedFileGroup()
         Array.tabulate[BlockObjectWriter](numBuckets) { bucketId =>
@@ -135,6 +136,9 @@ class FileShuffleBlockManager(conf: SparkConf)
           blockManager.getDiskWriter(blockId, blockFile, serializer, bufferSize, writeMetrics)
         }
       }
+      // Creating the file to write to and creating a disk writer both involve interacting with
+      // the disk, so should be included in the shuffle write time.
+      writeMetrics.incShuffleWriteTime(System.nanoTime - openStartTime)
 
       override def releaseWriters(success: Boolean) {
         if (consolidateShuffleFiles) {
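As a hedged illustration of the FileShuffleBlockManager hunks above (FileOutputStream and a plain counter are hypothetical stand-ins for BlockObjectWriter and ShuffleWriteMetrics), the pattern is a single nanoTime bracket around the creation of all per-bucket writers, so their aggregate open cost is charged to shuffle write time:

import java.io.{File, FileOutputStream}

final class WriteMetricsSketch { var shuffleWriteTimeNanos = 0L }

object ForMapTaskSketch {
  def main(args: Array[String]): Unit = {
    val numBuckets = 64
    val metrics = new WriteMetricsSketch
    val openStartTime = System.nanoTime
    val writers = Array.tabulate(numBuckets) { bucketId =>
      // Each open writes file metadata to disk, even when the data
      // itself later stays in the buffer cache.
      new FileOutputStream(File.createTempFile(s"bucket-$bucketId-", ".data"))
    }
    metrics.shuffleWriteTimeNanos += System.nanoTime - openStartTime
    println(s"open time for $numBuckets buckets: ${metrics.shuffleWriteTimeNanos / 1000} us")
    writers.foreach(_.close())
  }
}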
@@ -63,6 +63,9 @@ private[spark] class SortShuffleWriter[K, V, C](
       sorter.insertAll(records)
     }
 
+    // Don't bother including the time to open the merged output file in the shuffle write time,
+    // because it just opens a single file, so is typically too fast to measure accurately
+    // (see SPARK-3570).
     val outputFile = shuffleBlockManager.getDataFile(dep.shuffleId, mapId)
     val blockId = shuffleBlockManager.consolidateId(dep.shuffleId, mapId)
     val partitionLengths = sorter.writePartitionedFile(blockId, context, outputFile)
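The exclusion in the SortShuffleWriter hunk is the judgment call its comment describes: a single open is typically too fast to measure meaningfully. A tiny, purely illustrative sketch (not Spark code) of what such a one-off measurement looks like:

import java.io.{File, FileOutputStream}

object SingleOpenTiming {
  def main(args: Array[String]): Unit = {
    val t0 = System.nanoTime
    val out = new FileOutputStream(File.createTempFile("merged", ".data"))
    val openNanos = System.nanoTime - t0
    out.close()
    // On a warm buffer cache this is usually a tiny fraction of a task's
    // runtime, which is why the commit leaves it out of the metric.
    println(s"single open took $openNanos ns")
  }
}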
@@ -352,6 +352,7 @@ private[spark] class ExternalSorter[K, V, C](
     // Create our file writers if we haven't done so yet
     if (partitionWriters == null) {
       curWriteMetrics = new ShuffleWriteMetrics()
+      val openStartTime = System.nanoTime
       partitionWriters = Array.fill(numPartitions) {
         // Because these files may be read during shuffle, their compression must be controlled by
         // spark.shuffle.compress instead of spark.shuffle.spill.compress, so we need to use
@@ -359,6 +360,10 @@ private[spark] class ExternalSorter[K, V, C](
         val (blockId, file) = diskBlockManager.createTempShuffleBlock()
         blockManager.getDiskWriter(blockId, file, ser, fileBufferSize, curWriteMetrics).open()
       }
+      // Creating the file to write to and creating a disk writer both involve interacting with
+      // the disk, and can take a long time in aggregate when we open many files, so should be
+      // included in the shuffle write time.
+      curWriteMetrics.incShuffleWriteTime(System.nanoTime - openStartTime)
     }
 
     // No need to sort stuff, just write each element out
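Finally, a self-contained sketch of the ExternalSorter change (hypothetical names; the real code obtains its writers from DiskBlockManager and BlockManager): the per-partition writers are created lazily, once, inside a single timed bracket, so the aggregate open cost of many files is counted.

import java.io.{File, FileOutputStream}

object LazyPartitionWriters {
  val numPartitions = 100
  var openTimeNanos = 0L  // stands in for curWriteMetrics.incShuffleWriteTime
  private var partitionWriters: Array[FileOutputStream] = null

  def writers(): Array[FileOutputStream] = {
    if (partitionWriters == null) {  // create once, on first spill
      val openStartTime = System.nanoTime
      partitionWriters = Array.fill(numPartitions) {
        new FileOutputStream(File.createTempFile("spill-", ".tmp"))
      }
      // Many opens in aggregate can take a long time, so charge them
      // to the shuffle write time, as the commit does.
      openTimeNanos += System.nanoTime - openStartTime
    }
    partitionWriters
  }

  def main(args: Array[String]): Unit = {
    writers()
    println(s"opened $numPartitions spill files in ${openTimeNanos / 1000} us")
    partitionWriters.foreach(_.close())
  }
}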