diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer.scala
deleted file mode 100644
index c590f7c6c3e8bdfd281ce6f1374884846fd76d63..0000000000000000000000000000000000000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer.scala
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution
-
-import java.nio.ByteBuffer
-import java.util.{HashMap => JavaHashMap}
-
-import scala.reflect.ClassTag
-
-import com.esotericsoftware.kryo.{Kryo, Serializer}
-import com.esotericsoftware.kryo.io.{Input, Output}
-import com.twitter.chill.ResourcePool
-
-import org.apache.spark.{SparkConf, SparkEnv}
-import org.apache.spark.network.util.JavaUtils
-import org.apache.spark.serializer.{KryoSerializer, SerializerInstance}
-import org.apache.spark.sql.types.Decimal
-import org.apache.spark.util.MutablePair
-
-private[sql] class SparkSqlSerializer(conf: SparkConf) extends KryoSerializer(conf) {
-  override def newKryo(): Kryo = {
-    val kryo = super.newKryo()
-    kryo.setRegistrationRequired(false)
-    kryo.register(classOf[MutablePair[_, _]])
-    kryo.register(classOf[org.apache.spark.sql.catalyst.expressions.GenericRow])
-    kryo.register(classOf[org.apache.spark.sql.catalyst.expressions.GenericInternalRow])
-    kryo.register(classOf[org.apache.spark.sql.catalyst.expressions.GenericMutableRow])
-    kryo.register(classOf[java.math.BigDecimal], new JavaBigDecimalSerializer)
-    kryo.register(classOf[BigDecimal], new ScalaBigDecimalSerializer)
-
-    kryo.register(classOf[Decimal])
-    kryo.register(classOf[JavaHashMap[_, _]])
-
-    kryo.setReferences(false)
-    kryo
-  }
-}
-
-private[execution] class KryoResourcePool(size: Int)
-  extends ResourcePool[SerializerInstance](size) {
-
-  val ser: SparkSqlSerializer = {
-    val sparkConf = Option(SparkEnv.get).map(_.conf).getOrElse(new SparkConf())
-    new SparkSqlSerializer(sparkConf)
-  }
-
-  def newInstance(): SerializerInstance = ser.newInstance()
-}
-
-private[sql] object SparkSqlSerializer {
-  @transient lazy val resourcePool = new KryoResourcePool(30)
-
-  private[this] def acquireRelease[O](fn: SerializerInstance => O): O = {
-    val kryo = resourcePool.borrow
-    try {
-      fn(kryo)
-    } finally {
-      resourcePool.release(kryo)
-    }
-  }
-
-  def serialize[T: ClassTag](o: T): Array[Byte] =
-    acquireRelease { k =>
-      JavaUtils.bufferToArray(k.serialize(o))
-    }
-
-  def deserialize[T: ClassTag](bytes: Array[Byte]): T =
-    acquireRelease { k =>
-      k.deserialize[T](ByteBuffer.wrap(bytes))
-    }
-}
-
-private[sql] class JavaBigDecimalSerializer extends Serializer[java.math.BigDecimal] {
-  def write(kryo: Kryo, output: Output, bd: java.math.BigDecimal) {
-    // TODO: There are probably more efficient representations than strings...
-    output.writeString(bd.toString)
-  }
-
-  def read(kryo: Kryo, input: Input, tpe: Class[java.math.BigDecimal]): java.math.BigDecimal = {
-    new java.math.BigDecimal(input.readString())
-  }
-}
-
-private[sql] class ScalaBigDecimalSerializer extends Serializer[BigDecimal] {
-  def write(kryo: Kryo, output: Output, bd: BigDecimal) {
-    // TODO: There are probably more efficient representations than strings...
-    output.writeString(bd.toString)
-  }
-
-  def read(kryo: Kryo, input: Input, tpe: Class[BigDecimal]): BigDecimal = {
-    new java.math.BigDecimal(input.readString())
-  }
-}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/RowSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/RowSuite.scala
index 4552eb6ce00a53f1c253bb7540d9117672aaf2c4..34936b38fb5d453b4ba0fffcb05beb6ee51bdcf7 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/RowSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/RowSuite.scala
@@ -19,7 +19,6 @@ package org.apache.spark.sql
 
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.catalyst.expressions.{GenericMutableRow, SpecificMutableRow}
-import org.apache.spark.sql.execution.SparkSqlSerializer
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
@@ -55,15 +54,6 @@ class RowSuite extends SparkFunSuite with SharedSQLContext {
     assert(row.isNullAt(0))
   }
 
-  test("serialize w/ kryo") {
-    val row = Seq((1, Seq(1), Map(1 -> 1), BigDecimal(1))).toDF().first()
-    val serializer = new SparkSqlSerializer(sparkContext.getConf)
-    val instance = serializer.newInstance()
-    val ser = instance.serialize(row)
-    val de = instance.deserialize(ser).asInstanceOf[Row]
-    assert(de === row)
-  }
-
   test("get values by field name on Row created via .toDF") {
     val row = Seq((1, Seq(1))).toDF("a", "b").first()
     assert(row.getAs[Int]("a") === 1)
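
For reference, the behaviour the removed test exercised (a Kryo serialize/deserialize round trip of Scala values) can still be sketched against Spark core's general-purpose `KryoSerializer`, which the deleted `SparkSqlSerializer` extended. This is an illustrative sketch, not part of the patch; the object name `KryoRoundTripSketch` and the sample value are made up for the example.

```scala
import java.nio.ByteBuffer

import org.apache.spark.SparkConf
import org.apache.spark.serializer.KryoSerializer

// Hypothetical sketch: round-trip a Scala value through Spark core's
// Kryo-backed serializer, roughly what the removed RowSuite test did with
// SparkSqlSerializer (minus the SQL-specific class registrations).
object KryoRoundTripSketch {
  def main(args: Array[String]): Unit = {
    // Registration is not required by default, mirroring the removed class's
    // kryo.setRegistrationRequired(false).
    val instance = new KryoSerializer(new SparkConf()).newInstance()

    val original = (1, Seq(1), Map(1 -> 1)) // sample value (assumption)
    val bytes: ByteBuffer = instance.serialize(original)
    val copy = instance.deserialize[(Int, Seq[Int], Map[Int, Int])](bytes)

    assert(copy == original)
  }
}
```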