Commit 8a1ce489 authored by Pete Robbins, committed by Davies Liu

[SPARK-13745] [SQL] Support columnar in memory representation on Big Endian platforms

## What changes were proposed in this pull request?

The Parquet datasource and ColumnarBatch tests fail on big-endian platforms. This patch adds support for correctly interpreting little-endian byte arrays on big-endian platforms.
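
The fix relies on two standard conversions, and it helps to see them in isolation. Below is a minimal, self-contained Java sketch of the strategy (a hypothetical helper class, not the patch itself): integral values read in native byte order are corrected with `Integer.reverseBytes`/`Long.reverseBytes`, while `float`/`double` values, which have no `reverseBytes` counterpart in `java.lang`, are decoded through a `ByteBuffer` view pinned to `ByteOrder.LITTLE_ENDIAN`.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

// Hypothetical sketch of the conversion strategy; the real code lives in
// VectorizedPlainValuesReader and the ColumnVector implementations below.
public class LittleEndianReads {
  private static final boolean BIG_ENDIAN =
      ByteOrder.nativeOrder().equals(ByteOrder.BIG_ENDIAN);

  // Integral types: read in native order (stand-in for Platform.getInt),
  // then swap the bytes only on big-endian hardware.
  static int readIntLE(byte[] bytes, int pos) {
    int v = ByteBuffer.wrap(bytes).order(ByteOrder.nativeOrder()).getInt(pos);
    return BIG_ENDIAN ? Integer.reverseBytes(v) : v;
  }

  // Floating-point types: java.lang.Float/Double expose no reverseBytes,
  // so decode the bytes directly through a little-endian view.
  static double readDoubleLE(byte[] bytes, int pos) {
    return ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN).getDouble(pos);
  }

  public static void main(String[] args) {
    byte[] buf = ByteBuffer.allocate(12).order(ByteOrder.LITTLE_ENDIAN)
        .putInt(42).putDouble(1.123).array();
    System.out.println(readIntLE(buf, 0));    // 42 on either endianness
    System.out.println(readDoubleLE(buf, 4)); // 1.123 on either endianness
  }
}
```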

## How was this patch tested?

Spark test builds were run on big-endian z/Linux, with a regression build on little-endian amd64.

Author: Pete Robbins <robbinspg@gmail.com>

Closes #12397 from robbinspg/master.
parent 95e37214
sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java
@@ -17,6 +17,8 @@
 package org.apache.spark.sql.execution.datasources.parquet;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
 
 import org.apache.spark.sql.execution.vectorized.ColumnVector;
 import org.apache.spark.unsafe.Platform;
@@ -31,6 +33,9 @@ public class VectorizedPlainValuesReader extends ValuesReader implements Vectori
   private byte[] buffer;
   private int offset;
   private int bitOffset; // Only used for booleans.
+  private ByteBuffer byteBuffer; // used to wrap the byte array buffer
+
+  private final static boolean bigEndianPlatform = ByteOrder.nativeOrder().equals(ByteOrder.BIG_ENDIAN);
 
   public VectorizedPlainValuesReader() {
   }
@@ -39,6 +44,9 @@ public class VectorizedPlainValuesReader extends ValuesReader implements Vectori
   public void initFromPage(int valueCount, byte[] bytes, int offset) throws IOException {
     this.buffer = bytes;
     this.offset = offset + Platform.BYTE_ARRAY_OFFSET;
+    if (bigEndianPlatform) {
+      byteBuffer = ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN);
+    }
   }
 
   @Override
@@ -103,6 +111,9 @@ public class VectorizedPlainValuesReader extends ValuesReader implements Vectori
   @Override
   public final int readInteger() {
     int v = Platform.getInt(buffer, offset);
+    if (bigEndianPlatform) {
+      v = java.lang.Integer.reverseBytes(v);
+    }
     offset += 4;
     return v;
   }
@@ -110,6 +121,9 @@ public class VectorizedPlainValuesReader extends ValuesReader implements Vectori
   @Override
   public final long readLong() {
     long v = Platform.getLong(buffer, offset);
+    if (bigEndianPlatform) {
+      v = java.lang.Long.reverseBytes(v);
+    }
     offset += 8;
     return v;
   }
@@ -121,14 +135,24 @@ public class VectorizedPlainValuesReader extends ValuesReader implements Vectori
   @Override
   public final float readFloat() {
-    float v = Platform.getFloat(buffer, offset);
+    float v;
+    if (!bigEndianPlatform) {
+      v = Platform.getFloat(buffer, offset);
+    } else {
+      v = byteBuffer.getFloat(offset - Platform.BYTE_ARRAY_OFFSET);
+    }
     offset += 4;
     return v;
   }
 
   @Override
   public final double readDouble() {
-    double v = Platform.getDouble(buffer, offset);
+    double v;
+    if (!bigEndianPlatform) {
+      v = Platform.getDouble(buffer, offset);
+    } else {
+      v = byteBuffer.getDouble(offset - Platform.BYTE_ARRAY_OFFSET);
+    }
     offset += 8;
     return v;
   }
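
A note on the arithmetic in `readFloat`/`readDouble` above: the reader's `offset` field includes `Platform.BYTE_ARRAY_OFFSET` (the JVM-internal base offset of a `byte[]`'s first element) so it can be passed straight to `Platform.get*`, whereas a `ByteBuffer` indexes from the start of the wrapped array; hence the `offset - Platform.BYTE_ARRAY_OFFSET` on the big-endian path. The floating-point methods go through `byteBuffer` rather than a byte swap because `java.lang.Float` and `java.lang.Double` provide no `reverseBytes`.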
sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
@@ -16,6 +16,7 @@
  */
 package org.apache.spark.sql.execution.vectorized;
 
+import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
 
 import org.apache.commons.lang.NotImplementedException;
@@ -28,6 +29,9 @@ import org.apache.spark.unsafe.Platform;
  * Column data backed using offheap memory.
  */
 public final class OffHeapColumnVector extends ColumnVector {
+
+  private final static boolean bigEndianPlatform = ByteOrder.nativeOrder().equals(ByteOrder.BIG_ENDIAN);
+
   // The data stored in these two allocations need to maintain binary compatible. We can
   // directly pass this buffer to external components.
   private long nulls;
@@ -39,9 +43,7 @@ public final class OffHeapColumnVector extends ColumnVector {
   protected OffHeapColumnVector(int capacity, DataType type) {
     super(capacity, type, MemoryMode.OFF_HEAP);
-    if (!ByteOrder.nativeOrder().equals(ByteOrder.LITTLE_ENDIAN)) {
-      throw new NotImplementedException("Only little endian is supported.");
-    }
+
     nulls = 0;
     data = 0;
     lengthData = 0;
@@ -221,8 +223,16 @@
   @Override
   public void putIntsLittleEndian(int rowId, int count, byte[] src, int srcIndex) {
-    Platform.copyMemory(src, srcIndex + Platform.BYTE_ARRAY_OFFSET,
-        null, data + 4 * rowId, count * 4);
+    if (!bigEndianPlatform) {
+      Platform.copyMemory(src, srcIndex + Platform.BYTE_ARRAY_OFFSET,
+          null, data + 4 * rowId, count * 4);
+    } else {
+      int srcOffset = srcIndex + Platform.BYTE_ARRAY_OFFSET;
+      long offset = data + 4 * rowId;
+      for (int i = 0; i < count; ++i, offset += 4, srcOffset += 4) {
+        Platform.putInt(null, offset, java.lang.Integer.reverseBytes(Platform.getInt(src, srcOffset)));
+      }
+    }
   }
 
   @Override
@@ -259,8 +269,16 @@
   @Override
   public void putLongsLittleEndian(int rowId, int count, byte[] src, int srcIndex) {
-    Platform.copyMemory(src, srcIndex + Platform.BYTE_ARRAY_OFFSET,
-        null, data + 8 * rowId, count * 8);
+    if (!bigEndianPlatform) {
+      Platform.copyMemory(src, srcIndex + Platform.BYTE_ARRAY_OFFSET,
+          null, data + 8 * rowId, count * 8);
+    } else {
+      int srcOffset = srcIndex + Platform.BYTE_ARRAY_OFFSET;
+      long offset = data + 8 * rowId;
+      for (int i = 0; i < count; ++i, offset += 8, srcOffset += 8) {
+        Platform.putLong(null, offset, java.lang.Long.reverseBytes(Platform.getLong(src, srcOffset)));
+      }
+    }
   }
 
   @Override
@@ -297,8 +315,16 @@
   @Override
   public void putFloats(int rowId, int count, byte[] src, int srcIndex) {
-    Platform.copyMemory(src, Platform.BYTE_ARRAY_OFFSET + srcIndex,
-        null, data + rowId * 4, count * 4);
+    if (!bigEndianPlatform) {
+      Platform.copyMemory(src, Platform.BYTE_ARRAY_OFFSET + srcIndex,
+          null, data + rowId * 4, count * 4);
+    } else {
+      ByteBuffer bb = ByteBuffer.wrap(src).order(ByteOrder.LITTLE_ENDIAN);
+      long offset = data + 4 * rowId;
+      for (int i = 0; i < count; ++i, offset += 4) {
+        Platform.putFloat(null, offset, bb.getFloat(srcIndex + (4 * i)));
+      }
+    }
   }
 
   @Override
@@ -336,8 +362,16 @@
   @Override
   public void putDoubles(int rowId, int count, byte[] src, int srcIndex) {
-    Platform.copyMemory(src, Platform.BYTE_ARRAY_OFFSET + srcIndex,
-        null, data + rowId * 8, count * 8);
+    if (!bigEndianPlatform) {
+      Platform.copyMemory(src, Platform.BYTE_ARRAY_OFFSET + srcIndex,
+          null, data + rowId * 8, count * 8);
+    } else {
+      ByteBuffer bb = ByteBuffer.wrap(src).order(ByteOrder.LITTLE_ENDIAN);
+      long offset = data + 8 * rowId;
+      for (int i = 0; i < count; ++i, offset += 8) {
+        Platform.putDouble(null, offset, bb.getDouble(srcIndex + (8 * i)));
+      }
+    }
   }
 
   @Override
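
Design note on the `put*LittleEndian`/`putFloats`/`putDoubles` changes above: the little-endian path keeps the original single bulk `Platform.copyMemory`, so existing platforms see no behavior or performance change, while the big-endian path pays for a per-element loop that byte-swaps integral values (or decodes floating-point values through a little-endian `ByteBuffer`) before storing each value in native order at the off-heap address.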
sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java
@@ -16,6 +16,8 @@
  */
 package org.apache.spark.sql.execution.vectorized;
 
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
 import java.util.Arrays;
 
 import org.apache.spark.memory.MemoryMode;
@@ -27,6 +29,9 @@ import org.apache.spark.unsafe.Platform;
  * and a java array for the values.
  */
 public final class OnHeapColumnVector extends ColumnVector {
+
+  private final static boolean bigEndianPlatform = ByteOrder.nativeOrder().equals(ByteOrder.BIG_ENDIAN);
+
   // The data stored in these arrays need to maintain binary compatible. We can
   // directly pass this buffer to external components.
@@ -211,10 +216,11 @@ public final class OnHeapColumnVector extends ColumnVector {
   @Override
   public void putIntsLittleEndian(int rowId, int count, byte[] src, int srcIndex) {
     int srcOffset = srcIndex + Platform.BYTE_ARRAY_OFFSET;
-    for (int i = 0; i < count; ++i) {
+    for (int i = 0; i < count; ++i, srcOffset += 4) {
       intData[i + rowId] = Platform.getInt(src, srcOffset);
-      srcIndex += 4;
-      srcOffset += 4;
+      if (bigEndianPlatform) {
+        intData[i + rowId] = java.lang.Integer.reverseBytes(intData[i + rowId]);
+      }
     }
   }
@@ -251,10 +257,11 @@ public final class OnHeapColumnVector extends ColumnVector {
   @Override
   public void putLongsLittleEndian(int rowId, int count, byte[] src, int srcIndex) {
     int srcOffset = srcIndex + Platform.BYTE_ARRAY_OFFSET;
-    for (int i = 0; i < count; ++i) {
+    for (int i = 0; i < count; ++i, srcOffset += 8) {
       longData[i + rowId] = Platform.getLong(src, srcOffset);
-      srcIndex += 8;
-      srcOffset += 8;
+      if (bigEndianPlatform) {
+        longData[i + rowId] = java.lang.Long.reverseBytes(longData[i + rowId]);
+      }
     }
   }
@@ -286,8 +293,15 @@
   @Override
   public void putFloats(int rowId, int count, byte[] src, int srcIndex) {
-    Platform.copyMemory(src, Platform.BYTE_ARRAY_OFFSET + srcIndex,
-        floatData, Platform.DOUBLE_ARRAY_OFFSET + rowId * 4, count * 4);
+    if (!bigEndianPlatform) {
+      Platform.copyMemory(src, Platform.BYTE_ARRAY_OFFSET + srcIndex, floatData,
+        Platform.DOUBLE_ARRAY_OFFSET + rowId * 4, count * 4);
+    } else {
+      ByteBuffer bb = ByteBuffer.wrap(src).order(ByteOrder.LITTLE_ENDIAN);
+      for (int i = 0; i < count; ++i) {
+        floatData[i + rowId] = bb.getFloat(srcIndex + (4 * i));
+      }
+    }
   }
 
   @Override
@@ -320,8 +334,15 @@
   @Override
   public void putDoubles(int rowId, int count, byte[] src, int srcIndex) {
-    Platform.copyMemory(src, Platform.BYTE_ARRAY_OFFSET + srcIndex, doubleData,
-      Platform.DOUBLE_ARRAY_OFFSET + rowId * 8, count * 8);
+    if (!bigEndianPlatform) {
+      Platform.copyMemory(src, Platform.BYTE_ARRAY_OFFSET + srcIndex, doubleData,
+        Platform.DOUBLE_ARRAY_OFFSET + rowId * 8, count * 8);
+    } else {
+      ByteBuffer bb = ByteBuffer.wrap(src).order(ByteOrder.LITTLE_ENDIAN);
+      for (int i = 0; i < count; ++i) {
+        doubleData[i + rowId] = bb.getDouble(srcIndex + (8 * i));
+      }
+    }
   }
 
   @Override
sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala
@@ -18,6 +18,8 @@
 package org.apache.spark.sql.execution.vectorized
 
 import java.nio.charset.StandardCharsets
+import java.nio.ByteBuffer
+import java.nio.ByteOrder
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable
@@ -280,6 +282,13 @@ class ColumnarBatchSuite extends SparkFunSuite {
       Platform.putDouble(buffer, Platform.BYTE_ARRAY_OFFSET, 2.234)
       Platform.putDouble(buffer, Platform.BYTE_ARRAY_OFFSET + 8, 1.123)
 
+      if (ByteOrder.nativeOrder().equals(ByteOrder.BIG_ENDIAN)) {
+        // Ensure the array contains little-endian doubles
+        val bb = ByteBuffer.wrap(buffer).order(ByteOrder.LITTLE_ENDIAN)
+        Platform.putDouble(buffer, Platform.BYTE_ARRAY_OFFSET, bb.getDouble(0))
+        Platform.putDouble(buffer, Platform.BYTE_ARRAY_OFFSET + 8, bb.getDouble(8))
+      }
+
       column.putDoubles(idx, 1, buffer, 8)
       column.putDoubles(idx + 1, 1, buffer, 0)
       reference += 1.123
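
The test fix above exists because `Platform.putDouble` writes in native byte order while `ColumnVector.putDoubles(rowId, count, byte[], srcIndex)` expects little-endian bytes. Below is a self-contained Java sketch of the same conversion (a hypothetical helper, in plain Java rather than the suite's Scala; it is a no-op on little-endian hardware):

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

// Hypothetical helper mirroring the test fix: rewrite a byte[] of doubles
// written in native order so that it holds little-endian doubles.
public class ToLittleEndian {
  static void doublesToLittleEndian(byte[] buffer) {
    if (ByteOrder.nativeOrder().equals(ByteOrder.BIG_ENDIAN)) {
      ByteBuffer bb = ByteBuffer.wrap(buffer); // ByteBuffer defaults to big-endian
      for (int i = 0; i + 8 <= buffer.length; i += 8) {
        // Read the native (big-endian) bits and store them reversed.
        bb.putLong(i, Long.reverseBytes(bb.getLong(i)));
      }
    }
  }

  public static void main(String[] args) {
    byte[] buffer = ByteBuffer.allocate(16).order(ByteOrder.nativeOrder())
        .putDouble(2.234).putDouble(1.123).array();
    doublesToLittleEndian(buffer);
    ByteBuffer le = ByteBuffer.wrap(buffer).order(ByteOrder.LITTLE_ENDIAN);
    System.out.println(le.getDouble(0) + " " + le.getDouble(8)); // 2.234 1.123
  }
}
```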