Skip to content
Snippets Groups Projects
Commit bcb47ad7 authored by Davies Liu's avatar Davies Liu Committed by Reynold Xin
Browse files

[SPARK-6917] [SQL] DecimalType is not read back when non-native type exists

cc yhuai

Author: Davies Liu <davies@databricks.com>

Closes #6558 from davies/decimalType and squashes the following commits:

c877ca8 [Davies Liu] Update ParquetConverter.scala
48cc57c [Davies Liu] Update ParquetConverter.scala
b43845c [Davies Liu] add test
3b4a94f [Davies Liu] DecimalType is not read back when non-native type exists
parent 0221c7f0
No related branches found
No related tags found
No related merge requests found
......@@ -243,8 +243,10 @@ private[parquet] abstract class CatalystConverter extends GroupConverter {
/**
* Read a decimal value from a Parquet Binary into "dest". Only supports decimals that fit in
* a long (i.e. precision <= 18)
*
* Returned value is needed by CatalystConverter, which doesn't reuse the Decimal object.
*/
protected[parquet] def readDecimal(dest: Decimal, value: Binary, ctype: DecimalType): Unit = {
protected[parquet] def readDecimal(dest: Decimal, value: Binary, ctype: DecimalType): Decimal = {
val precision = ctype.precisionInfo.get.precision
val scale = ctype.precisionInfo.get.scale
val bytes = value.getBytes
......
......@@ -19,6 +19,7 @@ package org.apache.spark.sql.parquet
import org.scalatest.BeforeAndAfterAll
import org.apache.spark.sql.types._
import org.apache.spark.sql.{SQLConf, QueryTest}
import org.apache.spark.sql.catalyst.expressions.Row
import org.apache.spark.sql.test.TestSQLContext
......@@ -111,6 +112,18 @@ class ParquetQuerySuiteBase extends QueryTest with ParquetTest {
List(Row("same", "run_5", 100)))
}
}
// Regression test for SPARK-6917: a Decimal column was not read back from
// Parquet when the schema also contained a type with no native Parquet
// representation (e.g. Timestamp). Round-trips a (Decimal, Timestamp) frame
// through Parquet and checks the data survives unchanged.
test("SPARK-6917 DecimalType should work with non-native types") {
  val rows = (1 to 10).map { i =>
    Row(Decimal(i, 18, 0), new java.sql.Timestamp(i))
  }
  val schema = StructType(Array(
    StructField("d", DecimalType(18, 0), false),
    StructField("time", TimestampType, false)))
  withTempPath { dir =>
    val written = sqlContext.createDataFrame(sparkContext.parallelize(rows), schema)
    written.write.parquet(dir.getCanonicalPath)
    val readBack = sqlContext.read.parquet(dir.getCanonicalPath)
    // Compare against the in-memory rows collected before the round trip.
    checkAnswer(readBack, written.collect().toSeq)
  }
}
}
class ParquetDataSourceOnQuerySuite extends ParquetQuerySuiteBase with BeforeAndAfterAll {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment