Skip to content
Snippets Groups Projects
Commit 899ebcb1 authored by Reynold Xin's avatar Reynold Xin
Browse files

[SPARK-6578] Small rewrite to make the logic more clear in MessageWithHeader.transferTo.

Author: Reynold Xin <rxin@databricks.com>

Closes #5319 from rxin/SPARK-6578 and squashes the following commits:

7c62a64 [Reynold Xin] Small rewrite to make the logic more clear in transferTo.
parent 4815bc21
No related branches found
No related tags found
No related merge requests found
......@@ -21,15 +21,15 @@ import java.io.IOException;
import java.nio.channels.WritableByteChannel;
import com.google.common.base.Preconditions;
import com.google.common.primitives.Ints;
import io.netty.buffer.ByteBuf;
import io.netty.channel.FileRegion;
import io.netty.util.AbstractReferenceCounted;
import io.netty.util.ReferenceCountUtil;
/**
* A wrapper message that holds two separate pieces (a header and a body) to avoid
* copying the body's content.
* A wrapper message that holds two separate pieces (a header and a body).
*
* The header must be a ByteBuf, while the body can be a ByteBuf or a FileRegion.
*/
class MessageWithHeader extends AbstractReferenceCounted implements FileRegion {
......@@ -63,32 +63,36 @@ class MessageWithHeader extends AbstractReferenceCounted implements FileRegion {
return totalBytesTransferred;
}
/**
* This code is more complicated than you would think because we might require multiple
* transferTo invocations in order to transfer a single MessageWithHeader to avoid busy waiting.
*
* The contract is that the caller will ensure position is properly set to the total number
* of bytes transferred so far (i.e. value returned by transfered()).
*/
@Override
public long transferTo(WritableByteChannel target, long position) throws IOException {
public long transferTo(final WritableByteChannel target, final long position) throws IOException {
Preconditions.checkArgument(position == totalBytesTransferred, "Invalid position.");
long written = 0;
if (position < headerLength) {
written += copyByteBuf(header, target);
// Bytes written for header in this call.
long writtenHeader = 0;
if (header.readableBytes() > 0) {
writtenHeader = copyByteBuf(header, target);
totalBytesTransferred += writtenHeader;
if (header.readableBytes() > 0) {
totalBytesTransferred += written;
return written;
return writtenHeader;
}
}
// Bytes written for body in this call.
long writtenBody = 0;
if (body instanceof FileRegion) {
// Adjust the position. If the write is happening as part of the same call where the header
// (or some part of it) is written, `position` will be less than the header size, so we want
// to start from position 0 in the FileRegion object. Otherwise, we start from the position
// requested by the caller.
long bodyPos = position > headerLength ? position - headerLength : 0;
written += ((FileRegion)body).transferTo(target, bodyPos);
writtenBody = ((FileRegion) body).transferTo(target, totalBytesTransferred - headerLength);
} else if (body instanceof ByteBuf) {
written += copyByteBuf((ByteBuf) body, target);
writtenBody = copyByteBuf((ByteBuf) body, target);
}
totalBytesTransferred += writtenBody;
totalBytesTransferred += written;
return written;
return writtenHeader + writtenBody;
}
@Override
......@@ -102,5 +106,4 @@ class MessageWithHeader extends AbstractReferenceCounted implements FileRegion {
buf.skipBytes(written);
return written;
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment