Skip to content
Snippets Groups Projects
Commit 97678ede authored by Andrew Or's avatar Andrew Or
Browse files

[SPARK-12390] Clean up unused serializer parameter in BlockManager

No change in functionality is intended. This only changes internal API.

Author: Andrew Or <andrew@databricks.com>

Closes #10343 from andrewor14/clean-bm-serializer.
parent d1508dd9
No related branches found
No related tags found
No related merge requests found
...@@ -1190,20 +1190,16 @@ private[spark] class BlockManager( ...@@ -1190,20 +1190,16 @@ private[spark] class BlockManager(
def dataSerializeStream( def dataSerializeStream(
blockId: BlockId, blockId: BlockId,
outputStream: OutputStream, outputStream: OutputStream,
values: Iterator[Any], values: Iterator[Any]): Unit = {
serializer: Serializer = defaultSerializer): Unit = {
val byteStream = new BufferedOutputStream(outputStream) val byteStream = new BufferedOutputStream(outputStream)
val ser = serializer.newInstance() val ser = defaultSerializer.newInstance()
ser.serializeStream(wrapForCompression(blockId, byteStream)).writeAll(values).close() ser.serializeStream(wrapForCompression(blockId, byteStream)).writeAll(values).close()
} }
/** Serializes into a byte buffer. */ /** Serializes into a byte buffer. */
def dataSerialize( def dataSerialize(blockId: BlockId, values: Iterator[Any]): ByteBuffer = {
blockId: BlockId,
values: Iterator[Any],
serializer: Serializer = defaultSerializer): ByteBuffer = {
val byteStream = new ByteBufferOutputStream(4096) val byteStream = new ByteBufferOutputStream(4096)
dataSerializeStream(blockId, byteStream, values, serializer) dataSerializeStream(blockId, byteStream, values)
byteStream.toByteBuffer byteStream.toByteBuffer
} }
...@@ -1211,24 +1207,21 @@ private[spark] class BlockManager( ...@@ -1211,24 +1207,21 @@ private[spark] class BlockManager(
* Deserializes a ByteBuffer into an iterator of values and disposes of it when the end of * Deserializes a ByteBuffer into an iterator of values and disposes of it when the end of
* the iterator is reached. * the iterator is reached.
*/ */
def dataDeserialize( def dataDeserialize(blockId: BlockId, bytes: ByteBuffer): Iterator[Any] = {
blockId: BlockId,
bytes: ByteBuffer,
serializer: Serializer = defaultSerializer): Iterator[Any] = {
bytes.rewind() bytes.rewind()
dataDeserializeStream(blockId, new ByteBufferInputStream(bytes, true), serializer) dataDeserializeStream(blockId, new ByteBufferInputStream(bytes, true))
} }
/** /**
* Deserializes a InputStream into an iterator of values and disposes of it when the end of * Deserializes a InputStream into an iterator of values and disposes of it when the end of
* the iterator is reached. * the iterator is reached.
*/ */
def dataDeserializeStream( def dataDeserializeStream(blockId: BlockId, inputStream: InputStream): Iterator[Any] = {
blockId: BlockId,
inputStream: InputStream,
serializer: Serializer = defaultSerializer): Iterator[Any] = {
val stream = new BufferedInputStream(inputStream) val stream = new BufferedInputStream(inputStream)
serializer.newInstance().deserializeStream(wrapForCompression(blockId, stream)).asIterator defaultSerializer
.newInstance()
.deserializeStream(wrapForCompression(blockId, stream))
.asIterator
} }
def stop(): Unit = { def stop(): Unit = {
......
...@@ -144,16 +144,6 @@ private[spark] class DiskStore(blockManager: BlockManager, diskManager: DiskBloc ...@@ -144,16 +144,6 @@ private[spark] class DiskStore(blockManager: BlockManager, diskManager: DiskBloc
getBytes(blockId).map(buffer => blockManager.dataDeserialize(blockId, buffer)) getBytes(blockId).map(buffer => blockManager.dataDeserialize(blockId, buffer))
} }
/**
* A version of getValues that allows a custom serializer. This is used as part of the
* shuffle short-circuit code.
*/
def getValues(blockId: BlockId, serializer: Serializer): Option[Iterator[Any]] = {
// TODO: Should bypass getBytes and use a stream based implementation, so that
// we won't use a lot of memory during e.g. external sort merge.
getBytes(blockId).map(bytes => blockManager.dataDeserialize(blockId, bytes, serializer))
}
override def remove(blockId: BlockId): Boolean = { override def remove(blockId: BlockId): Boolean = {
val file = diskManager.getFile(blockId.name) val file = diskManager.getFile(blockId.name)
if (file.exists()) { if (file.exists()) {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment