diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala
index 1b4196ab0be35bf8704373abc76f6a884378e86d..caa9f045537d06f902333f9c25cf432a516b8680 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala
@@ -243,8 +243,10 @@ private[parquet] abstract class CatalystConverter extends GroupConverter {
   /**
    * Read a decimal value from a Parquet Binary into "dest". Only supports decimals that fit in
    * a long (i.e. precision <= 18)
+   *
+   * The returned value is needed by CatalystConverter, which does not reuse the Decimal object.
    */
-  protected[parquet] def readDecimal(dest: Decimal, value: Binary, ctype: DecimalType): Unit = {
+  protected[parquet] def readDecimal(dest: Decimal, value: Binary, ctype: DecimalType): Decimal = {
     val precision = ctype.precisionInfo.get.precision
     val scale = ctype.precisionInfo.get.scale
     val bytes = value.getBytes
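
Why the return value matters (a sketch, not part of this patch): `Decimal.set` mutates the receiver and returns it, so a converter that hands the same reused `dest` object to every row ends up with all rows aliasing one mutable value. Returning the `Decimal` lets a caller that does not reuse the object, as the new doc comment says of CatalystConverter, allocate a fresh instance per value and store the result directly:

```scala
import org.apache.spark.sql.types.Decimal

// Demonstrates the aliasing hazard behind this change: Decimal.set mutates
// `this` and returns it, so reusing one instance across rows is unsafe.
object DecimalReuseDemo extends App {
  val shared = new Decimal()
  val reused = (1 to 3).map(i => shared.set(i, 18, 0)) // every element aliases `shared`
  println(reused.mkString(", "))                       // prints "3, 3, 3"

  val fresh = (1 to 3).map(i => new Decimal().set(i, 18, 0)) // one object per value
  println(fresh.mkString(", "))                              // prints "1, 2, 3"
}
```
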
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
index b98ba09ccfc2d8a931817f8a16b924df1bd919c7..304936fb2be8e0898ec67b94f5e0b83b75cda0ca 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.parquet
 
 import org.scalatest.BeforeAndAfterAll
 
+import org.apache.spark.sql.types._
 import org.apache.spark.sql.{SQLConf, QueryTest}
 import org.apache.spark.sql.catalyst.expressions.Row
 import org.apache.spark.sql.test.TestSQLContext
@@ -111,6 +112,19 @@ class ParquetQuerySuiteBase extends QueryTest with ParquetTest {
         List(Row("same", "run_5", 100)))
     }
   }
+
+  test("SPARK-6917 DecimalType should work with non-native types") {
+    val data = (1 to 10).map(i => Row(Decimal(i, 18, 0), new java.sql.Timestamp(i)))
+    val schema = StructType(List(StructField("d", DecimalType(18, 0), false),
+      StructField("time", TimestampType, false)).toArray)
+    withTempPath { file =>
+      val df = sqlContext.createDataFrame(sparkContext.parallelize(data), schema)
+      df.write.parquet(file.getCanonicalPath)
+      val df2 = sqlContext.read.parquet(file.getCanonicalPath)
+      checkAnswer(df2, df.collect().toSeq)
+    }
+  }
 }
 
 class ParquetDataSourceOnQuerySuite extends ParquetQuerySuiteBase with BeforeAndAfterAll {
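
For reviewers who want to reproduce this outside the suite, a spark-shell sketch under the same 1.4-era API the test uses (the output path and the `sqlContext`/`sc` shell bindings are assumptions, not part of this diff):

```scala
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

val schema = StructType(Seq(
  StructField("d", DecimalType(18, 0), nullable = false),
  StructField("time", TimestampType, nullable = false)))
val data = (1 to 10).map(i => Row(Decimal(i, 18, 0), new java.sql.Timestamp(i)))

val df = sqlContext.createDataFrame(sc.parallelize(data), schema)
df.write.parquet("/tmp/spark-6917-repro")
// Before this patch the decimal column is not read back correctly when a
// non-native type like Timestamp is present; after it, values 1..10 round-trip.
sqlContext.read.parquet("/tmp/spark-6917-repro").show()
```
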