Skip to content
Snippets Groups Projects
Commit 0092abb4 authored by Sandy Ryza's avatar Sandy Ryza Committed by Josh Rosen
Browse files

Some minor cleanup after SPARK-4550.

JoshRosen this PR addresses the comments you left on #4450 after it got merged.

Author: Sandy Ryza <sandy@cloudera.com>

Closes #5916 from sryza/sandy-spark-4550-cleanup and squashes the following commits:

dee3d85 [Sandy Ryza] Some minor cleanup after SPARK-4550.
parent c688e3c5
No related branches found
No related tags found
No related merge requests found
...@@ -59,7 +59,7 @@ private[spark] abstract class BlockObjectWriter(val blockId: BlockId) extends Ou ...@@ -59,7 +59,7 @@ private[spark] abstract class BlockObjectWriter(val blockId: BlockId) extends Ou
def write(key: Any, value: Any) def write(key: Any, value: Any)
/** /**
* Notify the writer that a record worth of bytes has been written with writeBytes. * Notify the writer that a record worth of bytes has been written with OutputStream#write.
*/ */
def recordWritten() def recordWritten()
...@@ -215,12 +215,7 @@ private[spark] class DiskBlockObjectWriter( ...@@ -215,12 +215,7 @@ private[spark] class DiskBlockObjectWriter(
objOut.writeKey(key) objOut.writeKey(key)
objOut.writeValue(value) objOut.writeValue(value)
numRecordsWritten += 1 recordWritten()
writeMetrics.incShuffleRecordsWritten(1)
if (numRecordsWritten % 32 == 0) {
updateBytesWritten()
}
} }
override def write(b: Int): Unit = throw new UnsupportedOperationException() override def write(b: Int): Unit = throw new UnsupportedOperationException()
......
...@@ -71,10 +71,10 @@ private[spark] class PartitionedSerializedPairBuffer[K, V]( ...@@ -71,10 +71,10 @@ private[spark] class PartitionedSerializedPairBuffer[K, V](
if (keyStart < 0) { if (keyStart < 0) {
throw new Exception(s"Can't grow buffer beyond ${1 << 31} bytes") throw new Exception(s"Can't grow buffer beyond ${1 << 31} bytes")
} }
kvSerializationStream.writeObject[Any](key) kvSerializationStream.writeKey[Any](key)
kvSerializationStream.flush() kvSerializationStream.flush()
val valueStart = kvBuffer.size val valueStart = kvBuffer.size
kvSerializationStream.writeObject[Any](value) kvSerializationStream.writeValue[Any](value)
kvSerializationStream.flush() kvSerializationStream.flush()
val valueEnd = kvBuffer.size val valueEnd = kvBuffer.size
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment