diff --git a/core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala b/core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala index ab97d2e4b8b78c0b5fff069f08817d477ddc522b..5b493f470b50a6923cdd1763f11f4a0536fee4e0 100644 --- a/core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala +++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala @@ -203,8 +203,7 @@ private[spark] class DiskBlockObjectWriter( numRecordsWritten += 1 writeMetrics.incRecordsWritten(1) - // TODO: call updateBytesWritten() less frequently. - if (numRecordsWritten % 32 == 0) { + if (numRecordsWritten % 16384 == 0) { updateBytesWritten() } } diff --git a/core/src/test/scala/org/apache/spark/storage/DiskBlockObjectWriterSuite.scala b/core/src/test/scala/org/apache/spark/storage/DiskBlockObjectWriterSuite.scala index 8eff3c297035d0e82aff5d042d3b281b7b4fb52e..ec4ef4b2fcbf0b7a9142edda02f69bd8e90fce3d 100644 --- a/core/src/test/scala/org/apache/spark/storage/DiskBlockObjectWriterSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/DiskBlockObjectWriterSuite.scala @@ -53,13 +53,13 @@ class DiskBlockObjectWriterSuite extends SparkFunSuite with BeforeAndAfterEach { assert(writeMetrics.recordsWritten === 1) // Metrics don't update on every write assert(writeMetrics.bytesWritten == 0) - // After 32 writes, metrics should update - for (i <- 0 until 32) { + // After 16384 writes, metrics should update + for (i <- 0 until 16384) { writer.flush() writer.write(Long.box(i), Long.box(i)) } assert(writeMetrics.bytesWritten > 0) - assert(writeMetrics.recordsWritten === 33) + assert(writeMetrics.recordsWritten === 16385) writer.commitAndClose() assert(file.length() == writeMetrics.bytesWritten) } @@ -75,13 +75,13 @@ class DiskBlockObjectWriterSuite extends SparkFunSuite with BeforeAndAfterEach { assert(writeMetrics.recordsWritten === 1) // Metrics don't update on every write assert(writeMetrics.bytesWritten == 0) - // After 32 writes, metrics should update - for (i <- 0 until 32) { + // After 16384 writes, metrics should update + for (i <- 0 until 16384) { writer.flush() writer.write(Long.box(i), Long.box(i)) } assert(writeMetrics.bytesWritten > 0) - assert(writeMetrics.recordsWritten === 33) + assert(writeMetrics.recordsWritten === 16385) writer.revertPartialWritesAndClose() assert(writeMetrics.bytesWritten == 0) assert(writeMetrics.recordsWritten == 0)