Skip to content
Snippets Groups Projects
Commit d380f324 authored by Reynold Xin's avatar Reynold Xin
Browse files

[SPARK-5853][SQL] Schema support in Row.

Author: Reynold Xin <rxin@databricks.com>

Closes #4640 from rxin/SPARK-5853 and squashes the following commits:

9c6f569 [Reynold Xin] [SPARK-5853][SQL] Schema support in Row.
parent a51d51ff
No related branches found
No related tags found
No related merge requests found
...@@ -20,7 +20,7 @@ package org.apache.spark.sql ...@@ -20,7 +20,7 @@ package org.apache.spark.sql
import scala.util.hashing.MurmurHash3 import scala.util.hashing.MurmurHash3
import org.apache.spark.sql.catalyst.expressions.GenericRow import org.apache.spark.sql.catalyst.expressions.GenericRow
import org.apache.spark.sql.types.DateUtils import org.apache.spark.sql.types.{StructType, DateUtils}
object Row { object Row {
/** /**
...@@ -122,6 +122,11 @@ trait Row extends Serializable { ...@@ -122,6 +122,11 @@ trait Row extends Serializable {
/** Number of elements in the Row. */ /** Number of elements in the Row. */
def length: Int def length: Int
/**
 * Schema for the row.
 *
 * NOTE(review): defaults to null, i.e. this Row implementation carries no
 * schema information and callers must null-check. Presumably overridden by
 * schema-aware implementations (e.g. a row built from a StructType) —
 * confirm against concrete subclasses.
 */
def schema: StructType = null
/** /**
* Returns the value at position i. If the value is null, null is returned. The following * Returns the value at position i. If the value is null, null is returned. The following
* is a mapping between Spark SQL types and return types: * is a mapping between Spark SQL types and return types:
......
...@@ -20,7 +20,7 @@ package org.apache.spark.sql.catalyst ...@@ -20,7 +20,7 @@ package org.apache.spark.sql.catalyst
import java.sql.Timestamp import java.sql.Timestamp
import org.apache.spark.util.Utils import org.apache.spark.util.Utils
import org.apache.spark.sql.catalyst.expressions.{GenericRow, Attribute, AttributeReference, Row} import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical.LocalRelation import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
import org.apache.spark.sql.types._ import org.apache.spark.sql.types._
...@@ -91,9 +91,9 @@ trait ScalaReflection { ...@@ -91,9 +91,9 @@ trait ScalaReflection {
/**
 * Converts a Catalyst row into its Scala-side representation, converting each
 * cell according to the corresponding field's data type in `schema`.
 *
 * The returned row retains `schema` so callers can inspect column metadata.
 */
def convertRowToScala(r: Row, schema: StructType): Row = {
  // TODO: This is very slow!!!
  val converted = r.toSeq.zip(schema.fields.map(_.dataType)).map {
    case (value, dataType) => convertToScala(value, dataType)
  }
  new GenericRowWithSchema(converted.toArray, schema)
}
/** Returns a Sequence of attributes for the given case class type. */ /** Returns a Sequence of attributes for the given case class type. */
......
...@@ -17,7 +17,7 @@ ...@@ -17,7 +17,7 @@
package org.apache.spark.sql.catalyst.expressions package org.apache.spark.sql.catalyst.expressions
import org.apache.spark.sql.types.NativeType import org.apache.spark.sql.types.{StructType, NativeType}
/** /**
...@@ -149,6 +149,10 @@ class GenericRow(protected[sql] val values: Array[Any]) extends Row { ...@@ -149,6 +149,10 @@ class GenericRow(protected[sql] val values: Array[Any]) extends Row {
def copy() = this def copy() = this
} }
/**
 * A [[GenericRow]] variant that also carries the [[StructType]] describing
 * its columns, overriding the schema-less default on `Row`.
 */
class GenericRowWithSchema(values: Array[Any], override val schema: StructType)
  extends GenericRow(values)
class GenericMutableRow(v: Array[Any]) extends GenericRow(v) with MutableRow { class GenericMutableRow(v: Array[Any]) extends GenericRow(v) with MutableRow {
/** No-arg constructor for serialization. */ /** No-arg constructor for serialization. */
def this() = this(null) def this() = this(null)
......
...@@ -89,6 +89,12 @@ class DataFrameSuite extends QueryTest { ...@@ -89,6 +89,12 @@ class DataFrameSuite extends QueryTest {
testData.collect().toSeq) testData.collect().toSeq)
} }
test("head and take") {
  // head(n) and take(n) must both agree with collect().take(n), and rows
  // returned by head must retain the DataFrame's schema.
  val expected = testData.collect().take(2)
  assert(testData.take(2) === expected)
  assert(testData.head(2) === expected)
  assert(testData.head(2).head.schema === testData.schema)
}
test("self join") { test("self join") {
val df1 = testData.select(testData("key")).as('df1) val df1 = testData.select(testData("key")).as('df1)
val df2 = testData.select(testData("key")).as('df2) val df2 = testData.select(testData("key")).as('df2)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment