Commit 5503e453 authored by Reynold Xin, committed by Davies Liu

[SPARK-15088] [SQL] Remove SparkSqlSerializer

## What changes were proposed in this pull request?
This patch removes SparkSqlSerializer. I believe this is now dead code.

## How was this patch tested?
Removed a test case related to it.

Author: Reynold Xin <rxin@databricks.com>

Closes #12864 from rxin/SPARK-15088.
parent 8b6491fc
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.execution

import java.nio.ByteBuffer
import java.util.{HashMap => JavaHashMap}

import scala.reflect.ClassTag

import com.esotericsoftware.kryo.{Kryo, Serializer}
import com.esotericsoftware.kryo.io.{Input, Output}
import com.twitter.chill.ResourcePool

import org.apache.spark.{SparkConf, SparkEnv}
import org.apache.spark.network.util.JavaUtils
import org.apache.spark.serializer.{KryoSerializer, SerializerInstance}
import org.apache.spark.sql.types.Decimal
import org.apache.spark.util.MutablePair
private[sql] class SparkSqlSerializer(conf: SparkConf) extends KryoSerializer(conf) {
  override def newKryo(): Kryo = {
    val kryo = super.newKryo()
    // Allow unregistered classes; the registrations below only let common SQL
    // types serialize more compactly (a class id instead of a full class name).
    kryo.setRegistrationRequired(false)
    kryo.register(classOf[MutablePair[_, _]])
    kryo.register(classOf[org.apache.spark.sql.catalyst.expressions.GenericRow])
    kryo.register(classOf[org.apache.spark.sql.catalyst.expressions.GenericInternalRow])
    kryo.register(classOf[org.apache.spark.sql.catalyst.expressions.GenericMutableRow])
    kryo.register(classOf[java.math.BigDecimal], new JavaBigDecimalSerializer)
    kryo.register(classOf[BigDecimal], new ScalaBigDecimalSerializer)
    kryo.register(classOf[Decimal])
    kryo.register(classOf[JavaHashMap[_, _]])
    // Disable reference tracking for speed; serialized values are assumed to
    // contain no cyclic references.
    kryo.setReferences(false)
    kryo
  }
}
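For reference, a minimal usage sketch of the class above (hypothetical caller code, not part of this commit; the caller must live under the `org.apache.spark.sql` package because the class is `private[sql]`, and `MutablePair` is one of the types registered in `newKryo`):

```scala
package org.apache.spark.sql.execution

import org.apache.spark.SparkConf
import org.apache.spark.util.MutablePair

object SerializerRoundTrip {
  def main(args: Array[String]): Unit = {
    val serializer = new SparkSqlSerializer(new SparkConf())
    val instance = serializer.newInstance() // a SerializerInstance; not thread-safe
    val buf = instance.serialize(new MutablePair(1, "a")) // returns a java.nio.ByteBuffer
    val pair = instance.deserialize[MutablePair[Int, String]](buf)
    assert(pair._1 == 1 && pair._2 == "a")
  }
}
```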
private[execution] class KryoResourcePool(size: Int)
  extends ResourcePool[SerializerInstance](size) {

  // Kryo instances are costly to construct and not thread-safe, so a bounded
  // pool of SerializerInstances is shared across threads.
  val ser: SparkSqlSerializer = {
    val sparkConf = Option(SparkEnv.get).map(_.conf).getOrElse(new SparkConf())
    new SparkSqlSerializer(sparkConf)
  }

  def newInstance(): SerializerInstance = ser.newInstance()
}
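`ResourcePool` comes from Twitter's chill library. A rough, hedged sketch of the behavior it supplies (not chill's actual implementation; the `ArrayBlockingQueue` choice here is an assumption for illustration): a bounded, thread-safe pool that creates new items on demand via the abstract `newInstance()`.

```scala
import java.util.concurrent.ArrayBlockingQueue

// Illustrative sketch only: a bounded pool that lazily creates instances.
abstract class SimpleResourcePool[T](size: Int) {
  private val queue = new ArrayBlockingQueue[T](size)

  protected def newInstance(): T

  // Take a pooled instance if one is available, otherwise create a fresh one.
  def borrow: T = Option(queue.poll()).getOrElse(newInstance())

  // Return an instance to the pool; silently drop it if the pool is full.
  def release(item: T): Unit = queue.offer(item)
}
```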
private[sql] object SparkSqlSerializer {
  @transient lazy val resourcePool = new KryoResourcePool(30)

  // Borrow an instance from the pool, apply fn, and always return the instance.
  private[this] def acquireRelease[O](fn: SerializerInstance => O): O = {
    val kryo = resourcePool.borrow
    try {
      fn(kryo)
    } finally {
      resourcePool.release(kryo)
    }
  }

  def serialize[T: ClassTag](o: T): Array[Byte] =
    acquireRelease { k =>
      JavaUtils.bufferToArray(k.serialize(o))
    }

  def deserialize[T: ClassTag](bytes: Array[Byte]): T =
    acquireRelease { k =>
      k.deserialize[T](ByteBuffer.wrap(bytes))
    }
}
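Putting the pieces together, callers could round-trip a value through the pooled serializer like this (hypothetical usage sketch, again assuming code within the `org.apache.spark.sql` package):

```scala
package org.apache.spark.sql.execution

object PooledRoundTrip {
  def main(args: Array[String]): Unit = {
    // Round-trip an ordinary Scala value without managing Kryo instances.
    val bytes: Array[Byte] = SparkSqlSerializer.serialize(Map(1 -> "a"))
    val m = SparkSqlSerializer.deserialize[Map[Int, String]](bytes)
    assert(m(1) == "a")
  }
}
```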
private[sql] class JavaBigDecimalSerializer extends Serializer[java.math.BigDecimal] {
  def write(kryo: Kryo, output: Output, bd: java.math.BigDecimal): Unit = {
    // TODO: There are probably more efficient representations than strings...
    output.writeString(bd.toString)
  }

  def read(kryo: Kryo, input: Input, tpe: Class[java.math.BigDecimal]): java.math.BigDecimal = {
    new java.math.BigDecimal(input.readString())
  }
}
private[sql] class ScalaBigDecimalSerializer extends Serializer[BigDecimal] {
  def write(kryo: Kryo, output: Output, bd: BigDecimal): Unit = {
    // TODO: There are probably more efficient representations than strings...
    output.writeString(bd.toString)
  }

  def read(kryo: Kryo, input: Input, tpe: Class[BigDecimal]): BigDecimal = {
    // Construct the Scala wrapper directly rather than relying on the implicit
    // java.math.BigDecimal -> scala.math.BigDecimal conversion.
    BigDecimal(input.readString())
  }
}
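On the TODO comments above: a more compact wire format would write the unscaled value's two's-complement bytes plus the scale, rather than the decimal string. A hedged sketch of that alternative (not part of this commit; `CompactBigDecimalSerializer` is a hypothetical name):

```scala
import com.esotericsoftware.kryo.{Kryo, Serializer}
import com.esotericsoftware.kryo.io.{Input, Output}

// Sketch: encode a BigDecimal as (scale, length, unscaled two's-complement bytes).
class CompactBigDecimalSerializer extends Serializer[java.math.BigDecimal] {
  def write(kryo: Kryo, output: Output, bd: java.math.BigDecimal): Unit = {
    val unscaled = bd.unscaledValue.toByteArray
    output.writeInt(bd.scale)
    output.writeInt(unscaled.length)
    output.writeBytes(unscaled)
  }

  def read(kryo: Kryo, input: Input, tpe: Class[java.math.BigDecimal]): java.math.BigDecimal = {
    val scale = input.readInt()
    val unscaled = input.readBytes(input.readInt())
    new java.math.BigDecimal(new java.math.BigInteger(unscaled), scale)
  }
}
```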
@@ -19,7 +19,6 @@ package org.apache.spark.sql
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.catalyst.expressions.{GenericMutableRow, SpecificMutableRow}
-import org.apache.spark.sql.execution.SparkSqlSerializer
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
@@ -55,15 +54,6 @@ class RowSuite extends SparkFunSuite with SharedSQLContext {
     assert(row.isNullAt(0))
   }
 
-  test("serialize w/ kryo") {
-    val row = Seq((1, Seq(1), Map(1 -> 1), BigDecimal(1))).toDF().first()
-    val serializer = new SparkSqlSerializer(sparkContext.getConf)
-    val instance = serializer.newInstance()
-    val ser = instance.serialize(row)
-    val de = instance.deserialize(ser).asInstanceOf[Row]
-    assert(de === row)
-  }
-
   test("get values by field name on Row created via .toDF") {
     val row = Seq((1, Seq(1))).toDF("a", "b").first()
     assert(row.getAs[Int]("a") === 1)