Skip to content
Snippets Groups Projects
Commit 14a3bb3a authored by Sumedh Wale's avatar Sumedh Wale Committed by Wenchen Fan
Browse files

[SPARK-21312][SQL] correct offsetInBytes in UnsafeRow.writeToStream

## What changes were proposed in this pull request?

Corrects offsetInBytes calculation in UnsafeRow.writeToStream. Known failures include writes to some DataSources that have own SparkPlan implementations and cause EXCHANGE in writes.

## How was this patch tested?

Extended UnsafeRowSuite.writeToStream to include an UnsafeRow over byte array having non-zero offset.

Author: Sumedh Wale <swale@snappydata.io>

Closes #18535 from sumwale/SPARK-21312.
parent 75b168fd
No related branches found
No related tags found
No related merge requests found
...@@ -550,7 +550,7 @@ public final class UnsafeRow extends InternalRow implements Externalizable, Kryo ...@@ -550,7 +550,7 @@ public final class UnsafeRow extends InternalRow implements Externalizable, Kryo
*/ */
public void writeToStream(OutputStream out, byte[] writeBuffer) throws IOException { public void writeToStream(OutputStream out, byte[] writeBuffer) throws IOException {
if (baseObject instanceof byte[]) { if (baseObject instanceof byte[]) {
int offsetInByteArray = (int) (Platform.BYTE_ARRAY_OFFSET - baseOffset); int offsetInByteArray = (int) (baseOffset - Platform.BYTE_ARRAY_OFFSET);
out.write((byte[]) baseObject, offsetInByteArray, sizeInBytes); out.write((byte[]) baseObject, offsetInByteArray, sizeInBytes);
} else { } else {
int dataRemaining = sizeInBytes; int dataRemaining = sizeInBytes;
......
...@@ -101,9 +101,22 @@ class UnsafeRowSuite extends SparkFunSuite { ...@@ -101,9 +101,22 @@ class UnsafeRowSuite extends SparkFunSuite {
MemoryAllocator.UNSAFE.free(offheapRowPage) MemoryAllocator.UNSAFE.free(offheapRowPage)
} }
} }
val (bytesFromArrayBackedRowWithOffset, field0StringFromArrayBackedRowWithOffset) = {
val baos = new ByteArrayOutputStream()
val numBytes = arrayBackedUnsafeRow.getSizeInBytes
val bytesWithOffset = new Array[Byte](numBytes + 100)
System.arraycopy(arrayBackedUnsafeRow.getBaseObject.asInstanceOf[Array[Byte]], 0,
bytesWithOffset, 100, numBytes)
val arrayBackedRow = new UnsafeRow(arrayBackedUnsafeRow.numFields())
arrayBackedRow.pointTo(bytesWithOffset, Platform.BYTE_ARRAY_OFFSET + 100, numBytes)
arrayBackedRow.writeToStream(baos, null)
(baos.toByteArray, arrayBackedRow.getString(0))
}
assert(bytesFromArrayBackedRow === bytesFromOffheapRow) assert(bytesFromArrayBackedRow === bytesFromOffheapRow)
assert(field0StringFromArrayBackedRow === field0StringFromOffheapRow) assert(field0StringFromArrayBackedRow === field0StringFromOffheapRow)
assert(bytesFromArrayBackedRow === bytesFromArrayBackedRowWithOffset)
assert(field0StringFromArrayBackedRow === field0StringFromArrayBackedRowWithOffset)
} }
test("calling getDouble() and getFloat() on null columns") { test("calling getDouble() and getFloat() on null columns") {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment