Skip to content
Snippets Groups Projects
Commit 4659468f authored by Daoyuan Wang's avatar Daoyuan Wang Committed by Cheng Lian
Browse files

[SPARK-4985] [SQL] parquet support for date type

This PR might have some issues with #3732 ,
and this would have merge conflicts with #3820 so the review can be delayed till that 2 were merged.

Author: Daoyuan Wang <daoyuan.wang@intel.com>

Closes #3822 from adrian-wang/parquetdate and squashes the following commits:

2c5d54d [Daoyuan Wang] add a test case
faef887 [Daoyuan Wang] parquet support for primitive date
97e9080 [Daoyuan Wang] parquet support for date type
parent 2bf40c58
No related branches found
No related tags found
No related merge requests found
...@@ -127,6 +127,12 @@ private[sql] object CatalystConverter { ...@@ -127,6 +127,12 @@ private[sql] object CatalystConverter {
parent.updateByte(fieldIndex, value.asInstanceOf[ByteType.JvmType]) parent.updateByte(fieldIndex, value.asInstanceOf[ByteType.JvmType])
} }
} }
case DateType => {
new CatalystPrimitiveConverter(parent, fieldIndex) {
override def addInt(value: Int): Unit =
parent.updateDate(fieldIndex, value.asInstanceOf[DateType.JvmType])
}
}
case d: DecimalType => { case d: DecimalType => {
new CatalystPrimitiveConverter(parent, fieldIndex) { new CatalystPrimitiveConverter(parent, fieldIndex) {
override def addBinary(value: Binary): Unit = override def addBinary(value: Binary): Unit =
...@@ -192,6 +198,9 @@ private[parquet] abstract class CatalystConverter extends GroupConverter { ...@@ -192,6 +198,9 @@ private[parquet] abstract class CatalystConverter extends GroupConverter {
protected[parquet] def updateInt(fieldIndex: Int, value: Int): Unit = protected[parquet] def updateInt(fieldIndex: Int, value: Int): Unit =
updateField(fieldIndex, value) updateField(fieldIndex, value)
protected[parquet] def updateDate(fieldIndex: Int, value: Int): Unit =
updateField(fieldIndex, value)
protected[parquet] def updateLong(fieldIndex: Int, value: Long): Unit = protected[parquet] def updateLong(fieldIndex: Int, value: Long): Unit =
updateField(fieldIndex, value) updateField(fieldIndex, value)
...@@ -388,6 +397,9 @@ private[parquet] class CatalystPrimitiveRowConverter( ...@@ -388,6 +397,9 @@ private[parquet] class CatalystPrimitiveRowConverter(
override protected[parquet] def updateInt(fieldIndex: Int, value: Int): Unit = override protected[parquet] def updateInt(fieldIndex: Int, value: Int): Unit =
current.setInt(fieldIndex, value) current.setInt(fieldIndex, value)
override protected[parquet] def updateDate(fieldIndex: Int, value: Int): Unit =
current.update(fieldIndex, value)
override protected[parquet] def updateLong(fieldIndex: Int, value: Long): Unit = override protected[parquet] def updateLong(fieldIndex: Int, value: Long): Unit =
current.setLong(fieldIndex, value) current.setLong(fieldIndex, value)
......
...@@ -212,6 +212,7 @@ private[parquet] class RowWriteSupport extends WriteSupport[Row] with Logging { ...@@ -212,6 +212,7 @@ private[parquet] class RowWriteSupport extends WriteSupport[Row] with Logging {
case DoubleType => writer.addDouble(value.asInstanceOf[Double]) case DoubleType => writer.addDouble(value.asInstanceOf[Double])
case FloatType => writer.addFloat(value.asInstanceOf[Float]) case FloatType => writer.addFloat(value.asInstanceOf[Float])
case BooleanType => writer.addBoolean(value.asInstanceOf[Boolean]) case BooleanType => writer.addBoolean(value.asInstanceOf[Boolean])
case DateType => writer.addInteger(value.asInstanceOf[Int])
case d: DecimalType => case d: DecimalType =>
if (d.precisionInfo == None || d.precisionInfo.get.precision > 18) { if (d.precisionInfo == None || d.precisionInfo.get.precision > 18) {
sys.error(s"Unsupported datatype $d, cannot write to consumer") sys.error(s"Unsupported datatype $d, cannot write to consumer")
...@@ -358,6 +359,7 @@ private[parquet] class MutableRowWriteSupport extends RowWriteSupport { ...@@ -358,6 +359,7 @@ private[parquet] class MutableRowWriteSupport extends RowWriteSupport {
case DoubleType => writer.addDouble(record.getDouble(index)) case DoubleType => writer.addDouble(record.getDouble(index))
case FloatType => writer.addFloat(record.getFloat(index)) case FloatType => writer.addFloat(record.getFloat(index))
case BooleanType => writer.addBoolean(record.getBoolean(index)) case BooleanType => writer.addBoolean(record.getBoolean(index))
case DateType => writer.addInteger(record.getInt(index))
case TimestampType => writeTimestamp(record(index).asInstanceOf[java.sql.Timestamp]) case TimestampType => writeTimestamp(record(index).asInstanceOf[java.sql.Timestamp])
case d: DecimalType => case d: DecimalType =>
if (d.precisionInfo == None || d.precisionInfo.get.precision > 18) { if (d.precisionInfo == None || d.precisionInfo.get.precision > 18) {
......
...@@ -64,6 +64,8 @@ private[parquet] object ParquetTypesConverter extends Logging { ...@@ -64,6 +64,8 @@ private[parquet] object ParquetTypesConverter extends Logging {
case ParquetPrimitiveTypeName.BOOLEAN => BooleanType case ParquetPrimitiveTypeName.BOOLEAN => BooleanType
case ParquetPrimitiveTypeName.DOUBLE => DoubleType case ParquetPrimitiveTypeName.DOUBLE => DoubleType
case ParquetPrimitiveTypeName.FLOAT => FloatType case ParquetPrimitiveTypeName.FLOAT => FloatType
case ParquetPrimitiveTypeName.INT32
if originalType == ParquetOriginalType.DATE => DateType
case ParquetPrimitiveTypeName.INT32 => IntegerType case ParquetPrimitiveTypeName.INT32 => IntegerType
case ParquetPrimitiveTypeName.INT64 => LongType case ParquetPrimitiveTypeName.INT64 => LongType
case ParquetPrimitiveTypeName.INT96 if int96AsTimestamp => TimestampType case ParquetPrimitiveTypeName.INT96 if int96AsTimestamp => TimestampType
...@@ -222,6 +224,8 @@ private[parquet] object ParquetTypesConverter extends Logging { ...@@ -222,6 +224,8 @@ private[parquet] object ParquetTypesConverter extends Logging {
// There is no type for Byte or Short so we promote them to INT32. // There is no type for Byte or Short so we promote them to INT32.
case ShortType => Some(ParquetTypeInfo(ParquetPrimitiveTypeName.INT32)) case ShortType => Some(ParquetTypeInfo(ParquetPrimitiveTypeName.INT32))
case ByteType => Some(ParquetTypeInfo(ParquetPrimitiveTypeName.INT32)) case ByteType => Some(ParquetTypeInfo(ParquetPrimitiveTypeName.INT32))
case DateType => Some(ParquetTypeInfo(
ParquetPrimitiveTypeName.INT32, Some(ParquetOriginalType.DATE)))
case LongType => Some(ParquetTypeInfo(ParquetPrimitiveTypeName.INT64)) case LongType => Some(ParquetTypeInfo(ParquetPrimitiveTypeName.INT64))
case TimestampType => Some(ParquetTypeInfo(ParquetPrimitiveTypeName.INT96)) case TimestampType => Some(ParquetTypeInfo(ParquetPrimitiveTypeName.INT96))
case DecimalType.Fixed(precision, scale) if precision <= 18 => case DecimalType.Fixed(precision, scale) if precision <= 18 =>
......
...@@ -135,6 +135,21 @@ class ParquetIOSuiteBase extends QueryTest with ParquetTest { ...@@ -135,6 +135,21 @@ class ParquetIOSuiteBase extends QueryTest with ParquetTest {
} }
} }
test("date type") {
def makeDateRDD(): DataFrame =
sparkContext
.parallelize(0 to 1000)
.map(i => Tuple1(DateUtils.toJavaDate(i)))
.toDF()
.select($"_1")
withTempPath { dir =>
val data = makeDateRDD()
data.saveAsParquetFile(dir.getCanonicalPath)
checkAnswer(parquetFile(dir.getCanonicalPath), data.collect().toSeq)
}
}
test("map") { test("map") {
val data = (1 to 4).map(i => Tuple1(Map(i -> s"val_$i"))) val data = (1 to 4).map(i => Tuple1(Map(i -> s"val_$i")))
checkParquetFile(data) checkParquetFile(data)
......
...@@ -57,7 +57,7 @@ class ParquetSchemaSuite extends FunSuite with ParquetTest { ...@@ -57,7 +57,7 @@ class ParquetSchemaSuite extends FunSuite with ParquetTest {
|} |}
""".stripMargin) """.stripMargin)
testSchema[(Byte, Short, Int, Long)]( testSchema[(Byte, Short, Int, Long, java.sql.Date)](
"logical integral types", "logical integral types",
""" """
|message root { |message root {
...@@ -65,6 +65,7 @@ class ParquetSchemaSuite extends FunSuite with ParquetTest { ...@@ -65,6 +65,7 @@ class ParquetSchemaSuite extends FunSuite with ParquetTest {
| required int32 _2 (INT_16); | required int32 _2 (INT_16);
| required int32 _3 (INT_32); | required int32 _3 (INT_32);
| required int64 _4 (INT_64); | required int64 _4 (INT_64);
| optional int32 _5 (DATE);
|} |}
""".stripMargin) """.stripMargin)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment