diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregates.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregates.scala
index 14a855054b94d52db3d03a090ae217bad9972e21..f3830c6d3bcf2235183e998a3a74442119e35cbf 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregates.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregates.scala
@@ -326,7 +326,7 @@ case class Average(child: Expression) extends PartialAggregate with trees.UnaryN
 
   override def asPartial: SplitEvaluation = {
     child.dataType match {
-      case DecimalType.Fixed(_, _) =>
+      case DecimalType.Fixed(_, _) | DecimalType.Unlimited =>
         // Turn the child to unlimited decimals for calculation, before going back to fixed
         val partialSum = Alias(Sum(Cast(child, DecimalType.Unlimited)), "PartialSum")()
         val partialCount = Alias(Count(child), "PartialCount")()
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index 34b2cb054a3e73dca3314fb467b80cdb33f70aa4..44a7d1e7bbb6a69335860a454f8dbf8d90feff9f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -537,4 +537,13 @@ class DataFrameSuite extends QueryTest {
     val df = TestSQLContext.createDataFrame(rowRDD, schema)
     df.rdd.collect()
   }
+
+  test("SPARK-6899") {
+    val originalValue = TestSQLContext.conf.codegenEnabled
+    TestSQLContext.setConf(SQLConf.CODEGEN_ENABLED, "true")
+    checkAnswer(
+      decimalData.agg(avg('a)),
+      Row(new java.math.BigDecimal(2.0)))
+    TestSQLContext.setConf(SQLConf.CODEGEN_ENABLED, originalValue.toString)
+  }
 }