diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java index e8ac2999c2d29569507ccdb6e66966c66e70e91c..5af7ed5d6eb6d3ee7329b09ec4ada5afd0c1ad4f 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java @@ -17,8 +17,7 @@ package org.apache.spark.sql.catalyst.expressions; -import java.io.IOException; -import java.io.OutputStream; +import java.io.*; import java.math.BigDecimal; import java.math.BigInteger; import java.util.Arrays; @@ -26,6 +25,11 @@ import java.util.Collections; import java.util.HashSet; import java.util.Set; +import com.esotericsoftware.kryo.Kryo; +import com.esotericsoftware.kryo.KryoSerializable; +import com.esotericsoftware.kryo.io.Input; +import com.esotericsoftware.kryo.io.Output; + import org.apache.spark.sql.types.*; import org.apache.spark.unsafe.Platform; import org.apache.spark.unsafe.array.ByteArrayMethods; @@ -35,6 +39,7 @@ import org.apache.spark.unsafe.types.CalendarInterval; import org.apache.spark.unsafe.types.UTF8String; import static org.apache.spark.sql.types.DataTypes.*; +import static org.apache.spark.unsafe.Platform.BYTE_ARRAY_OFFSET; /** * An Unsafe implementation of Row which is backed by raw memory instead of Java objects. @@ -52,7 +57,7 @@ import static org.apache.spark.sql.types.DataTypes.*; * * Instances of `UnsafeRow` act as pointers to row data stored in this format. */ -public final class UnsafeRow extends MutableRow { +public final class UnsafeRow extends MutableRow implements Externalizable, KryoSerializable { ////////////////////////////////////////////////////////////////////////////// // Static methods @@ -596,4 +601,40 @@ public final class UnsafeRow extends MutableRow { public void writeToMemory(Object target, long targetOffset) { Platform.copyMemory(baseObject, baseOffset, target, targetOffset, sizeInBytes); } + + @Override + public void writeExternal(ObjectOutput out) throws IOException { + byte[] bytes = getBytes(); + out.writeInt(bytes.length); + out.writeInt(this.numFields); + out.write(bytes); + } + + @Override + public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + this.baseOffset = BYTE_ARRAY_OFFSET; + this.sizeInBytes = in.readInt(); + this.numFields = in.readInt(); + this.bitSetWidthInBytes = calculateBitSetWidthInBytes(numFields); + this.baseObject = new byte[sizeInBytes]; + in.readFully((byte[]) baseObject); + } + + @Override + public void write(Kryo kryo, Output out) { + byte[] bytes = getBytes(); + out.writeInt(bytes.length); + out.writeInt(this.numFields); + out.write(bytes); + } + + @Override + public void read(Kryo kryo, Input in) { + this.baseOffset = BYTE_ARRAY_OFFSET; + this.sizeInBytes = in.readInt(); + this.numFields = in.readInt(); + this.bitSetWidthInBytes = calculateBitSetWidthInBytes(numFields); + this.baseObject = new byte[sizeInBytes]; + in.read((byte[]) baseObject); + } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/UnsafeRowSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/UnsafeRowSuite.scala index 944d4e11348cf1ab00f38a84b66c8da1e22affd7..7d1ee39d4b5396630ba5c1ecc19a5d2919e11653 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/UnsafeRowSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/UnsafeRowSuite.scala @@ -19,7 +19,8 @@ package org.apache.spark.sql import java.io.ByteArrayOutputStream -import org.apache.spark.SparkFunSuite +import org.apache.spark.{SparkConf, SparkFunSuite} +import org.apache.spark.serializer.{KryoSerializer, JavaSerializer} import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.{UnsafeRow, UnsafeProjection} import org.apache.spark.sql.types._ @@ -29,6 +30,32 @@ import org.apache.spark.unsafe.types.UTF8String class UnsafeRowSuite extends SparkFunSuite { + test("UnsafeRow Java serialization") { + // serializing an UnsafeRow pointing to a large buffer should only serialize the relevant data + val data = new Array[Byte](1024) + val row = new UnsafeRow + row.pointTo(data, 1, 16) + row.setLong(0, 19285) + + val ser = new JavaSerializer(new SparkConf).newInstance() + val row1 = ser.deserialize[UnsafeRow](ser.serialize(row)) + assert(row1.getLong(0) == 19285) + assert(row1.getBaseObject().asInstanceOf[Array[Byte]].length == 16) + } + + test("UnsafeRow Kryo serialization") { + // serializing an UnsafeRow pointing to a large buffer should only serialize the relevant data + val data = new Array[Byte](1024) + val row = new UnsafeRow + row.pointTo(data, 1, 16) + row.setLong(0, 19285) + + val ser = new KryoSerializer(new SparkConf).newInstance() + val row1 = ser.deserialize[UnsafeRow](ser.serialize(row)) + assert(row1.getLong(0) == 19285) + assert(row1.getBaseObject().asInstanceOf[Array[Byte]].length == 16) + } + test("bitset width calculation") { assert(UnsafeRow.calculateBitSetWidthInBytes(0) === 0) assert(UnsafeRow.calculateBitSetWidthInBytes(1) === 8)