diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Column.scala b/sql/core/src/main/scala/org/apache/spark/sql/Column.scala
index 878b2b0556de75593cf10d986b8db45867995359..1011bf0bb5ef4acc56dad6a3a9a6c3dc7f14735c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Column.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Column.scala
@@ -549,6 +549,15 @@ trait Column extends DataFrame {
    */
   override def as(alias: String): Column = exprToColumn(Alias(expr, alias)())
 
+  /**
+   * Gives the column an alias.
+   * {{{
+   *   // Renames colA to colB in select output.
+   *   df.select($"colA".as('colB))
+   * }}}
+   */
+  override def as(alias: Symbol): Column = exprToColumn(Alias(expr, alias.name)())
+
   /**
    * Casts the column to a different data type.
    * {{{
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
index 17ea3cde8e50e89eafe527d4c2784d77b754c9e1..6abfb7853cf1ca3c871613564070d78ad9e8f595 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
@@ -156,7 +156,7 @@ trait DataFrame extends RDDApi[Row] {
   def join(right: DataFrame, joinExprs: Column): DataFrame
 
   /**
-   * Join with another [[DataFrame]], usin  g the given join expression. The following performs
+   * Join with another [[DataFrame]], using the given join expression. The following performs
    * a full outer join between `df1` and `df2`.
    *
    * {{{
@@ -233,7 +233,12 @@ trait DataFrame extends RDDApi[Row] {
   /**
    * Returns a new [[DataFrame]] with an alias set.
    */
-  def as(name: String): DataFrame
+  def as(alias: String): DataFrame
+
+  /**
+   * (Scala-specific) Returns a new [[DataFrame]] with an alias set.
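+   *
+   * A sketch of intended usage; the alias and column names here are illustrative:
+   * {{{
+   *   // Alias both sides of a self-join to disambiguate column references.
+   *   df.as('a).join(df.as('b), $"a.key" === $"b.key")
+   * }}}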
+   */
+  def as(alias: Symbol): DataFrame
 
   /**
    * Selects a set of expressions.
@@ -516,6 +521,9 @@ trait DataFrame extends RDDApi[Row] {
    */
   override def repartition(numPartitions: Int): DataFrame
 
+  /**
+   * Returns a new [[DataFrame]] that contains only the unique rows from this [[DataFrame]].
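+   *
+   * A sketch of expected usage; the column name is illustrative:
+   * {{{
+   *   df.select("department").distinct
+   * }}}
+   */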
+  override def distinct: DataFrame
+
   override def persist(): this.type
 
   override def persist(newLevel: StorageLevel): this.type
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameImpl.scala
index fa05a5dcac6bf7bb24a0e0a2b287c0d9f2a3a3f5..73393295ab0a57d5724ae484872dbaa60ffdb8ec 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameImpl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameImpl.scala
@@ -196,7 +196,9 @@ private[sql] class DataFrameImpl protected[sql](
     }.toSeq :_*)
   }
 
-  override def as(name: String): DataFrame = Subquery(name, logicalPlan)
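+  // Wrapping the plan in Subquery attaches the alias as a qualifier, so that
+  // qualified references such as $"alias.column" resolve against this DataFrame.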
+  override def as(alias: String): DataFrame = Subquery(alias, logicalPlan)
+
+  override def as(alias: Symbol): DataFrame = Subquery(alias.name, logicalPlan)
 
   override def select(cols: Column*): DataFrame = {
     val exprs = cols.zipWithIndex.map {
@@ -215,7 +217,19 @@ private[sql] class DataFrameImpl protected[sql](
   override def selectExpr(exprs: String*): DataFrame = {
     select(exprs.map { expr =>
       Column(new SqlParser().parseExpression(expr))
-    } :_*)
+    }: _*)
+  }
+
+  override def addColumn(colName: String, col: Column): DataFrame = {
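+    // "*" expands to all existing columns, so the new column is appended after them.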
+    select(Column("*"), col.as(colName))
+  }
+
+  override def renameColumn(existingName: String, newName: String): DataFrame = {
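+    // Re-project every column, aliasing only the one whose name matches existingName.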
+    val colNames = schema.map { field =>
+      val name = field.name
+      if (name == existingName) Column(name).as(newName) else Column(name)
+    }
+    select(colNames: _*)
   }
 
   override def filter(condition: Column): DataFrame = {
@@ -264,18 +278,8 @@ private[sql] class DataFrameImpl protected[sql](
   }
 
   /////////////////////////////////////////////////////////////////////////////
-
-  override def addColumn(colName: String, col: Column): DataFrame = {
-    select(Column("*"), col.as(colName))
-  }
-
-  override def renameColumn(existingName: String, newName: String): DataFrame = {
-    val colNames = schema.map { field =>
-      val name = field.name
-      if (name == existingName) Column(name).as(newName) else Column(name)
-    }
-    select(colNames :_*)
-  }
+  // RDD API
+  /////////////////////////////////////////////////////////////////////////////
 
   override def head(n: Int): Array[Row] = limit(n).collect()
 
@@ -307,6 +311,8 @@ private[sql] class DataFrameImpl protected[sql](
     sqlContext.applySchema(rdd.repartition(numPartitions), schema)
   }
 
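+  // Deduplicates rows by wrapping the logical plan in Catalyst's Distinct operator.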
+  override def distinct: DataFrame = Distinct(logicalPlan)
+
   override def persist(): this.type = {
     sqlContext.cacheManager.cacheQuery(this)
     this
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/IncomputableColumn.scala b/sql/core/src/main/scala/org/apache/spark/sql/IncomputableColumn.scala
index 782f6e28eebb06b0d179b228c6ea15734982e557..0600dcc226b4d853ffc2bd27ece61a777bdbdea7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/IncomputableColumn.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/IncomputableColumn.scala
@@ -86,6 +86,10 @@ private[sql] class IncomputableColumn(protected[sql] val expr: Expression) exten
 
   override def selectExpr(exprs: String*): DataFrame = err()
 
+  override def addColumn(colName: String, col: Column): DataFrame = err()
+
+  override def renameColumn(existingName: String, newName: String): DataFrame = err()
+
   override def filter(condition: Column): DataFrame = err()
 
   override def filter(conditionExpr: String): DataFrame = err()
@@ -110,10 +114,6 @@ private[sql] class IncomputableColumn(protected[sql] val expr: Expression) exten
 
   /////////////////////////////////////////////////////////////////////////////
 
-  override def addColumn(colName: String, col: Column): DataFrame = err()
-
-  override def renameColumn(existingName: String, newName: String): DataFrame = err()
-
   override def head(n: Int): Array[Row] = err()
 
   override def head(): Row = err()
@@ -140,6 +140,8 @@ private[sql] class IncomputableColumn(protected[sql] val expr: Expression) exten
 
   override def repartition(numPartitions: Int): DataFrame = err()
 
+  override def distinct: DataFrame = err()
+
   override def persist(): this.type = err()
 
   override def persist(newLevel: StorageLevel): this.type = err()
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/RDDApi.scala b/sql/core/src/main/scala/org/apache/spark/sql/RDDApi.scala
index 38e6382f171d51df9cb2aa7ed54db5596012c397..df866fd1ad8adb63e9abcee51083158ff8149441 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/RDDApi.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/RDDApi.scala
@@ -60,4 +60,6 @@ private[sql] trait RDDApi[T] {
   def first(): T
 
   def repartition(numPartitions: Int): DataFrame
+
+  def distinct: DataFrame
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index bf3990671029ef7cd3c526e69a2d52bee8e3b128..97e3777f933e4a9cdcb4ddbc94a553546994b05e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -32,7 +32,7 @@ import org.apache.spark.sql.catalyst.ScalaReflection
 import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.optimizer.{DefaultOptimizer, Optimizer}
-import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
+import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan, NoRelation}
 import org.apache.spark.sql.catalyst.rules.RuleExecutor
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.jdbc.{JDBCPartition, JDBCPartitioningInfo, JDBCRelation}
@@ -130,6 +130,9 @@ class SQLContext(@transient val sparkContext: SparkContext)
    */
   val experimental: ExperimentalMethods = new ExperimentalMethods(this)
 
+  /**
+   * Returns a [[DataFrame]] with no rows or columns.
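+   *
+   * For example, the following would be expected to return 0:
+   * {{{
+   *   sqlContext.emptyDataFrame.count()
+   * }}}
+   */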
+  lazy val emptyDataFrame: DataFrame = DataFrame(this, NoRelation)
+
   /**
    * A collection of methods for registering user-defined functions (UDF).
    *