Commit f6fcb487 authored by Wenchen Fan, committed by Michael Armbrust

[SPARK-11477] [SQL] support create Dataset from RDD

Author: Wenchen Fan <wenchen@databricks.com>

Closes #9434 from cloud-fan/rdd2ds and squashes the following commits:

0892d72 [Wenchen Fan] support create Dataset from RDD
parent 1d04dc95
@@ -499,6 +499,15 @@ class SQLContext private[sql](
    new Dataset[T](this, plan)
  }
  def createDataset[T : Encoder](data: RDD[T]): Dataset[T] = {
    val enc = encoderFor[T]
    val attributes = enc.schema.toAttributes
    val encoded = data.map(d => enc.toRow(d))
    val plan = LogicalRDD(attributes, encoded)(self)
    new Dataset[T](this, plan)
  }
  /**
   * Creates a DataFrame from an RDD[Row]. User can specify whether the input rows should be
   * converted to Catalyst rows.
  ...
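The new createDataset overload above takes an RDD[T] rather than a Seq[T]: it resolves the implicit Encoder for T, converts each element to an internal row with enc.toRow, and wraps the encoded RDD in a LogicalRDD plan. A minimal usage sketch, assuming a spark-shell-style session where sc is a SparkContext and sqlContext is a SQLContext (those value names are assumed, not part of this patch):

// Assumed session setup: sc and sqlContext already exist.
import sqlContext.implicits._                        // brings newStringEncoder into scope

val rdd = sc.parallelize(Seq("a", "b", "c"))
val ds = sqlContext.createDataset(rdd)               // the overload added by this commit
ds.collect()                                         // Array("a", "b", "c")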
@@ -48,6 +48,10 @@ abstract class SQLImplicits {
  implicit def newBooleanEncoder: Encoder[Boolean] = ExpressionEncoder[Boolean](flat = true)
  implicit def newStringEncoder: Encoder[String] = ExpressionEncoder[String](flat = true)
  implicit def rddToDatasetHolder[T : Encoder](rdd: RDD[T]): DatasetHolder[T] = {
    DatasetHolder(_sqlContext.createDataset(rdd))
  }
  implicit def localSeqToDatasetHolder[T : Encoder](s: Seq[T]): DatasetHolder[T] = {
    DatasetHolder(_sqlContext.createDataset(s))
  }
  ...
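The rddToDatasetHolder implicit mirrors the existing localSeqToDatasetHolder just below it: wrapping the result in DatasetHolder is what exposes the .toDS() syntax on an RDD once the SQLContext implicits are imported. A sketch of the call site this enables, again assuming sc and sqlContext are in scope as above:

import sqlContext.implicits._          // makes rddToDatasetHolder and the flat encoders available

val ds = sc.parallelize(Seq("a", "b", "c")).toDS()   // RDD[String] -> Dataset[String]
ds.collect()                                          // Array("a", "b", "c")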
@@ -34,6 +34,13 @@ class DatasetSuite extends QueryTest with SharedSQLContext {
      data: _*)
  }
test("toDS with RDD") {
val ds = sparkContext.makeRDD(Seq("a", "b", "c"), 3).toDS()
checkAnswer(
ds.mapPartitions(_ => Iterator(1)),
1, 1, 1)
}
test("as tuple") { test("as tuple") {
val data = Seq(("a", 1), ("b", 2)).toDF("a", "b") val data = Seq(("a", 1), ("b", 2)).toDF("a", "b")
checkAnswer( checkAnswer(
......
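The new test builds an RDD with three partitions, converts it through the new .toDS() syntax, and maps every partition to a single 1, so checkAnswer expects exactly three 1s, one per partition. A rough shell equivalent, under the same assumed sc/sqlContext setup as in the sketches above:

import sqlContext.implicits._

val ds = sc.makeRDD(Seq("a", "b", "c"), 3).toDS()    // RDD with 3 partitions
ds.mapPartitions(_ => Iterator(1)).collect()         // Array(1, 1, 1): one element per partition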