Skip to content
Snippets Groups Projects
Commit b9baa4cd authored by Volodymyr Lyubinets's avatar Volodymyr Lyubinets Committed by Michael Armbrust
Browse files

[SQL] [SPARK-6794] Use kryo-based SparkSqlSerializer for GeneralHashedRelation

Benchmarking results: http://pastie.org/private/1dneo1mta5zpsw6gmsoeq

Author: Volodymyr Lyubinets <vlyubin@gmail.com>

Closes #5433 from vlyubin/joins and squashes the following commits:

d70c829 [Volodymyr Lyubinets] Addressed review feedback
527eac6 [Volodymyr Lyubinets] Use kryo-based SparkSqlSerializer for GeneralHashedRelation
parent 9f5ed99d
No related branches found
No related tags found
No related merge requests found
......@@ -26,14 +26,13 @@ import scala.reflect.ClassTag
import com.clearspring.analytics.stream.cardinality.HyperLogLog
import com.esotericsoftware.kryo.io.{Input, Output}
import com.esotericsoftware.kryo.{Serializer, Kryo}
import com.twitter.chill.{AllScalaRegistrar, ResourcePool}
import com.twitter.chill.ResourcePool
import org.apache.spark.{SparkEnv, SparkConf}
import org.apache.spark.serializer.{SerializerInstance, KryoSerializer}
import org.apache.spark.sql.catalyst.expressions.GenericRow
import org.apache.spark.util.collection.OpenHashSet
import org.apache.spark.util.MutablePair
import org.apache.spark.util.Utils
import org.apache.spark.sql.catalyst.expressions.codegen.{IntegerHashSet, LongHashSet}
......
......@@ -17,9 +17,11 @@
package org.apache.spark.sql.execution.joins
import java.io.{ObjectInput, ObjectOutput, Externalizable}
import java.util.{HashMap => JavaHashMap}
import org.apache.spark.sql.catalyst.expressions.{Projection, Row}
import org.apache.spark.sql.execution.SparkSqlSerializer
import org.apache.spark.util.collection.CompactBuffer
......@@ -29,16 +31,43 @@ import org.apache.spark.util.collection.CompactBuffer
*/
private[joins] sealed trait HashedRelation {

  /** Looks up every row associated with the given join key. */
  def get(key: Row): CompactBuffer[Row]

  /**
   * Serialization helper shared by the Externalizable implementations
   * (GeneralHashedRelation and UniqueKeyHashedRelation): writes the payload
   * length first so that readBytes knows exactly how much to pull back
   * off the stream.
   */
  protected def writeBytes(out: ObjectOutput, serialized: Array[Byte]): Unit = {
    // Length prefix, then the raw serialized payload.
    out.writeInt(serialized.length)
    out.write(serialized)
  }

  /**
   * Inverse of writeBytes: reads the length prefix, then exactly that many
   * payload bytes (readFully blocks until the whole array is filled).
   */
  protected def readBytes(in: ObjectInput): Array[Byte] = {
    val length = in.readInt()
    val buffer = new Array[Byte](length)
    in.readFully(buffer)
    buffer
  }
}
/**
* A general [[HashedRelation]] backed by a hash map that maps the key into a sequence of values.
*/
private[joins] final class GeneralHashedRelation(hashTable: JavaHashMap[Row, CompactBuffer[Row]])
extends HashedRelation with Serializable {
private[joins] final class GeneralHashedRelation(
    private var hashTable: JavaHashMap[Row, CompactBuffer[Row]])
  extends HashedRelation with Externalizable {

  // No-arg constructor required by the Externalizable contract; the real
  // table is restored later in readExternal.
  def this() = this(null)

  /** Returns all rows for `key`; null when the key is absent (JavaHashMap semantics). */
  override def get(key: Row): CompactBuffer[Row] = hashTable.get(key)

  override def writeExternal(out: ObjectOutput): Unit = {
    // Kryo-serialize the backing map, then emit it length-prefixed.
    val serializedTable = SparkSqlSerializer.serialize(hashTable)
    writeBytes(out, serializedTable)
  }

  override def readExternal(in: ObjectInput): Unit = {
    // Read the length-prefixed payload back and rebuild the map from it.
    val payload = readBytes(in)
    hashTable = SparkSqlSerializer.deserialize(payload)
  }
}
......@@ -46,8 +75,10 @@ private[joins] final class GeneralHashedRelation(hashTable: JavaHashMap[Row, Com
* A specialized [[HashedRelation]] that maps key into a single value. This implementation
* assumes the key is unique.
*/
private[joins] final class UniqueKeyHashedRelation(hashTable: JavaHashMap[Row, Row])
extends HashedRelation with Serializable {
private[joins] final class UniqueKeyHashedRelation(private var hashTable: JavaHashMap[Row, Row])
extends HashedRelation with Externalizable {
def this() = this(null) // Needed for serialization
override def get(key: Row): CompactBuffer[Row] = {
val v = hashTable.get(key)
......@@ -55,6 +86,14 @@ private[joins] final class UniqueKeyHashedRelation(hashTable: JavaHashMap[Row, R
}
// Returns the single row mapped to `key`; null when absent (JavaHashMap.get semantics).
def getValue(key: Row): Row = hashTable.get(key)
override def writeExternal(out: ObjectOutput): Unit = {
  // Kryo-serialize the backing map and write it with a length prefix
  // (see HashedRelation.writeBytes).
  val serializedTable = SparkSqlSerializer.serialize(hashTable)
  writeBytes(out, serializedTable)
}
override def readExternal(in: ObjectInput): Unit = {
  // Read the length-prefixed payload and kryo-deserialize it back
  // into the backing map.
  val payload = readBytes(in)
  hashTable = SparkSqlSerializer.deserialize(payload)
}
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment