From f6fcb4874ce20a1daa91b7434cf9c0254a89e979 Mon Sep 17 00:00:00 2001
From: Wenchen Fan <wenchen@databricks.com>
Date: Wed, 4 Nov 2015 00:15:50 +0100
Subject: [PATCH] [SPARK-11477] [SQL] Support creating a Dataset from an RDD

Author: Wenchen Fan <wenchen@databricks.com>

Closes #9434 from cloud-fan/rdd2ds and squashes the following commits:

0892d72 [Wenchen Fan] support create Dataset from RDD
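
The new API mirrors the existing Seq-based createDataset: each RDD element is
encoded to a Catalyst row with the encoder for T, and the result is wrapped in
a LogicalRDD plan. A minimal usage sketch (the SparkContext `sc` and the sample
data below are illustrative, not part of this patch):

    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    import sqlContext.implicits._

    // Both forms produce a Dataset[String] backed by the RDD's partitions.
    val rdd = sc.parallelize(Seq("a", "b", "c"))
    val ds1 = sqlContext.createDataset(rdd)  // explicit API on SQLContext
    val ds2 = rdd.toDS()                     // via the new rddToDatasetHolder implicit

Going through DatasetHolder (which just wraps the Dataset and exposes toDS())
keeps the implicit conversion narrow, matching how localSeqToDatasetHolder
already handles local Seqs.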
---
 .../src/main/scala/org/apache/spark/sql/SQLContext.scala | 9 +++++++++
 .../main/scala/org/apache/spark/sql/SQLImplicits.scala   | 4 ++++
 .../test/scala/org/apache/spark/sql/DatasetSuite.scala   | 7 +++++++
 3 files changed, 20 insertions(+)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index 2cb94430e6..5ad3871093 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -499,6 +499,15 @@ class SQLContext private[sql](
     new Dataset[T](this, plan)
   }
 
+  def createDataset[T : Encoder](data: RDD[T]): Dataset[T] = {
+    val enc = encoderFor[T]
+    val attributes = enc.schema.toAttributes
+    val encoded = data.map(d => enc.toRow(d))
+    val plan = LogicalRDD(attributes, encoded)(self)
+
+    new Dataset[T](this, plan)
+  }
+
   /**
    * Creates a DataFrame from an RDD[Row]. User can specify whether the input rows should be
    * converted to Catalyst rows.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLImplicits.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLImplicits.scala
index f460a86414..f2904e2708 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLImplicits.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLImplicits.scala
@@ -48,6 +48,10 @@ abstract class SQLImplicits {
   implicit def newBooleanEncoder: Encoder[Boolean] = ExpressionEncoder[Boolean](flat = true)
   implicit def newStringEncoder: Encoder[String] = ExpressionEncoder[String](flat = true)
 
+  implicit def rddToDatasetHolder[T : Encoder](rdd: RDD[T]): DatasetHolder[T] = {
+    DatasetHolder(_sqlContext.createDataset(rdd))
+  }
+
   implicit def localSeqToDatasetHolder[T : Encoder](s: Seq[T]): DatasetHolder[T] = {
     DatasetHolder(_sqlContext.createDataset(s))
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
index 5973fa7f2a..3e9b621cfd 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
@@ -34,6 +34,13 @@ class DatasetSuite extends QueryTest with SharedSQLContext {
       data: _*)
   }
 
+  test("toDS with RDD") {
+    val ds = sparkContext.makeRDD(Seq("a", "b", "c"), 3).toDS()
+    checkAnswer(
+      ds.mapPartitions(_ => Iterator(1)),
+      1, 1, 1)
+  }
+
   test("as tuple") {
     val data = Seq(("a", 1), ("b", 2)).toDF("a", "b")
     checkAnswer(
-- 
GitLab