diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index ffc2baf7a8826d7bf8d6f960908caf789b6c1bdc..6f8ffb54402a7547175c13b27f054adcd4068b1b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -291,9 +291,6 @@ class SQLContext(@transient val sparkContext: SparkContext)
   @transient
   val udf: UDFRegistration = new UDFRegistration(this)
 
-  @transient
-  val udaf: UDAFRegistration = new UDAFRegistration(this)
-
   /**
    * Returns true if the table is currently cached in-memory.
    * @group cachemgmt
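
With the separate udaf handle removed from SQLContext, aggregate functions now
register through the existing udf handle. A minimal migration sketch, using the
MyDoubleSum test function exercised by the suites below (imports elided):

    // before this patch:
    //   sqlContext.udaf.register("mydoublesum", new MyDoubleSum)
    // after this patch, the same call goes through the plain UDF handle:
    sqlContext.udf.register("mydoublesum", new MyDoubleSum)
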
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/UDAFRegistration.scala b/sql/core/src/main/scala/org/apache/spark/sql/UDAFRegistration.scala
deleted file mode 100644
index 0d4e30f29255ee0483182fdf39a5ce104e984c88..0000000000000000000000000000000000000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/UDAFRegistration.scala
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql
-
-import org.apache.spark.Logging
-import org.apache.spark.sql.catalyst.expressions.{Expression}
-import org.apache.spark.sql.execution.aggregate.ScalaUDAF
-import org.apache.spark.sql.expressions.UserDefinedAggregateFunction
-
-class UDAFRegistration private[sql] (sqlContext: SQLContext) extends Logging {
-
-  private val functionRegistry = sqlContext.functionRegistry
-
-  def register(
-      name: String,
-      func: UserDefinedAggregateFunction): UserDefinedAggregateFunction = {
-    def builder(children: Seq[Expression]) = ScalaUDAF(children, func)
-    functionRegistry.registerFunction(name, builder)
-    func
-  }
-}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/UDFRegistration.scala b/sql/core/src/main/scala/org/apache/spark/sql/UDFRegistration.scala
index 7cd7421a518c9a5294bd6e04dc70de01bc3fd81f..1f270560d7bc161b32f563d1a3a7352733ecd645 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/UDFRegistration.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/UDFRegistration.scala
@@ -26,6 +26,8 @@ import org.apache.spark.Logging
 import org.apache.spark.sql.api.java._
 import org.apache.spark.sql.catalyst.ScalaReflection
 import org.apache.spark.sql.catalyst.expressions.{Expression, ScalaUDF}
+import org.apache.spark.sql.execution.aggregate.ScalaUDAF
+import org.apache.spark.sql.expressions.UserDefinedAggregateFunction
 import org.apache.spark.sql.types.DataType
 
 /**
@@ -52,6 +54,20 @@ class UDFRegistration private[sql] (sqlContext: SQLContext) extends Logging {
     functionRegistry.registerFunction(name, udf.builder)
   }
 
+  /**
+   * Register a user-defined aggregate function (UDAF).
+   * @param name the name of the UDAF.
+   * @param udaf the UDAF that needs to be registered.
+   * @return the registered UDAF.
+   */
+  def register(
+      name: String,
+      udaf: UserDefinedAggregateFunction): UserDefinedAggregateFunction = {
+    def builder(children: Seq[Expression]) = ScalaUDAF(children, udaf)
+    functionRegistry.registerFunction(name, builder)
+    udaf
+  }
+
   // scalastyle:off
 
   /* register 0-22 were generated by this script
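
The new overload mirrors the existing UDF register variants and hands the
function back, so callers can keep a reference for the DataFrame API as well as
SQL. A sketch, assuming a DataFrame df with a numeric value column that is also
registered as temp table "df" (MyDoubleSum is the test helper touched below):

    import org.apache.spark.sql.functions.col
    import test.org.apache.spark.sql.hive.aggregate.MyDoubleSum

    val mySum = sqlContext.udf.register("mydoublesum", new MyDoubleSum)
    // Callable by name from SQL...
    sqlContext.sql("SELECT mydoublesum(value) FROM df")
    // ...and through the returned instance in the DataFrame API.
    df.groupBy().agg(mySum(col("value")))
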
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/udaf.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/udaf.scala
index 5fafc916bfa0b9a3369aaefe81178ec1931d14b5..7619f3ec9f0a75f93f4064010b220ae85453aa09 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/udaf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/udaf.scala
@@ -316,7 +316,7 @@ private[sql] case class ScalaUDAF(
 
   override lazy val cloneBufferAttributes = bufferAttributes.map(_.newInstance())
 
-  private[this] val childrenSchema: StructType = {
+  private[this] lazy val childrenSchema: StructType = {
     val inputFields = children.zipWithIndex.map {
       case (child, index) =>
         StructField(s"input$index", child.dataType, child.nullable, Metadata.empty)
@@ -337,16 +337,16 @@ private[sql] case class ScalaUDAF(
     }
   }
 
-  private[this] val inputToScalaConverters: Any => Any =
+  private[this] lazy val inputToScalaConverters: Any => Any =
     CatalystTypeConverters.createToScalaConverter(childrenSchema)
 
-  private[this] val bufferValuesToCatalystConverters: Array[Any => Any] = {
+  private[this] lazy val bufferValuesToCatalystConverters: Array[Any => Any] = {
     bufferSchema.fields.map { field =>
       CatalystTypeConverters.createToCatalystConverter(field.dataType)
     }
   }
 
-  private[this] val bufferValuesToScalaConverters: Array[Any => Any] = {
+  private[this] lazy val bufferValuesToScalaConverters: Array[Any => Any] = {
     bufferSchema.fields.map { field =>
       CatalystTypeConverters.createToScalaConverter(field.dataType)
     }
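
Making these fields lazy matters because ScalaUDAF lives in a Catalyst
expression tree: a plain val is computed at construction time, when the
children may still be unresolved and child.dataType would throw. A
self-contained sketch of the val vs. lazy val semantics (the class and method
names here are illustrative, not Spark APIs):

    // Plain val: the initializer runs inside the constructor.
    class Eager(compute: () => Int) {
      val value: Int = compute()
    }

    // lazy val: the initializer runs on first access, then the result is cached.
    class Deferred(compute: () => Int) {
      lazy val value: Int = compute()
    }

    def notReadyYet(): Int = throw new IllegalStateException("unresolved child")
    // new Eager(notReadyYet _)         // throws while constructing
    val d = new Deferred(notReadyYet _) // constructs fine
    // d.value                          // throws only if actually accessed
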
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/expressions/udaf.scala b/sql/core/src/main/scala/org/apache/spark/sql/expressions/udaf.scala
index 278dd438fab4a2ee23729bad1c9f0b9a980ed498..5180871585f25d2c18de07777896194dde498771 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/expressions/udaf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/expressions/udaf.scala
@@ -17,7 +17,9 @@
 
 package org.apache.spark.sql.expressions
 
-import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression2, Complete}
+import org.apache.spark.sql.execution.aggregate.ScalaUDAF
+import org.apache.spark.sql.{Column, Row}
 import org.apache.spark.sql.types._
 import org.apache.spark.annotation.Experimental
 
@@ -87,6 +89,33 @@ abstract class UserDefinedAggregateFunction extends Serializable {
    * aggregation buffer.
    */
   def evaluate(buffer: Row): Any
+
+  /**
+   * Creates a [[Column]] for this UDAF with the given [[Column]]s as arguments.
+   */
+  @scala.annotation.varargs
+  def apply(exprs: Column*): Column = {
+    val aggregateExpression =
+      AggregateExpression2(
+        ScalaUDAF(exprs.map(_.expr), this),
+        Complete,
+        isDistinct = false)
+    Column(aggregateExpression)
+  }
+
+  /**
+   * Creates a [[Column]] for this UDAF with the given [[Column]]s as arguments.
+   * If `isDistinct` is true, this UDAF is applied to distinct values of its input.
+   */
+  @scala.annotation.varargs
+  def apply(isDistinct: Boolean, exprs: Column*): Column = {
+    val aggregateExpression =
+      AggregateExpression2(
+        ScalaUDAF(exprs.map(_.expr), this),
+        Complete,
+        isDistinct = isDistinct)
+    Column(aggregateExpression)
+  }
 }
 
 /**
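
The two apply overloads make a UDAF instance directly usable as a DataFrame
expression, with the boolean variant opting into distinct aggregation. A
sketch, again leaning on the MyDoubleSum test helper and an assumed DataFrame
df with a value column:

    import org.apache.spark.sql.functions.col
    import test.org.apache.spark.sql.hive.aggregate.MyDoubleSum

    val mySum = new MyDoubleSum
    df.groupBy().agg(
      mySum(col("value")),        // aggregates every input value
      mySum(true, col("value")))  // aggregates distinct input values only
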
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
index 5a10c3891ad6c5c1e285caed7a52f3f483206e9b..39aa905c8532a9f1332bd00d244358c5cdc2aff7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
@@ -2500,6 +2500,7 @@ object functions {
    * @group udf_funcs
    * @since 1.5.0
    */
+  @scala.annotation.varargs
   def callUDF(udfName: String, cols: Column*): Column = {
     UnresolvedFunction(udfName, cols.map(_.expr), isDistinct = false)
   }
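
@scala.annotation.varargs makes the compiler emit a Java-visible Column...
overload next to the Seq-based one, so Java code can call callUDF with a plain
argument list. The Scala call site is unchanged; a sketch assuming the
registration shown earlier:

    import org.apache.spark.sql.functions.{callUDF, col}

    // Same from Scala as before; the annotation is what lets Java write
    // functions.callUDF("mydoublesum", col("value")) without building a Seq.
    df.groupBy().agg(callUDF("mydoublesum", col("value")))
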
diff --git a/sql/hive/src/test/java/test/org/apache/spark/sql/hive/JavaDataFrameSuite.java b/sql/hive/src/test/java/test/org/apache/spark/sql/hive/JavaDataFrameSuite.java
index 613b2bcc80e37427846696a1b7e51053fcc301d7..21b053f07a3ba67b4b269f85fa82da50a85f53e5 100644
--- a/sql/hive/src/test/java/test/org/apache/spark/sql/hive/JavaDataFrameSuite.java
+++ b/sql/hive/src/test/java/test/org/apache/spark/sql/hive/JavaDataFrameSuite.java
@@ -29,8 +29,11 @@ import org.junit.Test;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.sql.*;
 import org.apache.spark.sql.expressions.Window;
+import org.apache.spark.sql.expressions.UserDefinedAggregateFunction;
+import static org.apache.spark.sql.functions.*;
 import org.apache.spark.sql.hive.HiveContext;
 import org.apache.spark.sql.hive.test.TestHive$;
+import test.org.apache.spark.sql.hive.aggregate.MyDoubleSum;
 
 public class JavaDataFrameSuite {
   private transient JavaSparkContext sc;
@@ -77,4 +80,26 @@
         "      ROWS BETWEEN 1 preceding and 1 following) " +
         "FROM window_table").collectAsList());
   }
+
+  @Test
+  public void testUDAF() {
+    DataFrame df = hc.range(0, 100).unionAll(hc.range(0, 100)).select(col("id").as("value"));
+    UserDefinedAggregateFunction udaf = new MyDoubleSum();
+    UserDefinedAggregateFunction registeredUDAF = hc.udf().register("mydoublesum", udaf);
+    // Create Columns for the UDAF. For now, callUDF does not take an argument to specify
+    // whether we want to use distinct aggregation.
+    DataFrame aggregatedDF =
+      df.groupBy()
+        .agg(
+          udaf.apply(true, col("value")),
+          udaf.apply(col("value")),
+          registeredUDAF.apply(col("value")),
+          callUDF("mydoublesum", col("value")));
+
+    List<Row> expectedResult = new ArrayList<Row>();
+    expectedResult.add(RowFactory.create(4950.0, 9900.0, 9900.0, 9900.0));
+    checkAnswer(
+      aggregatedDF,
+      expectedResult);
+  }
 }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala
index 6f0db27775e4dd5dd82f77032ebe36632ed2c1e0..4b35c8fd83533c45c210442d69f9f0d7993fed37 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala
@@ -73,8 +73,8 @@ abstract class AggregationQuerySuite extends QueryTest with SQLTestUtils with Be
     emptyDF.registerTempTable("emptyTable")
 
     // Register UDAFs
-    sqlContext.udaf.register("mydoublesum", new MyDoubleSum)
-    sqlContext.udaf.register("mydoubleavg", new MyDoubleAvg)
+    sqlContext.udf.register("mydoublesum", new MyDoubleSum)
+    sqlContext.udf.register("mydoubleavg", new MyDoubleAvg)
   }
 
   override def afterAll(): Unit = {