Commit 534a063f authored by Xianyang Liu, committed by Wenchen Fan

[SPARK-21621][CORE] Reset numRecordsWritten after DiskBlockObjectWriter.commitAndGet called

## What changes were proposed in this pull request?

We should reset `numRecordsWritten` to zero after `DiskBlockObjectWriter.commitAndGet` is called.
When `revertPartialWritesAndClose` is called, we decrease the number of written records in `ShuffleWriteMetrics`. Without the reset, that decrement also covers records that were already committed, so the metric drops to zero. This is wrong: we should subtract only the records written after the last `commitAndGet` call.
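
To illustrate the accounting problem, here is a minimal sketch in plain Scala. `ToyWriteMetrics` and `ToyWriter` are hypothetical stand-ins, not Spark's `ShuffleWriteMetrics` or `DiskBlockObjectWriter`; they model only the record counter and assume that a revert subtracts the writer's local counter from the metrics. The `resetOnCommit` flag is likewise an invention of this sketch, used to compare the behavior before and after the patch.

```scala
// Hypothetical stand-in for ShuffleWriteMetrics (records counter only).
final class ToyWriteMetrics {
  private var _recordsWritten = 0L
  def recordsWritten: Long = _recordsWritten
  def incRecordsWritten(n: Long): Unit = _recordsWritten += n
  def decRecordsWritten(n: Long): Unit = _recordsWritten -= n
}

// Hypothetical, heavily simplified stand-in for DiskBlockObjectWriter.
// resetOnCommit = true models the patched behavior.
final class ToyWriter(metrics: ToyWriteMetrics, resetOnCommit: Boolean) {
  // With the reset, this counts only records written since the last commit.
  private var numRecordsWritten = 0

  def write(): Unit = {
    numRecordsWritten += 1
    metrics.incRecordsWritten(1)
  }

  def commitAndGet(): Unit = {
    if (resetOnCommit) numRecordsWritten = 0
  }

  // Assumed revert behavior: subtract the local counter from the metrics.
  def revertPartialWrites(): Unit = {
    metrics.decRecordsWritten(numRecordsWritten)
  }
}

object ResetDemo {
  // Write one record, commit, write one more, then revert the partial write.
  def recordsAfterRevert(resetOnCommit: Boolean): Long = {
    val metrics = new ToyWriteMetrics
    val writer = new ToyWriter(metrics, resetOnCommit)
    writer.write()
    writer.commitAndGet()
    writer.write()
    writer.revertPartialWrites()
    metrics.recordsWritten
  }

  def main(args: Array[String]): Unit = {
    println(s"without reset: ${recordsAfterRevert(resetOnCommit = false)}")
    println(s"with reset: ${recordsAfterRevert(resetOnCommit = true)}")
  }
}
```

In this toy model, the variant without the reset also subtracts the committed record and ends at zero, while the variant with the reset keeps the one committed record, which matches the `recordsWritten == 1` assertion added to the test suite.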

## How was this patch tested?
Modified existing test.

Please review http://spark.apache.org/contributing.html before opening a pull request.

Author: Xianyang Liu <xianyang.liu@intel.com>

Closes #18830 from ConeyLiu/DiskBlockObjectWriter.
parent 39e044e3
@@ -95,6 +95,7 @@ private[spark] class DiskBlockObjectWriter(
   /**
    * Keep track of number of records written and also use this to periodically
    * output bytes written since the latter is expensive to do for each record.
+   * And we reset it after every commitAndGet called.
    */
   private var numRecordsWritten = 0
@@ -185,6 +186,7 @@ private[spark] class DiskBlockObjectWriter(
       // In certain compression codecs, more bytes are written after streams are closed
       writeMetrics.incBytesWritten(committedPosition - reportedPosition)
       reportedPosition = committedPosition
+      numRecordsWritten = 0
       fileSegment
     } else {
       new FileSegment(file, committedPosition, 0)
......
@@ -116,6 +116,7 @@ class DiskBlockObjectWriterSuite extends SparkFunSuite with BeforeAndAfterEach {
     writer.revertPartialWritesAndClose()
     assert(firstSegment.length === file.length())
     assert(writeMetrics.bytesWritten === file.length())
+    assert(writeMetrics.recordsWritten == 1)
   }
   test("calling revertPartialWritesAndClose() after commit() should have no effect") {
......