Skip to content
Snippets Groups Projects
Commit 42d225f4 authored by Wenchen Fan's avatar Wenchen Fan Committed by Michael Armbrust
Browse files

[SPARK-11216][SQL][FOLLOW-UP] add encoder/decoder for external row

address comments in https://github.com/apache/spark/pull/9184

Author: Wenchen Fan <wenchen@databricks.com>

Closes #9212 from cloud-fan/encoder.
parent f6d06adf
No related branches found
No related tags found
No related merge requests found
......@@ -48,20 +48,12 @@ case class ClassEncoder[T](
private val dataType = ObjectType(clsTag.runtimeClass)
override def toRow(t: T): InternalRow = {
if (t == null) {
null
} else {
inputRow(0) = t
extractProjection(inputRow)
}
inputRow(0) = t
extractProjection(inputRow)
}
override def fromRow(row: InternalRow): T = {
if (row eq null) {
null.asInstanceOf[T]
} else {
constructProjection(row).get(0, dataType).asInstanceOf[T]
}
constructProjection(row).get(0, dataType).asInstanceOf[T]
}
override def bind(schema: Seq[Attribute]): ClassEncoder[T] = {
......
......@@ -26,8 +26,11 @@ import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String
/**
* A factory for constructing encoders that convert external rows to/from the Spark SQL
* internal binary representation.
*/
object RowEncoder {
def apply(schema: StructType): ClassEncoder[Row] = {
val cls = classOf[Row]
val inputObject = BoundReference(0, ObjectType(cls), nullable = true)
......@@ -136,7 +139,7 @@ object RowEncoder {
constructorFor(BoundReference(i, f.dataType, f.nullable), f.dataType)
)
}
CreateRow(fields)
CreateExternalRow(fields)
}
private def constructorFor(input: Expression, dataType: DataType): Expression = dataType match {
......@@ -195,7 +198,7 @@ object RowEncoder {
Literal.create(null, externalDataTypeFor(f.dataType)),
constructorFor(getField(input, i, f.dataType), f.dataType))
}
CreateRow(convertedFields)
CreateExternalRow(convertedFields)
}
private def getField(
......
......@@ -456,7 +456,13 @@ case class MapObjects(
}
}
case class CreateRow(children: Seq[Expression]) extends Expression {
/**
* Constructs a new external row, using the result of evaluating the specified expressions
* as content.
*
* @param children A list of expressions to use as content of the external row.
*/
case class CreateExternalRow(children: Seq[Expression]) extends Expression {
override def dataType: DataType = ObjectType(classOf[Row])
override def nullable: Boolean = false
......
......@@ -73,7 +73,7 @@ class RowEncoderSuite extends SparkFunSuite {
private def encodeDecodeTest(schema: StructType): Unit = {
test(s"encode/decode: ${schema.simpleString}") {
val encoder = RowEncoder(schema)
val inputGenerator = RandomDataGenerator.forType(schema).get
val inputGenerator = RandomDataGenerator.forType(schema, nullable = false).get
var input: Row = null
try {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment