Skip to content
Snippets Groups Projects
Commit e11aa9ec authored by Reynold Xin's avatar Reynold Xin
Browse files

[SPARK-14452][SQL] Explicit APIs in Scala for specifying encoders

## What changes were proposed in this pull request?
The Scala Dataset public API currently only allows users to specify encoders through SQLContext.implicits. This is OK but sometimes people want to explicitly get encoders without a SQLContext (e.g. Aggregator implementations). This patch adds public APIs to Encoders class for getting Scala encoders.

## How was this patch tested?
None - I will update test cases once https://github.com/apache/spark/pull/12231 is merged.

Author: Reynold Xin <rxin@databricks.com>

Closes #12232 from rxin/SPARK-14452.
parent 21d5ca12
No related branches found
No related tags found
No related merge requests found
...@@ -17,22 +17,20 @@ ...@@ -17,22 +17,20 @@
package org.apache.spark.sql package org.apache.spark.sql
import java.lang.reflect.Modifier
import scala.annotation.implicitNotFound import scala.annotation.implicitNotFound
import scala.reflect.{classTag, ClassTag} import scala.reflect.ClassTag
import org.apache.spark.annotation.Experimental import org.apache.spark.annotation.Experimental
import org.apache.spark.sql.catalyst.encoders.{encoderFor, ExpressionEncoder}
import org.apache.spark.sql.catalyst.expressions.{BoundReference, DecodeUsingSerializer, EncodeUsingSerializer}
import org.apache.spark.sql.types._ import org.apache.spark.sql.types._
/** /**
* :: Experimental :: * :: Experimental ::
* Used to convert a JVM object of type `T` to and from the internal Spark SQL representation. * Used to convert a JVM object of type `T` to and from the internal Spark SQL representation.
* *
* == Scala == * == Scala ==
* Encoders are generally created automatically through implicits from a `SQLContext`. * Encoders are generally created automatically through implicits from a `SQLContext`, or can be
* explicitly created by calling static methods on [[Encoders]].
* *
* {{{ * {{{
* import sqlContext.implicits._ * import sqlContext.implicits._
...@@ -81,224 +79,3 @@ trait Encoder[T] extends Serializable { ...@@ -81,224 +79,3 @@ trait Encoder[T] extends Serializable {
/** A ClassTag that can be used to construct and Array to contain a collection of `T`. */ /** A ClassTag that can be used to construct and Array to contain a collection of `T`. */
def clsTag: ClassTag[T] def clsTag: ClassTag[T]
} }
/**
* :: Experimental ::
* Methods for creating an [[Encoder]].
*
* @since 1.6.0
*/
@Experimental
object Encoders {
/**
* An encoder for nullable boolean type.
* @since 1.6.0
*/
def BOOLEAN: Encoder[java.lang.Boolean] = ExpressionEncoder()
/**
* An encoder for nullable byte type.
* @since 1.6.0
*/
def BYTE: Encoder[java.lang.Byte] = ExpressionEncoder()
/**
* An encoder for nullable short type.
* @since 1.6.0
*/
def SHORT: Encoder[java.lang.Short] = ExpressionEncoder()
/**
* An encoder for nullable int type.
* @since 1.6.0
*/
def INT: Encoder[java.lang.Integer] = ExpressionEncoder()
/**
* An encoder for nullable long type.
* @since 1.6.0
*/
def LONG: Encoder[java.lang.Long] = ExpressionEncoder()
/**
* An encoder for nullable float type.
* @since 1.6.0
*/
def FLOAT: Encoder[java.lang.Float] = ExpressionEncoder()
/**
* An encoder for nullable double type.
* @since 1.6.0
*/
def DOUBLE: Encoder[java.lang.Double] = ExpressionEncoder()
/**
* An encoder for nullable string type.
* @since 1.6.0
*/
def STRING: Encoder[java.lang.String] = ExpressionEncoder()
/**
* An encoder for nullable decimal type.
* @since 1.6.0
*/
def DECIMAL: Encoder[java.math.BigDecimal] = ExpressionEncoder()
/**
* An encoder for nullable date type.
* @since 1.6.0
*/
def DATE: Encoder[java.sql.Date] = ExpressionEncoder()
/**
* An encoder for nullable timestamp type.
* @since 1.6.0
*/
def TIMESTAMP: Encoder[java.sql.Timestamp] = ExpressionEncoder()
/**
* An encoder for arrays of bytes.
* @since 1.6.1
*/
def BINARY: Encoder[Array[Byte]] = ExpressionEncoder()
/**
* Creates an encoder for Java Bean of type T.
*
* T must be publicly accessible.
*
* supported types for java bean field:
* - primitive types: boolean, int, double, etc.
* - boxed types: Boolean, Integer, Double, etc.
* - String
* - java.math.BigDecimal
* - time related: java.sql.Date, java.sql.Timestamp
* - collection types: only array and java.util.List currently, map support is in progress
* - nested java bean.
*
* @since 1.6.0
*/
def bean[T](beanClass: Class[T]): Encoder[T] = ExpressionEncoder.javaBean(beanClass)
/**
* (Scala-specific) Creates an encoder that serializes objects of type T using Kryo.
* This encoder maps T into a single byte array (binary) field.
*
* T must be publicly accessible.
*
* @since 1.6.0
*/
def kryo[T: ClassTag]: Encoder[T] = genericSerializer(useKryo = true)
/**
* Creates an encoder that serializes objects of type T using Kryo.
* This encoder maps T into a single byte array (binary) field.
*
* T must be publicly accessible.
*
* @since 1.6.0
*/
def kryo[T](clazz: Class[T]): Encoder[T] = kryo(ClassTag[T](clazz))
/**
* (Scala-specific) Creates an encoder that serializes objects of type T using generic Java
* serialization. This encoder maps T into a single byte array (binary) field.
*
* Note that this is extremely inefficient and should only be used as the last resort.
*
* T must be publicly accessible.
*
* @since 1.6.0
*/
def javaSerialization[T: ClassTag]: Encoder[T] = genericSerializer(useKryo = false)
/**
* Creates an encoder that serializes objects of type T using generic Java serialization.
* This encoder maps T into a single byte array (binary) field.
*
* Note that this is extremely inefficient and should only be used as the last resort.
*
* T must be publicly accessible.
*
* @since 1.6.0
*/
def javaSerialization[T](clazz: Class[T]): Encoder[T] = javaSerialization(ClassTag[T](clazz))
/** Throws an exception if T is not a public class. */
private def validatePublicClass[T: ClassTag](): Unit = {
if (!Modifier.isPublic(classTag[T].runtimeClass.getModifiers)) {
throw new UnsupportedOperationException(
s"${classTag[T].runtimeClass.getName} is not a public class. " +
"Only public classes are supported.")
}
}
/** A way to construct encoders using generic serializers. */
private def genericSerializer[T: ClassTag](useKryo: Boolean): Encoder[T] = {
if (classTag[T].runtimeClass.isPrimitive) {
throw new UnsupportedOperationException("Primitive types are not supported.")
}
validatePublicClass[T]()
ExpressionEncoder[T](
schema = new StructType().add("value", BinaryType),
flat = true,
serializer = Seq(
EncodeUsingSerializer(
BoundReference(0, ObjectType(classOf[AnyRef]), nullable = true), kryo = useKryo)),
deserializer =
DecodeUsingSerializer[T](
BoundReference(0, BinaryType, nullable = true), classTag[T], kryo = useKryo),
clsTag = classTag[T]
)
}
/**
* An encoder for 2-ary tuples.
* @since 1.6.0
*/
def tuple[T1, T2](
e1: Encoder[T1],
e2: Encoder[T2]): Encoder[(T1, T2)] = {
ExpressionEncoder.tuple(encoderFor(e1), encoderFor(e2))
}
/**
* An encoder for 3-ary tuples.
* @since 1.6.0
*/
def tuple[T1, T2, T3](
e1: Encoder[T1],
e2: Encoder[T2],
e3: Encoder[T3]): Encoder[(T1, T2, T3)] = {
ExpressionEncoder.tuple(encoderFor(e1), encoderFor(e2), encoderFor(e3))
}
/**
* An encoder for 4-ary tuples.
* @since 1.6.0
*/
def tuple[T1, T2, T3, T4](
e1: Encoder[T1],
e2: Encoder[T2],
e3: Encoder[T3],
e4: Encoder[T4]): Encoder[(T1, T2, T3, T4)] = {
ExpressionEncoder.tuple(encoderFor(e1), encoderFor(e2), encoderFor(e3), encoderFor(e4))
}
/**
* An encoder for 5-ary tuples.
* @since 1.6.0
*/
def tuple[T1, T2, T3, T4, T5](
e1: Encoder[T1],
e2: Encoder[T2],
e3: Encoder[T3],
e4: Encoder[T4],
e5: Encoder[T5]): Encoder[(T1, T2, T3, T4, T5)] = {
ExpressionEncoder.tuple(
encoderFor(e1), encoderFor(e2), encoderFor(e3), encoderFor(e4), encoderFor(e5))
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql
import java.lang.reflect.Modifier
import scala.reflect.{classTag, ClassTag}
import scala.reflect.runtime.universe.TypeTag
import org.apache.spark.annotation.Experimental
import org.apache.spark.sql.catalyst.encoders.{encoderFor, ExpressionEncoder}
import org.apache.spark.sql.catalyst.expressions.{BoundReference, DecodeUsingSerializer, EncodeUsingSerializer}
import org.apache.spark.sql.types._
/**
* :: Experimental ::
* Methods for creating an [[Encoder]].
*
* @since 1.6.0
*/
@Experimental
object Encoders {
/**
* An encoder for nullable boolean type.
* The Scala primitive encoder is available as [[scalaBoolean]].
* @since 1.6.0
*/
def BOOLEAN: Encoder[java.lang.Boolean] = ExpressionEncoder()
/**
* An encoder for nullable byte type.
* The Scala primitive encoder is available as [[scalaByte]].
* @since 1.6.0
*/
def BYTE: Encoder[java.lang.Byte] = ExpressionEncoder()
/**
* An encoder for nullable short type.
* The Scala primitive encoder is available as [[scalaShort]].
* @since 1.6.0
*/
def SHORT: Encoder[java.lang.Short] = ExpressionEncoder()
/**
* An encoder for nullable int type.
* The Scala primitive encoder is available as [[scalaInt]].
* @since 1.6.0
*/
def INT: Encoder[java.lang.Integer] = ExpressionEncoder()
/**
* An encoder for nullable long type.
* The Scala primitive encoder is available as [[scalaLong]].
* @since 1.6.0
*/
def LONG: Encoder[java.lang.Long] = ExpressionEncoder()
/**
* An encoder for nullable float type.
* The Scala primitive encoder is available as [[scalaFloat]].
* @since 1.6.0
*/
def FLOAT: Encoder[java.lang.Float] = ExpressionEncoder()
/**
* An encoder for nullable double type.
* The Scala primitive encoder is available as [[scalaDouble]].
* @since 1.6.0
*/
def DOUBLE: Encoder[java.lang.Double] = ExpressionEncoder()
/**
* An encoder for nullable string type.
*
* @since 1.6.0
*/
def STRING: Encoder[java.lang.String] = ExpressionEncoder()
/**
* An encoder for nullable decimal type.
*
* @since 1.6.0
*/
def DECIMAL: Encoder[java.math.BigDecimal] = ExpressionEncoder()
/**
* An encoder for nullable date type.
*
* @since 1.6.0
*/
def DATE: Encoder[java.sql.Date] = ExpressionEncoder()
/**
* An encoder for nullable timestamp type.
*
* @since 1.6.0
*/
def TIMESTAMP: Encoder[java.sql.Timestamp] = ExpressionEncoder()
/**
* An encoder for arrays of bytes.
*
* @since 1.6.1
*/
def BINARY: Encoder[Array[Byte]] = ExpressionEncoder()
/**
* Creates an encoder for Java Bean of type T.
*
* T must be publicly accessible.
*
* supported types for java bean field:
* - primitive types: boolean, int, double, etc.
* - boxed types: Boolean, Integer, Double, etc.
* - String
* - java.math.BigDecimal
* - time related: java.sql.Date, java.sql.Timestamp
* - collection types: only array and java.util.List currently, map support is in progress
* - nested java bean.
*
* @since 1.6.0
*/
def bean[T](beanClass: Class[T]): Encoder[T] = ExpressionEncoder.javaBean(beanClass)
/**
* (Scala-specific) Creates an encoder that serializes objects of type T using Kryo.
* This encoder maps T into a single byte array (binary) field.
*
* T must be publicly accessible.
*
* @since 1.6.0
*/
def kryo[T: ClassTag]: Encoder[T] = genericSerializer(useKryo = true)
/**
* Creates an encoder that serializes objects of type T using Kryo.
* This encoder maps T into a single byte array (binary) field.
*
* T must be publicly accessible.
*
* @since 1.6.0
*/
def kryo[T](clazz: Class[T]): Encoder[T] = kryo(ClassTag[T](clazz))
/**
* (Scala-specific) Creates an encoder that serializes objects of type T using generic Java
* serialization. This encoder maps T into a single byte array (binary) field.
*
* Note that this is extremely inefficient and should only be used as the last resort.
*
* T must be publicly accessible.
*
* @since 1.6.0
*/
def javaSerialization[T: ClassTag]: Encoder[T] = genericSerializer(useKryo = false)
/**
* Creates an encoder that serializes objects of type T using generic Java serialization.
* This encoder maps T into a single byte array (binary) field.
*
* Note that this is extremely inefficient and should only be used as the last resort.
*
* T must be publicly accessible.
*
* @since 1.6.0
*/
def javaSerialization[T](clazz: Class[T]): Encoder[T] = javaSerialization(ClassTag[T](clazz))
/** Throws an exception if T is not a public class. */
private def validatePublicClass[T: ClassTag](): Unit = {
if (!Modifier.isPublic(classTag[T].runtimeClass.getModifiers)) {
throw new UnsupportedOperationException(
s"${classTag[T].runtimeClass.getName} is not a public class. " +
"Only public classes are supported.")
}
}
/** A way to construct encoders using generic serializers. */
private def genericSerializer[T: ClassTag](useKryo: Boolean): Encoder[T] = {
if (classTag[T].runtimeClass.isPrimitive) {
throw new UnsupportedOperationException("Primitive types are not supported.")
}
validatePublicClass[T]()
ExpressionEncoder[T](
schema = new StructType().add("value", BinaryType),
flat = true,
serializer = Seq(
EncodeUsingSerializer(
BoundReference(0, ObjectType(classOf[AnyRef]), nullable = true), kryo = useKryo)),
deserializer =
DecodeUsingSerializer[T](
BoundReference(0, BinaryType, nullable = true), classTag[T], kryo = useKryo),
clsTag = classTag[T]
)
}
/**
* An encoder for 2-ary tuples.
*
* @since 1.6.0
*/
def tuple[T1, T2](
e1: Encoder[T1],
e2: Encoder[T2]): Encoder[(T1, T2)] = {
ExpressionEncoder.tuple(encoderFor(e1), encoderFor(e2))
}
/**
* An encoder for 3-ary tuples.
*
* @since 1.6.0
*/
def tuple[T1, T2, T3](
e1: Encoder[T1],
e2: Encoder[T2],
e3: Encoder[T3]): Encoder[(T1, T2, T3)] = {
ExpressionEncoder.tuple(encoderFor(e1), encoderFor(e2), encoderFor(e3))
}
/**
* An encoder for 4-ary tuples.
*
* @since 1.6.0
*/
def tuple[T1, T2, T3, T4](
e1: Encoder[T1],
e2: Encoder[T2],
e3: Encoder[T3],
e4: Encoder[T4]): Encoder[(T1, T2, T3, T4)] = {
ExpressionEncoder.tuple(encoderFor(e1), encoderFor(e2), encoderFor(e3), encoderFor(e4))
}
/**
* An encoder for 5-ary tuples.
*
* @since 1.6.0
*/
def tuple[T1, T2, T3, T4, T5](
e1: Encoder[T1],
e2: Encoder[T2],
e3: Encoder[T3],
e4: Encoder[T4],
e5: Encoder[T5]): Encoder[(T1, T2, T3, T4, T5)] = {
ExpressionEncoder.tuple(
encoderFor(e1), encoderFor(e2), encoderFor(e3), encoderFor(e4), encoderFor(e5))
}
/**
* An encoder for Scala's product type (tuples, case classes, etc).
* @since 2.0.0
*/
def product[T <: Product : TypeTag]: Encoder[T] = ExpressionEncoder()
/**
* An encoder for Scala's primitive int type.
* @since 2.0.0
*/
def scalaInt: Encoder[Int] = ExpressionEncoder()
/**
* An encoder for Scala's primitive long type.
* @since 2.0.0
*/
def scalaLong: Encoder[Long] = ExpressionEncoder()
/**
* An encoder for Scala's primitive double type.
* @since 2.0.0
*/
def scalaDouble: Encoder[Double] = ExpressionEncoder()
/**
* An encoder for Scala's primitive float type.
* @since 2.0.0
*/
def scalaFloat: Encoder[Float] = ExpressionEncoder()
/**
* An encoder for Scala's primitive byte type.
* @since 2.0.0
*/
def scalaByte: Encoder[Byte] = ExpressionEncoder()
/**
* An encoder for Scala's primitive short type.
* @since 2.0.0
*/
def scalaShort: Encoder[Short] = ExpressionEncoder()
/**
* An encoder for Scala's primitive boolean type.
* @since 2.0.0
*/
def scalaBoolean: Encoder[Boolean] = ExpressionEncoder()
}
...@@ -44,33 +44,33 @@ abstract class SQLImplicits { ...@@ -44,33 +44,33 @@ abstract class SQLImplicits {
} }
/** @since 1.6.0 */ /** @since 1.6.0 */
implicit def newProductEncoder[T <: Product : TypeTag]: Encoder[T] = ExpressionEncoder() implicit def newProductEncoder[T <: Product : TypeTag]: Encoder[T] = Encoders.product[T]
// Primitives // Primitives
/** @since 1.6.0 */ /** @since 1.6.0 */
implicit def newIntEncoder: Encoder[Int] = ExpressionEncoder() implicit def newIntEncoder: Encoder[Int] = Encoders.scalaInt
/** @since 1.6.0 */ /** @since 1.6.0 */
implicit def newLongEncoder: Encoder[Long] = ExpressionEncoder() implicit def newLongEncoder: Encoder[Long] = Encoders.scalaLong
/** @since 1.6.0 */ /** @since 1.6.0 */
implicit def newDoubleEncoder: Encoder[Double] = ExpressionEncoder() implicit def newDoubleEncoder: Encoder[Double] = Encoders.scalaDouble
/** @since 1.6.0 */ /** @since 1.6.0 */
implicit def newFloatEncoder: Encoder[Float] = ExpressionEncoder() implicit def newFloatEncoder: Encoder[Float] = Encoders.scalaFloat
/** @since 1.6.0 */ /** @since 1.6.0 */
implicit def newByteEncoder: Encoder[Byte] = ExpressionEncoder() implicit def newByteEncoder: Encoder[Byte] = Encoders.scalaByte
/** @since 1.6.0 */ /** @since 1.6.0 */
implicit def newShortEncoder: Encoder[Short] = ExpressionEncoder() implicit def newShortEncoder: Encoder[Short] = Encoders.scalaShort
/** @since 1.6.0 */ /** @since 1.6.0 */
implicit def newBooleanEncoder: Encoder[Boolean] = ExpressionEncoder() implicit def newBooleanEncoder: Encoder[Boolean] = Encoders.scalaBoolean
/** @since 1.6.0 */ /** @since 1.6.0 */
implicit def newStringEncoder: Encoder[String] = ExpressionEncoder() implicit def newStringEncoder: Encoder[String] = Encoders.STRING
// Seqs // Seqs
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment