From 0c20ce69fb4bcb1cec5313a9d072826c5588cbbc Mon Sep 17 00:00:00 2001 From: Daoyuan Wang <daoyuan.wang@intel.com> Date: Tue, 3 Feb 2015 12:06:06 -0800 Subject: [PATCH] [SPARK-4987] [SQL] parquet timestamp type support Author: Daoyuan Wang <daoyuan.wang@intel.com> Closes #3820 from adrian-wang/parquettimestamp and squashes the following commits: b1e2a0d [Daoyuan Wang] fix for nanos 4dadef1 [Daoyuan Wang] fix wrong read 93f438d [Daoyuan Wang] parquet timestamp support --- docs/sql-programming-guide.md | 9 ++ pom.xml | 1 + sql/core/pom.xml | 5 + .../scala/org/apache/spark/sql/SQLConf.scala | 7 ++ .../spark/sql/parquet/ParquetConverter.scala | 94 ++++++++++++++++++- .../spark/sql/parquet/ParquetRelation.scala | 4 +- .../sql/parquet/ParquetTableSupport.scala | 13 ++- .../spark/sql/parquet/ParquetTestData.scala | 8 +- .../spark/sql/parquet/ParquetTypes.scala | 52 ++++++---- .../apache/spark/sql/parquet/newParquet.scala | 3 +- .../sql/parquet/timestamp/NanoTime.scala | 69 ++++++++++++++ 11 files changed, 239 insertions(+), 26 deletions(-) create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/parquet/timestamp/NanoTime.scala diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md index be8c5c2c15..22664b419f 100644 --- a/docs/sql-programming-guide.md +++ b/docs/sql-programming-guide.md @@ -580,6 +580,15 @@ Configuration of Parquet can be done using the `setConf` method on SQLContext or flag tells Spark SQL to interpret binary data as a string to provide compatibility with these systems. </td> </tr> +<tr> + <td><code>spark.sql.parquet.int96AsTimestamp</code></td> + <td>true</td> + <td> + Some Parquet-producing systems, in particular Impala, store Timestamp into INT96. Spark would also + store Timestamp as INT96 because we need to avoid precision lost of the nanoseconds field. This + flag tells Spark SQL to interpret INT96 data as a timestamp to provide compatibility with these systems. + </td> +</tr> <tr> <td><code>spark.sql.parquet.cacheMetadata</code></td> <td>true</td> diff --git a/pom.xml b/pom.xml index e25eced877..542efbaf06 100644 --- a/pom.xml +++ b/pom.xml @@ -149,6 +149,7 @@ <scala.binary.version>2.10</scala.binary.version> <jline.version>${scala.version}</jline.version> <jline.groupid>org.scala-lang</jline.groupid> + <jodd.version>3.6.3</jodd.version> <codehaus.jackson.version>1.8.8</codehaus.jackson.version> <snappy.version>1.1.1.6</snappy.version> diff --git a/sql/core/pom.xml b/sql/core/pom.xml index 1a0c77d282..03a5c9e7c2 100644 --- a/sql/core/pom.xml +++ b/sql/core/pom.xml @@ -66,6 +66,11 @@ <artifactId>jackson-databind</artifactId> <version>2.3.0</version> </dependency> + <dependency> + <groupId>org.jodd</groupId> + <artifactId>jodd-core</artifactId> + <version>${jodd.version}</version> + </dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala index 561a91d2d6..7fe17944a7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala @@ -33,6 +33,7 @@ private[spark] object SQLConf { val DIALECT = "spark.sql.dialect" val PARQUET_BINARY_AS_STRING = "spark.sql.parquet.binaryAsString" + val PARQUET_INT96_AS_TIMESTAMP = "spark.sql.parquet.int96AsTimestamp" val PARQUET_CACHE_METADATA = "spark.sql.parquet.cacheMetadata" val PARQUET_COMPRESSION = "spark.sql.parquet.compression.codec" val PARQUET_FILTER_PUSHDOWN_ENABLED = "spark.sql.parquet.filterPushdown" @@ -143,6 +144,12 @@ private[sql] class SQLConf extends Serializable { private[spark] def isParquetBinaryAsString: Boolean = getConf(PARQUET_BINARY_AS_STRING, "false").toBoolean + /** + * When set to true, we always treat INT96Values in Parquet files as timestamp. + */ + private[spark] def isParquetINT96AsTimestamp: Boolean = + getConf(PARQUET_INT96_AS_TIMESTAMP, "true").toBoolean + /** * When set to true, partition pruning for in-memory columnar tables is enabled. */ diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala index 10df8c3310..d87ddfeabd 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala @@ -17,8 +17,12 @@ package org.apache.spark.sql.parquet +import java.sql.Timestamp +import java.util.{TimeZone, Calendar} + import scala.collection.mutable.{Buffer, ArrayBuffer, HashMap} +import jodd.datetime.JDateTime import parquet.column.Dictionary import parquet.io.api.{PrimitiveConverter, GroupConverter, Binary, Converter} import parquet.schema.MessageType @@ -26,6 +30,7 @@ import parquet.schema.MessageType import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.parquet.CatalystConverter.FieldType import org.apache.spark.sql.types._ +import org.apache.spark.sql.parquet.timestamp.NanoTime /** * Collection of converters of Parquet types (group and primitive types) that @@ -123,6 +128,12 @@ private[sql] object CatalystConverter { parent.updateDecimal(fieldIndex, value, d) } } + case TimestampType => { + new CatalystPrimitiveConverter(parent, fieldIndex) { + override def addBinary(value: Binary): Unit = + parent.updateTimestamp(fieldIndex, value) + } + } // All other primitive types use the default converter case ctype: PrimitiveType => { // note: need the type tag here! new CatalystPrimitiveConverter(parent, fieldIndex) @@ -197,9 +208,11 @@ private[parquet] abstract class CatalystConverter extends GroupConverter { protected[parquet] def updateString(fieldIndex: Int, value: String): Unit = updateField(fieldIndex, value) - protected[parquet] def updateDecimal(fieldIndex: Int, value: Binary, ctype: DecimalType): Unit = { + protected[parquet] def updateTimestamp(fieldIndex: Int, value: Binary): Unit = + updateField(fieldIndex, readTimestamp(value)) + + protected[parquet] def updateDecimal(fieldIndex: Int, value: Binary, ctype: DecimalType): Unit = updateField(fieldIndex, readDecimal(new Decimal(), value, ctype)) - } protected[parquet] def isRootConverter: Boolean = parent == null @@ -232,6 +245,13 @@ private[parquet] abstract class CatalystConverter extends GroupConverter { unscaled = (unscaled << (64 - numBits)) >> (64 - numBits) dest.set(unscaled, precision, scale) } + + /** + * Read a Timestamp value from a Parquet Int96Value + */ + protected[parquet] def readTimestamp(value: Binary): Timestamp = { + CatalystTimestampConverter.convertToTimestamp(value) + } } /** @@ -384,6 +404,9 @@ private[parquet] class CatalystPrimitiveRowConverter( override protected[parquet] def updateString(fieldIndex: Int, value: String): Unit = current.setString(fieldIndex, value) + override protected[parquet] def updateTimestamp(fieldIndex: Int, value: Binary): Unit = + current.update(fieldIndex, readTimestamp(value)) + override protected[parquet] def updateDecimal( fieldIndex: Int, value: Binary, ctype: DecimalType): Unit = { var decimal = current(fieldIndex).asInstanceOf[Decimal] @@ -454,6 +477,73 @@ private[parquet] object CatalystArrayConverter { val INITIAL_ARRAY_SIZE = 20 } +private[parquet] object CatalystTimestampConverter { + // TODO most part of this comes from Hive-0.14 + // Hive code might have some issues, so we need to keep an eye on it. + // Also we use NanoTime and Int96Values from parquet-examples. + // We utilize jodd to convert between NanoTime and Timestamp + val parquetTsCalendar = new ThreadLocal[Calendar] + def getCalendar = { + // this is a cache for the calendar instance. + if (parquetTsCalendar.get == null) { + parquetTsCalendar.set(Calendar.getInstance(TimeZone.getTimeZone("GMT"))) + } + parquetTsCalendar.get + } + val NANOS_PER_SECOND: Long = 1000000000 + val SECONDS_PER_MINUTE: Long = 60 + val MINUTES_PER_HOUR: Long = 60 + val NANOS_PER_MILLI: Long = 1000000 + + def convertToTimestamp(value: Binary): Timestamp = { + val nt = NanoTime.fromBinary(value) + val timeOfDayNanos = nt.getTimeOfDayNanos + val julianDay = nt.getJulianDay + val jDateTime = new JDateTime(julianDay.toDouble) + val calendar = getCalendar + calendar.set(Calendar.YEAR, jDateTime.getYear) + calendar.set(Calendar.MONTH, jDateTime.getMonth - 1) + calendar.set(Calendar.DAY_OF_MONTH, jDateTime.getDay) + + // written in command style + var remainder = timeOfDayNanos + calendar.set( + Calendar.HOUR_OF_DAY, + (remainder / (NANOS_PER_SECOND * SECONDS_PER_MINUTE * MINUTES_PER_HOUR)).toInt) + remainder = remainder % (NANOS_PER_SECOND * SECONDS_PER_MINUTE * MINUTES_PER_HOUR) + calendar.set( + Calendar.MINUTE, (remainder / (NANOS_PER_SECOND * SECONDS_PER_MINUTE)).toInt) + remainder = remainder % (NANOS_PER_SECOND * SECONDS_PER_MINUTE) + calendar.set(Calendar.SECOND, (remainder / NANOS_PER_SECOND).toInt) + val nanos = remainder % NANOS_PER_SECOND + val ts = new Timestamp(calendar.getTimeInMillis) + ts.setNanos(nanos.toInt) + ts + } + + def convertFromTimestamp(ts: Timestamp): Binary = { + val calendar = getCalendar + calendar.setTime(ts) + val jDateTime = new JDateTime(calendar.get(Calendar.YEAR), + calendar.get(Calendar.MONTH) + 1, calendar.get(Calendar.DAY_OF_MONTH)) + // Hive-0.14 didn't set hour before get day number, while the day number should + // has something to do with hour, since julian day number grows at 12h GMT + // here we just follow what hive does. + val julianDay = jDateTime.getJulianDayNumber + + val hour = calendar.get(Calendar.HOUR_OF_DAY) + val minute = calendar.get(Calendar.MINUTE) + val second = calendar.get(Calendar.SECOND) + val nanos = ts.getNanos + // Hive-0.14 would use hours directly, that might be wrong, since the day starts + // from 12h in Julian. here we just follow what hive does. + val nanosOfDay = nanos + second * NANOS_PER_SECOND + + minute * NANOS_PER_SECOND * SECONDS_PER_MINUTE + + hour * NANOS_PER_SECOND * SECONDS_PER_MINUTE * MINUTES_PER_HOUR + NanoTime(julianDay, nanosOfDay).toBinary + } +} + /** * A `parquet.io.api.GroupConverter` that converts a single-element groups that * match the characteristics of an array (see diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala index a54485e719..b0db9943a5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala @@ -65,8 +65,8 @@ private[sql] case class ParquetRelation( ParquetTypesConverter.readSchemaFromFile( new Path(path.split(",").head), conf, - sqlContext.conf.isParquetBinaryAsString) - + sqlContext.conf.isParquetBinaryAsString, + sqlContext.conf.isParquetINT96AsTimestamp) lazy val attributeMap = AttributeMap(output.map(o => o -> o)) override def newInstance() = ParquetRelation(path, conf, sqlContext).asInstanceOf[this.type] diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala index fd63ad8144..3fb1cc4105 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala @@ -83,7 +83,8 @@ private[parquet] class RowReadSupport extends ReadSupport[Row] with Logging { // TODO: Why it can be null? if (schema == null) { log.debug("falling back to Parquet read schema") - schema = ParquetTypesConverter.convertToAttributes(parquetSchema, false) + schema = ParquetTypesConverter.convertToAttributes( + parquetSchema, false, true) } log.debug(s"list of attributes that will be read: $schema") new RowRecordMaterializer(parquetSchema, schema) @@ -184,12 +185,12 @@ private[parquet] class RowWriteSupport extends WriteSupport[Row] with Logging { case t @ StructType(_) => writeStruct( t, value.asInstanceOf[CatalystConverter.StructScalaType[_]]) - case _ => writePrimitive(schema.asInstanceOf[PrimitiveType], value) + case _ => writePrimitive(schema.asInstanceOf[NativeType], value) } } } - private[parquet] def writePrimitive(schema: PrimitiveType, value: Any): Unit = { + private[parquet] def writePrimitive(schema: DataType, value: Any): Unit = { if (value != null) { schema match { case StringType => writer.addBinary( @@ -202,6 +203,7 @@ private[parquet] class RowWriteSupport extends WriteSupport[Row] with Logging { case IntegerType => writer.addInteger(value.asInstanceOf[Int]) case ShortType => writer.addInteger(value.asInstanceOf[Short]) case LongType => writer.addLong(value.asInstanceOf[Long]) + case TimestampType => writeTimestamp(value.asInstanceOf[java.sql.Timestamp]) case ByteType => writer.addInteger(value.asInstanceOf[Byte]) case DoubleType => writer.addDouble(value.asInstanceOf[Double]) case FloatType => writer.addFloat(value.asInstanceOf[Float]) @@ -307,6 +309,10 @@ private[parquet] class RowWriteSupport extends WriteSupport[Row] with Logging { writer.addBinary(Binary.fromByteArray(scratchBytes, 0, numBytes)) } + private[parquet] def writeTimestamp(ts: java.sql.Timestamp): Unit = { + val binaryNanoTime = CatalystTimestampConverter.convertFromTimestamp(ts) + writer.addBinary(binaryNanoTime) + } } // Optimized for non-nested rows @@ -351,6 +357,7 @@ private[parquet] class MutableRowWriteSupport extends RowWriteSupport { case DoubleType => writer.addDouble(record.getDouble(index)) case FloatType => writer.addFloat(record.getFloat(index)) case BooleanType => writer.addBoolean(record.getBoolean(index)) + case TimestampType => writeTimestamp(record(index).asInstanceOf[java.sql.Timestamp]) case d: DecimalType => if (d.precisionInfo == None || d.precisionInfo.get.precision > 18) { sys.error(s"Unsupported datatype $d, cannot write to consumer") diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTestData.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTestData.scala index d5993656e0..e4a10aa2ae 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTestData.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTestData.scala @@ -25,7 +25,7 @@ import org.apache.hadoop.mapreduce.Job import org.apache.spark.sql.test.TestSQLContext import parquet.example.data.{GroupWriter, Group} -import parquet.example.data.simple.SimpleGroup +import parquet.example.data.simple.{NanoTime, SimpleGroup} import parquet.hadoop.{ParquetReader, ParquetFileReader, ParquetWriter} import parquet.hadoop.api.WriteSupport import parquet.hadoop.api.WriteSupport.WriteContext @@ -63,6 +63,7 @@ private[sql] object ParquetTestData { optional int64 mylong; optional float myfloat; optional double mydouble; + optional int96 mytimestamp; }""" // field names for test assertion error messages @@ -72,7 +73,8 @@ private[sql] object ParquetTestData { "mystring:String", "mylong:Long", "myfloat:Float", - "mydouble:Double" + "mydouble:Double", + "mytimestamp:Timestamp" ) val subTestSchema = @@ -98,6 +100,7 @@ private[sql] object ParquetTestData { optional int64 myoptlong; optional float myoptfloat; optional double myoptdouble; + optional int96 mytimestamp; } """ @@ -236,6 +239,7 @@ private[sql] object ParquetTestData { record.add(3, i.toLong << 33) record.add(4, 2.5F) record.add(5, 4.5D) + record.add(6, new NanoTime(1,2)) writer.write(record) } writer.close() diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala index 6d8c682ccc..f1d4ff2387 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala @@ -54,7 +54,8 @@ private[parquet] object ParquetTypesConverter extends Logging { def toPrimitiveDataType( parquetType: ParquetPrimitiveType, - binaryAsString: Boolean): DataType = { + binaryAsString: Boolean, + int96AsTimestamp: Boolean): DataType = { val originalType = parquetType.getOriginalType val decimalInfo = parquetType.getDecimalMetadata parquetType.getPrimitiveTypeName match { @@ -66,6 +67,7 @@ private[parquet] object ParquetTypesConverter extends Logging { case ParquetPrimitiveTypeName.FLOAT => FloatType case ParquetPrimitiveTypeName.INT32 => IntegerType case ParquetPrimitiveTypeName.INT64 => LongType + case ParquetPrimitiveTypeName.INT96 if int96AsTimestamp => TimestampType case ParquetPrimitiveTypeName.INT96 => // TODO: add BigInteger type? TODO(andre) use DecimalType instead???? sys.error("Potential loss of precision: cannot convert INT96") @@ -103,7 +105,9 @@ private[parquet] object ParquetTypesConverter extends Logging { * @param parquetType The type to convert. * @return The corresponding Catalyst type. */ - def toDataType(parquetType: ParquetType, isBinaryAsString: Boolean): DataType = { + def toDataType(parquetType: ParquetType, + isBinaryAsString: Boolean, + isInt96AsTimestamp: Boolean): DataType = { def correspondsToMap(groupType: ParquetGroupType): Boolean = { if (groupType.getFieldCount != 1 || groupType.getFields.apply(0).isPrimitive) { false @@ -125,7 +129,7 @@ private[parquet] object ParquetTypesConverter extends Logging { } if (parquetType.isPrimitive) { - toPrimitiveDataType(parquetType.asPrimitiveType, isBinaryAsString) + toPrimitiveDataType(parquetType.asPrimitiveType, isBinaryAsString, isInt96AsTimestamp) } else { val groupType = parquetType.asGroupType() parquetType.getOriginalType match { @@ -137,9 +141,12 @@ private[parquet] object ParquetTypesConverter extends Logging { if (field.getName == CatalystConverter.ARRAY_CONTAINS_NULL_BAG_SCHEMA_NAME) { val bag = field.asGroupType() assert(bag.getFieldCount == 1) - ArrayType(toDataType(bag.getFields.apply(0), isBinaryAsString), containsNull = true) + ArrayType( + toDataType(bag.getFields.apply(0), isBinaryAsString, isInt96AsTimestamp), + containsNull = true) } else { - ArrayType(toDataType(field, isBinaryAsString), containsNull = false) + ArrayType( + toDataType(field, isBinaryAsString, isInt96AsTimestamp), containsNull = false) } } case ParquetOriginalType.MAP => { @@ -152,8 +159,10 @@ private[parquet] object ParquetTypesConverter extends Logging { "Parquet Map type malformatted: nested group should have 2 (key, value) fields!") assert(keyValueGroup.getFields.apply(0).getRepetition == Repetition.REQUIRED) - val keyType = toDataType(keyValueGroup.getFields.apply(0), isBinaryAsString) - val valueType = toDataType(keyValueGroup.getFields.apply(1), isBinaryAsString) + val keyType = + toDataType(keyValueGroup.getFields.apply(0), isBinaryAsString, isInt96AsTimestamp) + val valueType = + toDataType(keyValueGroup.getFields.apply(1), isBinaryAsString, isInt96AsTimestamp) MapType(keyType, valueType, keyValueGroup.getFields.apply(1).getRepetition != Repetition.REQUIRED) } @@ -163,8 +172,10 @@ private[parquet] object ParquetTypesConverter extends Logging { val keyValueGroup = groupType.getFields.apply(0).asGroupType() assert(keyValueGroup.getFields.apply(0).getRepetition == Repetition.REQUIRED) - val keyType = toDataType(keyValueGroup.getFields.apply(0), isBinaryAsString) - val valueType = toDataType(keyValueGroup.getFields.apply(1), isBinaryAsString) + val keyType = + toDataType(keyValueGroup.getFields.apply(0), isBinaryAsString, isInt96AsTimestamp) + val valueType = + toDataType(keyValueGroup.getFields.apply(1), isBinaryAsString, isInt96AsTimestamp) MapType(keyType, valueType, keyValueGroup.getFields.apply(1).getRepetition != Repetition.REQUIRED) } else if (correspondsToArray(groupType)) { // ArrayType @@ -172,16 +183,19 @@ private[parquet] object ParquetTypesConverter extends Logging { if (field.getName == CatalystConverter.ARRAY_CONTAINS_NULL_BAG_SCHEMA_NAME) { val bag = field.asGroupType() assert(bag.getFieldCount == 1) - ArrayType(toDataType(bag.getFields.apply(0), isBinaryAsString), containsNull = true) + ArrayType( + toDataType(bag.getFields.apply(0), isBinaryAsString, isInt96AsTimestamp), + containsNull = true) } else { - ArrayType(toDataType(field, isBinaryAsString), containsNull = false) + ArrayType( + toDataType(field, isBinaryAsString, isInt96AsTimestamp), containsNull = false) } } else { // everything else: StructType val fields = groupType .getFields .map(ptype => new StructField( ptype.getName, - toDataType(ptype, isBinaryAsString), + toDataType(ptype, isBinaryAsString, isInt96AsTimestamp), ptype.getRepetition != Repetition.REQUIRED)) StructType(fields) } @@ -210,6 +224,7 @@ private[parquet] object ParquetTypesConverter extends Logging { case ShortType => Some(ParquetTypeInfo(ParquetPrimitiveTypeName.INT32)) case ByteType => Some(ParquetTypeInfo(ParquetPrimitiveTypeName.INT32)) case LongType => Some(ParquetTypeInfo(ParquetPrimitiveTypeName.INT64)) + case TimestampType => Some(ParquetTypeInfo(ParquetPrimitiveTypeName.INT96)) case DecimalType.Fixed(precision, scale) if precision <= 18 => // TODO: for now, our writer only supports decimals that fit in a Long Some(ParquetTypeInfo(ParquetPrimitiveTypeName.FIXED_LEN_BYTE_ARRAY, @@ -345,7 +360,9 @@ private[parquet] object ParquetTypesConverter extends Logging { } } - def convertToAttributes(parquetSchema: ParquetType, isBinaryAsString: Boolean): Seq[Attribute] = { + def convertToAttributes(parquetSchema: ParquetType, + isBinaryAsString: Boolean, + isInt96AsTimestamp: Boolean): Seq[Attribute] = { parquetSchema .asGroupType() .getFields @@ -353,7 +370,7 @@ private[parquet] object ParquetTypesConverter extends Logging { field => new AttributeReference( field.getName, - toDataType(field, isBinaryAsString), + toDataType(field, isBinaryAsString, isInt96AsTimestamp), field.getRepetition != Repetition.REQUIRED)()) } @@ -476,7 +493,8 @@ private[parquet] object ParquetTypesConverter extends Logging { def readSchemaFromFile( origPath: Path, conf: Option[Configuration], - isBinaryAsString: Boolean): Seq[Attribute] = { + isBinaryAsString: Boolean, + isInt96AsTimestamp: Boolean): Seq[Attribute] = { val keyValueMetadata: java.util.Map[String, String] = readMetaData(origPath, conf) .getFileMetaData @@ -485,7 +503,9 @@ private[parquet] object ParquetTypesConverter extends Logging { convertFromString(keyValueMetadata.get(RowReadSupport.SPARK_METADATA_KEY)) } else { val attributes = convertToAttributes( - readMetaData(origPath, conf).getFileMetaData.getSchema, isBinaryAsString) + readMetaData(origPath, conf).getFileMetaData.getSchema, + isBinaryAsString, + isInt96AsTimestamp) log.info(s"Falling back to schema conversion from Parquet types; result: $attributes") attributes } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala index 1e794cad73..179c0d6b22 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala @@ -136,7 +136,8 @@ case class ParquetRelation2(path: String)(@transient val sqlContext: SQLContext) ParquetTypesConverter.readSchemaFromFile( partitions.head.files.head.getPath, Some(sparkContext.hadoopConfiguration), - sqlContext.conf.isParquetBinaryAsString)) + sqlContext.conf.isParquetBinaryAsString, + sqlContext.conf.isParquetINT96AsTimestamp)) val dataIncludesKey = partitionKeys.headOption.map(dataSchema.fieldNames.contains(_)).getOrElse(true) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/timestamp/NanoTime.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/timestamp/NanoTime.scala new file mode 100644 index 0000000000..8871616844 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/timestamp/NanoTime.scala @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.parquet.timestamp + +import java.nio.{ByteBuffer, ByteOrder} + +import parquet.Preconditions +import parquet.io.api.{Binary, RecordConsumer} + +private[parquet] class NanoTime extends Serializable { + private var julianDay = 0 + private var timeOfDayNanos = 0L + + def set(julianDay: Int, timeOfDayNanos: Long) = { + this.julianDay = julianDay + this.timeOfDayNanos = timeOfDayNanos + this + } + + def getJulianDay: Int = julianDay + + def getTimeOfDayNanos: Long = timeOfDayNanos + + def toBinary: Binary = { + val buf = ByteBuffer.allocate(12) + buf.order(ByteOrder.LITTLE_ENDIAN) + buf.putLong(timeOfDayNanos) + buf.putInt(julianDay) + buf.flip() + Binary.fromByteBuffer(buf) + } + + def writeValue(recordConsumer: RecordConsumer) { + recordConsumer.addBinary(toBinary) + } + + override def toString = + "NanoTime{julianDay=" + julianDay + ", timeOfDayNanos=" + timeOfDayNanos + "}" +} + +object NanoTime { + def fromBinary(bytes: Binary): NanoTime = { + Preconditions.checkArgument(bytes.length() == 12, "Must be 12 bytes") + val buf = bytes.toByteBuffer + buf.order(ByteOrder.LITTLE_ENDIAN) + val timeOfDayNanos = buf.getLong + val julianDay = buf.getInt + new NanoTime().set(julianDay, timeOfDayNanos) + } + + def apply(julianDay: Int, timeOfDayNanos: Long): NanoTime = { + new NanoTime().set(julianDay, timeOfDayNanos) + } +} -- GitLab