Commit 84ea2871 authored by Reynold Xin

[SPARK-10914] UnsafeRow serialization breaks when two machines have different Oops size.

UnsafeRow points to data in memory using three pieces of information: a base object, a base offset, and a length. When the row is serialized with Java/Kryo serialization, a base offset captured on one machine can be invalid on another, because the in-memory object layout changes when the two machines use a different pointer width (oops, ordinary object pointers, in the JVM).
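(Illustration, not part of the patch: the minimal sketch below, with the made-up class name OopsOffsetDemo, shows that a byte array's base offset is a JVM-layout detail. On 64-bit HotSpot it is typically 16 with compressed oops and 24 without, so an offset captured on one JVM cannot be reused on another.)

import java.lang.reflect.Field;
import sun.misc.Unsafe;

// Hypothetical demo class; uses the standard JDK 7/8 reflection trick
// to obtain the Unsafe singleton.
public class OopsOffsetDemo {
  public static void main(String[] args) throws Exception {
    Field f = Unsafe.class.getDeclaredField("theUnsafe");
    f.setAccessible(true);
    Unsafe unsafe = (Unsafe) f.get(null);

    // Where byte[] element data starts, relative to the array object's address.
    // Typically prints 16 under -XX:+UseCompressedOops and 24 under
    // -XX:-UseCompressedOops on 64-bit HotSpot.
    System.out.println(unsafe.arrayBaseOffset(byte[].class));
  }
}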

To reproduce, launch Spark with compressed oops disabled on the executors only, so that the driver (which keeps the JVM default) and the executors lay out objects differently:

MASTER=local-cluster[2,1,1024] bin/spark-shell --conf "spark.executor.extraJavaOptions=-XX:-UseCompressedOops"

Then run the following; the rows are serialized on the executors and deserialized on the driver:

scala> sql("select 1 xx").collect()

Author: Reynold Xin <rxin@databricks.com>

Closes #9030 from rxin/SPARK-10914.
parent 02149ff0
UnsafeRow.java
@@ -17,8 +17,7 @@
 
 package org.apache.spark.sql.catalyst.expressions;
 
-import java.io.IOException;
-import java.io.OutputStream;
+import java.io.*;
 import java.math.BigDecimal;
 import java.math.BigInteger;
 import java.util.Arrays;
@@ -26,6 +25,11 @@ import java.util.Collections;
 import java.util.HashSet;
 import java.util.Set;
 
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.KryoSerializable;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+
 import org.apache.spark.sql.types.*;
 import org.apache.spark.unsafe.Platform;
 import org.apache.spark.unsafe.array.ByteArrayMethods;
@@ -35,6 +39,7 @@ import org.apache.spark.unsafe.types.CalendarInterval;
 import org.apache.spark.unsafe.types.UTF8String;
 
 import static org.apache.spark.sql.types.DataTypes.*;
+import static org.apache.spark.unsafe.Platform.BYTE_ARRAY_OFFSET;
 
 /**
  * An Unsafe implementation of Row which is backed by raw memory instead of Java objects.
@@ -52,7 +57,7 @@ import static org.apache.spark.sql.types.DataTypes.*;
  *
  * Instances of `UnsafeRow` act as pointers to row data stored in this format.
  */
-public final class UnsafeRow extends MutableRow {
+public final class UnsafeRow extends MutableRow implements Externalizable, KryoSerializable {
 
 //////////////////////////////////////////////////////////////////////////////
 // Static methods
@@ -596,4 +601,40 @@ public final class UnsafeRow extends MutableRow {
   public void writeToMemory(Object target, long targetOffset) {
     Platform.copyMemory(baseObject, baseOffset, target, targetOffset, sizeInBytes);
   }
+
+  @Override
+  public void writeExternal(ObjectOutput out) throws IOException {
+    byte[] bytes = getBytes();
+    out.writeInt(bytes.length);
+    out.writeInt(this.numFields);
+    out.write(bytes);
+  }
+
+  @Override
+  public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+    this.baseOffset = BYTE_ARRAY_OFFSET;
+    this.sizeInBytes = in.readInt();
+    this.numFields = in.readInt();
+    this.bitSetWidthInBytes = calculateBitSetWidthInBytes(numFields);
+    this.baseObject = new byte[sizeInBytes];
+    in.readFully((byte[]) baseObject);
+  }
+
+  @Override
+  public void write(Kryo kryo, Output out) {
+    byte[] bytes = getBytes();
+    out.writeInt(bytes.length);
+    out.writeInt(this.numFields);
+    out.write(bytes);
+  }
+
+  @Override
+  public void read(Kryo kryo, Input in) {
+    this.baseOffset = BYTE_ARRAY_OFFSET;
+    this.sizeInBytes = in.readInt();
+    this.numFields = in.readInt();
+    this.bitSetWidthInBytes = calculateBitSetWidthInBytes(numFields);
+    this.baseObject = new byte[sizeInBytes];
+    in.read((byte[]) baseObject);
+  }
 }
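All four methods above share one wire format: the byte length, then numFields, then the bytes returned by getBytes() (which copies out only the row's sizeInBytes bytes even when the row points into a much larger buffer); on read, a fresh local byte[] is allocated and baseOffset is re-derived from the local JVM's BYTE_ARRAY_OFFSET rather than being shipped across JVMs. A minimal sketch of the same framing in plain java.io (nothing here is Spark API; RowWireFormatDemo is a made-up name):

import java.io.*;

// Hypothetical demo, not part of the patch: round-trips the
// [int length][int numFields][payload bytes] framing used above.
public class RowWireFormatDemo {
  public static void main(String[] args) throws IOException {
    byte[] payload = {1, 2, 3, 4, 5, 6, 7, 8};

    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    DataOutputStream out = new DataOutputStream(bos);
    out.writeInt(payload.length); // length first, so the reader can allocate
    out.writeInt(1);              // numFields
    out.write(payload);
    out.flush();

    DataInputStream in = new DataInputStream(new ByteArrayInputStream(bos.toByteArray()));
    byte[] copy = new byte[in.readInt()];
    int numFields = in.readInt();
    in.readFully(copy); // unlike read(), readFully blocks until every byte arrives
    System.out.println(numFields + " field(s), " + copy.length + " bytes restored");
  }
}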
UnsafeRowSuite.scala
@@ -19,7 +19,8 @@ package org.apache.spark.sql
 
 import java.io.ByteArrayOutputStream
 
-import org.apache.spark.SparkFunSuite
+import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.serializer.{KryoSerializer, JavaSerializer}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{UnsafeRow, UnsafeProjection}
 import org.apache.spark.sql.types._
@@ -29,6 +30,32 @@ import org.apache.spark.unsafe.types.UTF8String
 
 class UnsafeRowSuite extends SparkFunSuite {
+
+  test("UnsafeRow Java serialization") {
+    // serializing an UnsafeRow pointing to a large buffer should only serialize the relevant data
+    val data = new Array[Byte](1024)
+    val row = new UnsafeRow
+    row.pointTo(data, 1, 16)
+    row.setLong(0, 19285)
+
+    val ser = new JavaSerializer(new SparkConf).newInstance()
+    val row1 = ser.deserialize[UnsafeRow](ser.serialize(row))
+    assert(row1.getLong(0) == 19285)
+    assert(row1.getBaseObject().asInstanceOf[Array[Byte]].length == 16)
+  }
+
+  test("UnsafeRow Kryo serialization") {
+    // serializing an UnsafeRow pointing to a large buffer should only serialize the relevant data
+    val data = new Array[Byte](1024)
+    val row = new UnsafeRow
+    row.pointTo(data, 1, 16)
+    row.setLong(0, 19285)
+
+    val ser = new KryoSerializer(new SparkConf).newInstance()
+    val row1 = ser.deserialize[UnsafeRow](ser.serialize(row))
+    assert(row1.getLong(0) == 19285)
+    assert(row1.getBaseObject().asInstanceOf[Array[Byte]].length == 16)
+  }
+
   test("bitset width calculation") {
     assert(UnsafeRow.calculateBitSetWidthInBytes(0) === 0)
     assert(UnsafeRow.calculateBitSetWidthInBytes(1) === 8)