diff --git a/core/src/main/scala/org/apache/spark/api/r/SerDe.scala b/core/src/main/scala/org/apache/spark/api/r/SerDe.scala index f8e3f1a79082e7e8a6d52caaa334d6ebf502577c..56adc857d4ce0fcd6cc9632631d3b14ffab0816b 100644 --- a/core/src/main/scala/org/apache/spark/api/r/SerDe.scala +++ b/core/src/main/scala/org/apache/spark/api/r/SerDe.scala @@ -18,7 +18,7 @@ package org.apache.spark.api.r import java.io.{DataInputStream, DataOutputStream} -import java.sql.{Date, Time} +import java.sql.{Timestamp, Date, Time} import scala.collection.JavaConversions._ @@ -107,9 +107,12 @@ private[spark] object SerDe { Date.valueOf(readString(in)) } - def readTime(in: DataInputStream): Time = { - val t = in.readDouble() - new Time((t * 1000L).toLong) + def readTime(in: DataInputStream): Timestamp = { + val seconds = in.readDouble() + val sec = Math.floor(seconds).toLong + val t = new Timestamp(sec * 1000L) + t.setNanos(((seconds - sec) * 1e9).toInt) + t } def readBytesArr(in: DataInputStream): Array[Array[Byte]] = { @@ -227,6 +230,9 @@ private[spark] object SerDe { case "java.sql.Time" => writeType(dos, "time") writeTime(dos, value.asInstanceOf[Time]) + case "java.sql.Timestamp" => + writeType(dos, "time") + writeTime(dos, value.asInstanceOf[Timestamp]) case "[B" => writeType(dos, "raw") writeBytes(dos, value.asInstanceOf[Array[Byte]]) @@ -289,6 +295,9 @@ private[spark] object SerDe { out.writeDouble(value.getTime.toDouble / 1000.0) } + def writeTime(out: DataOutputStream, value: Timestamp): Unit = { + out.writeDouble((value.getTime / 1000).toDouble + value.getNanos.toDouble / 1e9) + } // NOTE: Only works for ASCII right now def writeString(out: DataOutputStream, value: String): Unit = { diff --git a/python/pyspark/sql/types.py b/python/pyspark/sql/types.py index b6ec6137c918055376b988e2e1a3bd78a2297f01..8f286b631f4f0ecc6e1b60f18e46276ec54c0413 100644 --- a/python/pyspark/sql/types.py +++ b/python/pyspark/sql/types.py @@ -19,6 +19,7 @@ import sys import decimal import time import datetime +import calendar import keyword import warnings import json @@ -654,6 +655,8 @@ def _need_python_to_sql_conversion(dataType): _need_python_to_sql_conversion(dataType.valueType) elif isinstance(dataType, UserDefinedType): return True + elif isinstance(dataType, TimestampType): + return True else: return False @@ -707,6 +710,14 @@ def _python_to_sql_converter(dataType): return lambda m: dict([(key_converter(k), value_converter(v)) for k, v in m.items()]) elif isinstance(dataType, UserDefinedType): return lambda obj: dataType.serialize(obj) + elif isinstance(dataType, TimestampType): + + def to_posix_timstamp(dt): + if dt.tzinfo is None: + return int(time.mktime(dt.timetuple()) * 1e7 + dt.microsecond * 10) + else: + return int(calendar.timegm(dt.utctimetuple()) * 1e7 + dt.microsecond * 10) + return to_posix_timstamp else: raise ValueError("Unexpected type %r" % dataType) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/BaseRow.java b/sql/catalyst/src/main/scala/org/apache/spark/sql/BaseRow.java index d138b43a3482b53b3a09c4682b88b2bf60682216..6584882a62fd18f39110f424d4af6224074388f0 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/BaseRow.java +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/BaseRow.java @@ -19,6 +19,7 @@ package org.apache.spark.sql; import java.math.BigDecimal; import java.sql.Date; +import java.sql.Timestamp; import java.util.List; import scala.collection.Seq; @@ -103,6 +104,11 @@ public abstract class BaseRow implements Row { throw new UnsupportedOperationException(); } + 
@Override + public Timestamp getTimestamp(int i) { + throw new UnsupportedOperationException(); + } + @Override public <T> Seq<T> getSeq(int i) { throw new UnsupportedOperationException(); } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/Row.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/Row.scala index 0d460b634d9b0f52551bf093979bc5806e2b6cd6..8aaf5d7d891544fea26cda4a61bd398986e5f8af 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/Row.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/Row.scala @@ -260,9 +260,15 @@ trait Row extends Serializable { * * @throws ClassCastException when data type does not match. */ - // TODO(davies): This is not the right default implementation, we use Int as Date internally def getDate(i: Int): java.sql.Date = apply(i).asInstanceOf[java.sql.Date] + /** + * Returns the value at position i of timestamp type as java.sql.Timestamp. + * + * @throws ClassCastException when data type does not match. + */ + def getTimestamp(i: Int): java.sql.Timestamp = apply(i).asInstanceOf[java.sql.Timestamp] + /** * Returns the value at position i of array type as a Scala Seq. * diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala index 2e7b4c236d8f80da222dc5fb1660765f45c19700..beb82dbc0864282b7af35bdcd2a2c825a9aaad72 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.catalyst import java.lang.{Iterable => JavaIterable} import java.math.{BigDecimal => JavaBigDecimal} -import java.sql.Date +import java.sql.{Timestamp, Date} import java.util.{Map => JavaMap} import javax.annotation.Nullable @@ -58,6 +58,7 @@ object CatalystTypeConverters { case structType: StructType => StructConverter(structType) case StringType => StringConverter case DateType => DateConverter + case TimestampType => TimestampConverter case dt: DecimalType => BigDecimalConverter case BooleanType => BooleanConverter case ByteType => ByteConverter @@ -274,6 +275,15 @@ object CatalystTypeConverters { override def toScalaImpl(row: Row, column: Int): Date = toScala(row.getInt(column)) } + private object TimestampConverter extends CatalystTypeConverter[Timestamp, Timestamp, Any] { + override def toCatalystImpl(scalaValue: Timestamp): Long = + DateUtils.fromJavaTimestamp(scalaValue) + override def toScala(catalystValue: Any): Timestamp = + if (catalystValue == null) null + else DateUtils.toJavaTimestamp(catalystValue.asInstanceOf[Long]) + override def toScalaImpl(row: Row, column: Int): Timestamp = toScala(row.getLong(column)) + } + private object BigDecimalConverter extends CatalystTypeConverter[Any, JavaBigDecimal, Decimal] { override def toCatalystImpl(scalaValue: Any): Decimal = scalaValue match { case d: BigDecimal => Decimal(d) @@ -367,6 +377,7 @@ object CatalystTypeConverters { def convertToCatalyst(a: Any): Any = a match { case s: String => StringConverter.toCatalyst(s) case d: Date => DateConverter.toCatalyst(d) + case t: Timestamp => TimestampConverter.toCatalyst(t) case d: BigDecimal => BigDecimalConverter.toCatalyst(d) case d: JavaBigDecimal => BigDecimalConverter.toCatalyst(d) case seq: Seq[Any] => seq.map(convertToCatalyst) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala index 18102d1acb5b3a267ae07970419b51811ee72805..8d93957fea2fc16beefb09f3268797064bae24c6 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala @@ -113,7 +113,8 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w private[this] def castToString(from: DataType): Any => Any = from match { case BinaryType => buildCast[Array[Byte]](_, UTF8String(_)) case DateType => buildCast[Int](_, d => UTF8String(DateUtils.toString(d))) - case TimestampType => buildCast[Timestamp](_, t => UTF8String(timestampToString(t))) + case TimestampType => buildCast[Long](_, + t => UTF8String(timestampToString(DateUtils.toJavaTimestamp(t)))) case _ => buildCast[Any](_, o => UTF8String(o.toString)) } @@ -127,7 +128,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w case StringType => buildCast[UTF8String](_, _.length() != 0) case TimestampType => - buildCast[Timestamp](_, t => t.getTime() != 0 || t.getNanos() != 0) + buildCast[Long](_, t => t != 0) case DateType => // Hive would return null when cast from date to boolean buildCast[Int](_, d => null) @@ -158,20 +159,21 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w if (periodIdx != -1 && n.length() - periodIdx > 9) { n = n.substring(0, periodIdx + 10) } - try Timestamp.valueOf(n) catch { case _: java.lang.IllegalArgumentException => null } + try DateUtils.fromJavaTimestamp(Timestamp.valueOf(n)) + catch { case _: java.lang.IllegalArgumentException => null } }) case BooleanType => - buildCast[Boolean](_, b => new Timestamp(if (b) 1 else 0)) + buildCast[Boolean](_, b => (if (b) 1L else 0)) case LongType => - buildCast[Long](_, l => new Timestamp(l)) + buildCast[Long](_, l => longToTimestamp(l)) case IntegerType => - buildCast[Int](_, i => new Timestamp(i)) + buildCast[Int](_, i => longToTimestamp(i.toLong)) case ShortType => - buildCast[Short](_, s => new Timestamp(s)) + buildCast[Short](_, s => longToTimestamp(s.toLong)) case ByteType => - buildCast[Byte](_, b => new Timestamp(b)) + buildCast[Byte](_, b => longToTimestamp(b.toLong)) case DateType => - buildCast[Int](_, d => new Timestamp(DateUtils.toJavaDate(d).getTime)) + buildCast[Int](_, d => DateUtils.toMillisSinceEpoch(d) * 10000) // TimestampWritable.decimalToTimestamp case DecimalType() => buildCast[Decimal](_, d => decimalToTimestamp(d)) @@ -191,25 +193,17 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w }) } - private[this] def decimalToTimestamp(d: Decimal) = { - val seconds = Math.floor(d.toDouble).toLong - val bd = (d.toBigDecimal - seconds) * 1000000000 - val nanos = bd.intValue() - - val millis = seconds * 1000 - val t = new Timestamp(millis) - - // remaining fractional portion as nanos - t.setNanos(nanos) - t + private[this] def decimalToTimestamp(d: Decimal): Long = { + (d.toBigDecimal * 10000000L).longValue() } - // Timestamp to long, converting milliseconds to seconds - private[this] def timestampToLong(ts: Timestamp) = Math.floor(ts.getTime / 1000.0).toLong - - private[this] def timestampToDouble(ts: Timestamp) = { - // First part is the seconds since the beginning of time, followed by nanosecs. 
- Math.floor(ts.getTime / 1000.0).toLong + ts.getNanos.toDouble / 1000000000 + // converting milliseconds to 100ns + private[this] def longToTimestamp(t: Long): Long = t * 10000L + // converting 100ns to seconds + private[this] def timestampToLong(ts: Long): Long = math.floor(ts.toDouble / 10000000L).toLong + // converting 100ns to seconds in double + private[this] def timestampToDouble(ts: Long): Double = { + ts / 10000000.0 } // Converts Timestamp to string according to Hive TimestampWritable convention @@ -234,7 +228,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w case TimestampType => // throw valid precision more than seconds, according to Hive. // Timestamp.nanos is in 0 to 999,999,999, no more than a second. - buildCast[Timestamp](_, t => DateUtils.millisToDays(t.getTime)) + buildCast[Long](_, t => DateUtils.millisToDays(t / 10000L)) // Hive throws this exception as a Semantic Exception // It is never possible to compare result when hive return with exception, // so we can return null @@ -253,7 +247,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w case DateType => buildCast[Int](_, d => null) case TimestampType => - buildCast[Timestamp](_, t => timestampToLong(t)) + buildCast[Long](_, t => timestampToLong(t)) case x: NumericType => b => x.numeric.asInstanceOf[Numeric[Any]].toLong(b) } @@ -269,7 +263,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w case DateType => buildCast[Int](_, d => null) case TimestampType => - buildCast[Timestamp](_, t => timestampToLong(t).toInt) + buildCast[Long](_, t => timestampToLong(t).toInt) case x: NumericType => b => x.numeric.asInstanceOf[Numeric[Any]].toInt(b) } @@ -285,7 +279,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w case DateType => buildCast[Int](_, d => null) case TimestampType => - buildCast[Timestamp](_, t => timestampToLong(t).toShort) + buildCast[Long](_, t => timestampToLong(t).toShort) case x: NumericType => b => x.numeric.asInstanceOf[Numeric[Any]].toInt(b).toShort } @@ -301,7 +295,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w case DateType => buildCast[Int](_, d => null) case TimestampType => - buildCast[Timestamp](_, t => timestampToLong(t).toByte) + buildCast[Long](_, t => timestampToLong(t).toByte) case x: NumericType => b => x.numeric.asInstanceOf[Numeric[Any]].toInt(b).toByte } @@ -334,7 +328,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w buildCast[Int](_, d => null) // date can't cast to decimal in Hive case TimestampType => // Note that we lose precision here. 
- buildCast[Timestamp](_, t => changePrecision(Decimal(timestampToDouble(t)), target)) + buildCast[Long](_, t => changePrecision(Decimal(timestampToDouble(t)), target)) case DecimalType() => b => changePrecision(b.asInstanceOf[Decimal].clone(), target) case LongType => @@ -358,7 +352,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w case DateType => buildCast[Int](_, d => null) case TimestampType => - buildCast[Timestamp](_, t => timestampToDouble(t)) + buildCast[Long](_, t => timestampToDouble(t)) case x: NumericType => b => x.numeric.asInstanceOf[Numeric[Any]].toDouble(b) } @@ -374,7 +368,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w case DateType => buildCast[Int](_, d => null) case TimestampType => - buildCast[Timestamp](_, t => timestampToDouble(t).toFloat) + buildCast[Long](_, t => timestampToDouble(t).toFloat) case x: NumericType => b => x.numeric.asInstanceOf[Numeric[Any]].toFloat(b) } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SpecificMutableRow.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SpecificMutableRow.scala index aa4099e4d7bf9284314a51eac6e5abfc97226a01..2c884517d62a7467efa638c24590dcf08c7ebfb4 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SpecificMutableRow.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SpecificMutableRow.scala @@ -203,6 +203,7 @@ final class SpecificMutableRow(val values: Array[MutableValue]) extends MutableR case BooleanType => new MutableBoolean case LongType => new MutableLong case DateType => new MutableInt // We use INT for DATE internally + case TimestampType => new MutableLong // We use Long for Timestamp internally case _ => new MutableAny }.toArray) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala index e95682f952a7b327330f88072779394905f02691..80aa8fa0561467e0eef3e42e1ae734a63fe6b1f6 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala @@ -122,7 +122,7 @@ class CodeGenContext { case BinaryType => "byte[]" case StringType => stringType case DateType => "int" - case TimestampType => "java.sql.Timestamp" + case TimestampType => "long" case dt: OpenHashSetUDT if dt.elementType == IntegerType => classOf[IntegerHashSet].getName case dt: OpenHashSetUDT if dt.elementType == LongType => classOf[LongHashSet].getName case _ => "Object" @@ -140,6 +140,7 @@ class CodeGenContext { case FloatType => "Float" case BooleanType => "Boolean" case DateType => "Integer" + case TimestampType => "Long" case _ => javaType(dt) } @@ -155,6 +156,7 @@ class CodeGenContext { case DoubleType => "-1.0" case IntegerType => "-1" case DateType => "-1" + case TimestampType => "-1L" case _ => "null" } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateProjection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateProjection.scala index 7caf4aaab88bb0e4ade08eafb0dac2e26b821964..274429cd1c55f80e85712ae1847f62347ce2f41a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateProjection.scala +++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateProjection.scala @@ -73,7 +73,9 @@ object GenerateProjection extends CodeGenerator[Seq[Expression], Projection] { val specificAccessorFunctions = ctx.nativeTypes.map { dataType => val cases = expressions.zipWithIndex.map { - case (e, i) if e.dataType == dataType => + case (e, i) if e.dataType == dataType + || dataType == IntegerType && e.dataType == DateType + || dataType == LongType && e.dataType == TimestampType => s"case $i: return c$i;" case _ => "" }.mkString("\n ") @@ -96,7 +98,9 @@ object GenerateProjection extends CodeGenerator[Seq[Expression], Projection] { val specificMutatorFunctions = ctx.nativeTypes.map { dataType => val cases = expressions.zipWithIndex.map { - case (e, i) if e.dataType == dataType => + case (e, i) if e.dataType == dataType + || dataType == IntegerType && e.dataType == DateType + || dataType == LongType && e.dataType == TimestampType => s"case $i: { c$i = value; return; }" case _ => "" }.mkString("\n") @@ -119,7 +123,7 @@ object GenerateProjection extends CodeGenerator[Seq[Expression], Projection] { val nonNull = e.dataType match { case BooleanType => s"$col ? 0 : 1" case ByteType | ShortType | IntegerType | DateType => s"$col" - case LongType => s"$col ^ ($col >>> 32)" + case LongType | TimestampType => s"$col ^ ($col >>> 32)" case FloatType => s"Float.floatToIntBits($col)" case DoubleType => s"(int)(Double.doubleToLongBits($col) ^ (Double.doubleToLongBits($col) >>> 32))" diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala index 297b35b4da94c077054ebb4e62ccea6a85d08348..833c08a293dcb86d2a7a2010f00954dd957bede7 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala @@ -37,7 +37,7 @@ object Literal { case d: BigDecimal => Literal(Decimal(d), DecimalType.Unlimited) case d: java.math.BigDecimal => Literal(Decimal(d), DecimalType.Unlimited) case d: Decimal => Literal(d, DecimalType.Unlimited) - case t: Timestamp => Literal(t, TimestampType) + case t: Timestamp => Literal(DateUtils.fromJavaTimestamp(t), TimestampType) case d: Date => Literal(DateUtils.fromJavaDate(d), DateType) case a: Array[Byte] => Literal(a, BinaryType) case null => Literal(null, NullType) @@ -100,7 +100,7 @@ case class Literal protected (value: Any, dataType: DataType) extends LeafExpres ev.isNull = "false" ev.primitive = value.toString "" - case FloatType => // This must go before NumericType + case FloatType => val v = value.asInstanceOf[Float] if (v.isNaN || v.isInfinite) { super.genCode(ctx, ev) @@ -109,7 +109,7 @@ case class Literal protected (value: Any, dataType: DataType) extends LeafExpres ev.primitive = s"${value}f" "" } - case DoubleType => // This must go before NumericType + case DoubleType => val v = value.asInstanceOf[Double] if (v.isNaN || v.isInfinite) { super.genCode(ctx, ev) @@ -118,15 +118,18 @@ case class Literal protected (value: Any, dataType: DataType) extends LeafExpres ev.primitive = s"${value}" "" } - - case ByteType | ShortType => // This must go before NumericType + case ByteType | ShortType => ev.isNull = "false" ev.primitive = s"(${ctx.javaType(dataType)})$value" "" - case dt: NumericType if !dt.isInstanceOf[DecimalType] => + case IntegerType | DateType => ev.isNull = "false" ev.primitive = value.toString "" + case 
TimestampType | LongType => + ev.isNull = "false" + ev.primitive = s"${value}L" + "" // eval() version may be faster for non-primitive types case other => super.genCode(ctx, ev) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala index 3cbdfdfb13847ba533111ba7bcda5ccde067259f..2c49352874fc3d98edffebd9f8f287e45ba5038c 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala @@ -254,9 +254,9 @@ abstract class BinaryComparison extends BinaryExpression with Predicate { case dt: NumericType if ctx.isNativeType(dt) => defineCodeGen (ctx, ev, { (c1, c3) => s"$c1 $symbol $c3" }) - case TimestampType => - // java.sql.Timestamp does not have compare() - super.genCode(ctx, ev) + case DateType | TimestampType => defineCodeGen (ctx, ev, { + (c1, c3) => s"$c1 $symbol $c3" + }) case other => defineCodeGen (ctx, ev, { (c1, c2) => s"$c1.compare($c2) $symbol 0" }) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateUtils.scala index ad649acf536f929fb119a1e9fe5ed1d209d87847..5cadc141af1df98e04ae6e00bef74c15a69a08c9 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateUtils.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateUtils.scala @@ -17,7 +17,7 @@ package org.apache.spark.sql.catalyst.util -import java.sql.Date +import java.sql.{Timestamp, Date} import java.text.SimpleDateFormat import java.util.{Calendar, TimeZone} @@ -28,6 +28,7 @@ import org.apache.spark.sql.catalyst.expressions.Cast */ object DateUtils { private val MILLIS_PER_DAY = 86400000 + private val HUNDRED_NANOS_PER_SECOND = 10000000L // Java TimeZone has no mention of thread safety. Use thread local instance to be safe. 
private val LOCAL_TIMEZONE = new ThreadLocal[TimeZone] { @@ -45,17 +46,17 @@ object DateUtils { ((millisLocal + LOCAL_TIMEZONE.get().getOffset(millisLocal)) / MILLIS_PER_DAY).toInt } - private def toMillisSinceEpoch(days: Int): Long = { + def toMillisSinceEpoch(days: Int): Long = { val millisUtc = days.toLong * MILLIS_PER_DAY millisUtc - LOCAL_TIMEZONE.get().getOffset(millisUtc) } - def fromJavaDate(date: java.sql.Date): Int = { + def fromJavaDate(date: Date): Int = { javaDateToDays(date) } - def toJavaDate(daysSinceEpoch: Int): java.sql.Date = { - new java.sql.Date(toMillisSinceEpoch(daysSinceEpoch)) + def toJavaDate(daysSinceEpoch: Int): Date = { + new Date(toMillisSinceEpoch(daysSinceEpoch)) } def toString(days: Int): String = Cast.threadLocalDateFormat.get.format(toJavaDate(days)) @@ -64,9 +65,9 @@ object DateUtils { if (!s.contains('T')) { // JDBC escape string if (s.contains(' ')) { - java.sql.Timestamp.valueOf(s) + Timestamp.valueOf(s) } else { - java.sql.Date.valueOf(s) + Date.valueOf(s) } } else if (s.endsWith("Z")) { // this is zero timezone of ISO8601 @@ -87,4 +88,33 @@ object DateUtils { ISO8601GMT.parse(s) } } + + /** + * Return a java.sql.Timestamp from number of 100ns since epoch + */ + def toJavaTimestamp(num100ns: Long): Timestamp = { + // setNanos() will overwrite the millisecond part, so the milliseconds should be + // cut off at seconds + var seconds = num100ns / HUNDRED_NANOS_PER_SECOND + var nanos = num100ns % HUNDRED_NANOS_PER_SECOND + // setNanos() can not accept negative value + if (nanos < 0) { + nanos += HUNDRED_NANOS_PER_SECOND + seconds -= 1 + } + val t = new Timestamp(seconds * 1000) + t.setNanos(nanos.toInt * 100) + t + } + + /** + * Return the number of 100ns since epoch from java.sql.Timestamp. + */ + def fromJavaTimestamp(t: Timestamp): Long = { + if (t != null) { + t.getTime() * 10000L + (t.getNanos().toLong / 100) % 10000L + } else { + 0L + } + } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/TimestampType.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/TimestampType.scala index aebabfc4759253b06c15b53bae904db0273c9972..a558641fcfed74ffbd4ed962f1382b1b5fda55c2 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/TimestampType.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/TimestampType.scala @@ -17,8 +17,6 @@ package org.apache.spark.sql.types -import java.sql.Timestamp - import scala.math.Ordering import scala.reflect.runtime.universe.typeTag @@ -38,18 +36,16 @@ class TimestampType private() extends AtomicType { // The companion object and this class is separated so the companion object also subclasses // this type. Otherwise, the companion object would be of type "TimestampType$" in byte code. // Defined with a private constructor so the companion object is the only possible instantiation. - private[sql] type InternalType = Timestamp + private[sql] type InternalType = Long @transient private[sql] lazy val tag = ScalaReflectionLock.synchronized { typeTag[InternalType] } - private[sql] val ordering = new Ordering[InternalType] { - def compare(x: Timestamp, y: Timestamp): Int = x.compareTo(y) - } + private[sql] val ordering = implicitly[Ordering[InternalType]] /** * The default size of a value of the TimestampType is 12 bytes. 
*/ - override def defaultSize: Int = 12 + override def defaultSize: Int = 8 private[spark] override def asNullable: TimestampType = this } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CastSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CastSuite.scala index 5bc7c30eee1b60ea64c10a6b07b9832c8d135421..3aca94db3bd8f8a2843ba3972204c231767f694a 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CastSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CastSuite.scala @@ -20,6 +20,7 @@ package org.apache.spark.sql.catalyst.expressions import java.sql.{Timestamp, Date} import org.apache.spark.SparkFunSuite +import org.apache.spark.sql.catalyst.util.DateUtils import org.apache.spark.sql.types._ /** @@ -137,7 +138,7 @@ class CastSuite extends SparkFunSuite with ExpressionEvalHelper { checkEvaluation(cast(cast(sd, DateType), StringType), sd) checkEvaluation(cast(cast(d, StringType), DateType), 0) checkEvaluation(cast(cast(nts, TimestampType), StringType), nts) - checkEvaluation(cast(cast(ts, StringType), TimestampType), ts) + checkEvaluation(cast(cast(ts, StringType), TimestampType), DateUtils.fromJavaTimestamp(ts)) // all convert to string type to check checkEvaluation(cast(cast(cast(nts, TimestampType), DateType), StringType), sd) @@ -269,9 +270,9 @@ class CastSuite extends SparkFunSuite with ExpressionEvalHelper { checkEvaluation(cast(ts, LongType), 15.toLong) checkEvaluation(cast(ts, FloatType), 15.002f) checkEvaluation(cast(ts, DoubleType), 15.002) - checkEvaluation(cast(cast(tss, ShortType), TimestampType), ts) - checkEvaluation(cast(cast(tss, IntegerType), TimestampType), ts) - checkEvaluation(cast(cast(tss, LongType), TimestampType), ts) + checkEvaluation(cast(cast(tss, ShortType), TimestampType), DateUtils.fromJavaTimestamp(ts)) + checkEvaluation(cast(cast(tss, IntegerType), TimestampType), DateUtils.fromJavaTimestamp(ts)) + checkEvaluation(cast(cast(tss, LongType), TimestampType), DateUtils.fromJavaTimestamp(ts)) checkEvaluation( cast(cast(millis.toFloat / 1000, TimestampType), FloatType), millis.toFloat / 1000) @@ -283,7 +284,7 @@ class CastSuite extends SparkFunSuite with ExpressionEvalHelper { Decimal(1)) // A test for higher precision than millis - checkEvaluation(cast(cast(0.00000001, TimestampType), DoubleType), 0.00000001) + checkEvaluation(cast(cast(0.0000001, TimestampType), DoubleType), 0.0000001) checkEvaluation(cast(Double.NaN, TimestampType), null) checkEvaluation(cast(1.0 / 0.0, TimestampType), null) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateUtilsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateUtilsSuite.scala new file mode 100644 index 0000000000000000000000000000000000000000..a4245545ffc1dd8c6249b0fba2ecac6cb4f24999 --- /dev/null +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateUtilsSuite.scala @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.util + +import java.sql.Timestamp + +import org.apache.spark.SparkFunSuite + + +class DateUtilsSuite extends SparkFunSuite { + + test("timestamp") { + val now = new Timestamp(System.currentTimeMillis()) + now.setNanos(100) + val ns = DateUtils.fromJavaTimestamp(now) + assert(ns % 10000000L == 1) + assert(DateUtils.toJavaTimestamp(ns) == now) + + List(-111111111111L, -1L, 0, 1L, 111111111111L).foreach { t => + val ts = DateUtils.toJavaTimestamp(t) + assert(DateUtils.fromJavaTimestamp(ts) == t) + assert(DateUtils.toJavaTimestamp(DateUtils.fromJavaTimestamp(ts)) == ts) + } + } +} diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/types/DataTypeSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/types/DataTypeSuite.scala index 261c4fcad24aa7d95f9141a33a1b117596e83796..077c0ad70ac4fc1a8bce993ce12525150ae08cc7 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/types/DataTypeSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/types/DataTypeSuite.scala @@ -190,7 +190,7 @@ class DataTypeSuite extends SparkFunSuite { checkDefaultSize(DecimalType(10, 5), 4096) checkDefaultSize(DecimalType.Unlimited, 4096) checkDefaultSize(DateType, 4) - checkDefaultSize(TimestampType, 12) + checkDefaultSize(TimestampType, 8) checkDefaultSize(StringType, 4096) checkDefaultSize(BinaryType, 4096) checkDefaultSize(ArrayType(DoubleType, true), 800) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnStats.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnStats.scala index b0f983c1806732109ae88fac2b2ffaaefdfc3e1c..83881a36870905f18d94c823179900ac9971ddfd 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnStats.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnStats.scala @@ -17,10 +17,8 @@ package org.apache.spark.sql.columnar -import java.sql.Timestamp - import org.apache.spark.sql.Row -import org.apache.spark.sql.catalyst.expressions.{AttributeMap, Attribute, AttributeReference} +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, AttributeReference} import org.apache.spark.sql.types._ private[sql] class ColumnStatisticsSchema(a: Attribute) extends Serializable { @@ -234,22 +232,7 @@ private[sql] class StringColumnStats extends ColumnStats { private[sql] class DateColumnStats extends IntColumnStats -private[sql] class TimestampColumnStats extends ColumnStats { - protected var upper: Timestamp = null - protected var lower: Timestamp = null - - override def gatherStats(row: Row, ordinal: Int): Unit = { - super.gatherStats(row, ordinal) - if (!row.isNullAt(ordinal)) { - val value = row(ordinal).asInstanceOf[Timestamp] - if (upper == null || value.compareTo(upper) > 0) upper = value - if (lower == null || value.compareTo(lower) < 0) lower = value - sizeInBytes += TIMESTAMP.defaultSize - } - } - - override def collectedStatistics: Row = Row(lower, upper, nullCount, count, sizeInBytes) -} +private[sql] class TimestampColumnStats extends LongColumnStats private[sql] class BinaryColumnStats extends ColumnStats { 
override def gatherStats(row: Row, ordinal: Int): Unit = { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnType.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnType.scala index 20be5ca9d0046b34933208a50003ac8c233e62e1..c9c4d630fb5f497a710b88d52a0f0439228dd1d6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnType.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnType.scala @@ -18,7 +18,6 @@ package org.apache.spark.sql.columnar import java.nio.ByteBuffer -import java.sql.Timestamp import scala.reflect.runtime.universe.TypeTag @@ -355,22 +354,20 @@ private[sql] object DATE extends NativeColumnType(DateType, 8, 4) { } } -private[sql] object TIMESTAMP extends NativeColumnType(TimestampType, 9, 12) { - override def extract(buffer: ByteBuffer): Timestamp = { - val timestamp = new Timestamp(buffer.getLong()) - timestamp.setNanos(buffer.getInt()) - timestamp +private[sql] object TIMESTAMP extends NativeColumnType(TimestampType, 9, 8) { + override def extract(buffer: ByteBuffer): Long = { + buffer.getLong } - override def append(v: Timestamp, buffer: ByteBuffer): Unit = { - buffer.putLong(v.getTime).putInt(v.getNanos) + override def append(v: Long, buffer: ByteBuffer): Unit = { + buffer.putLong(v) } - override def getField(row: Row, ordinal: Int): Timestamp = { - row(ordinal).asInstanceOf[Timestamp] + override def getField(row: Row, ordinal: Int): Long = { + row(ordinal).asInstanceOf[Long] } - override def setField(row: MutableRow, ordinal: Int, value: Timestamp): Unit = { + override def setField(row: MutableRow, ordinal: Int, value: Long): Unit = { row(ordinal) = value } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer2.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer2.scala index 256d527d7b636528ba0208ba0effa8a513bfa418..60f3b2d539ffec3f2d0c842cfa07a6c8b366e394 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer2.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer2.scala @@ -20,14 +20,13 @@ package org.apache.spark.sql.execution import java.io._ import java.math.{BigDecimal, BigInteger} import java.nio.ByteBuffer -import java.sql.Timestamp import scala.reflect.ClassTag -import org.apache.spark.serializer._ import org.apache.spark.Logging +import org.apache.spark.serializer._ import org.apache.spark.sql.Row -import org.apache.spark.sql.catalyst.expressions.{SpecificMutableRow, MutableRow, GenericMutableRow} +import org.apache.spark.sql.catalyst.expressions.{GenericMutableRow, MutableRow, SpecificMutableRow} import org.apache.spark.sql.types._ /** @@ -304,11 +303,7 @@ private[sql] object SparkSqlSerializer2 { out.writeByte(NULL) } else { out.writeByte(NOT_NULL) - val timestamp = row.getAs[java.sql.Timestamp](i) - val time = timestamp.getTime - val nanos = timestamp.getNanos - out.writeLong(time - (nanos / 1000000)) // Write the milliseconds value. - out.writeInt(nanos) // Write the nanoseconds part. + out.writeLong(row.getAs[Long](i)) } case StringType => @@ -429,11 +424,7 @@ private[sql] object SparkSqlSerializer2 { if (in.readByte() == NULL) { mutableRow.setNullAt(i) } else { - val time = in.readLong() // Read the milliseconds value. - val nanos = in.readInt() // Read the nanoseconds part. 
- val timestamp = new Timestamp(time) - timestamp.setNanos(nanos) - mutableRow.update(i, timestamp) + mutableRow.update(i, in.readLong()) } case StringType => diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala index dffb265601bdb1a0e130a226eb16331fd98cbdf9..720b529d5946f3f9d0369c95324acde77bbb21be 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala @@ -170,6 +170,8 @@ package object debug { case (_: Short, ShortType) => case (_: Boolean, BooleanType) => case (_: Double, DoubleType) => + case (_: Int, DateType) => + case (_: Long, TimestampType) => case (v, udt: UserDefinedType[_]) => typeCheck(v, udt.sqlType) case (d, t) => sys.error(s"Invalid data found: got $d (${d.getClass}) expected $t") diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUdfs.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUdfs.scala index 342587904789a2dfcc2c6440b8250c3afcadfc84..955b478a4882f80bac837759ca5aaf63719a724c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUdfs.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUdfs.scala @@ -148,6 +148,7 @@ object EvaluatePython { case (ud, udt: UserDefinedType[_]) => toJava(udt.serialize(ud), udt.sqlType) case (date: Int, DateType) => DateUtils.toJavaDate(date) + case (t: Long, TimestampType) => DateUtils.toJavaTimestamp(t) case (s: UTF8String, StringType) => s.toString // Pyrolite can handle Timestamp and Decimal @@ -186,10 +187,12 @@ object EvaluatePython { }): Row case (c: java.util.Calendar, DateType) => - DateUtils.fromJavaDate(new java.sql.Date(c.getTime().getTime())) + DateUtils.fromJavaDate(new java.sql.Date(c.getTimeInMillis)) case (c: java.util.Calendar, TimestampType) => - new java.sql.Timestamp(c.getTime().getTime()) + c.getTimeInMillis * 10000L + case (t: java.sql.Timestamp, TimestampType) => + DateUtils.fromJavaTimestamp(t) case (_, udt: UserDefinedType[_]) => fromJava(obj, udt.sqlType) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRDD.scala index db68b9c86db1b21f6777c429508e6e5730a6016e..9028d5ed72c9223de2f0b828dd585e624997c66b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRDD.scala @@ -385,7 +385,7 @@ private[sql] class JDBCRDD( // DateUtils.fromJavaDate does not handle null value, so we need to check it. 
val dateVal = rs.getDate(pos) if (dateVal != null) { - mutableRow.update(i, DateUtils.fromJavaDate(dateVal)) + mutableRow.setInt(i, DateUtils.fromJavaDate(dateVal)) } else { mutableRow.update(i, null) } @@ -417,7 +417,13 @@ private[sql] class JDBCRDD( case LongConversion => mutableRow.setLong(i, rs.getLong(pos)) // TODO(davies): use getBytes for better performance, if the encoding is UTF-8 case StringConversion => mutableRow.setString(i, rs.getString(pos)) - case TimestampConversion => mutableRow.update(i, rs.getTimestamp(pos)) + case TimestampConversion => + val t = rs.getTimestamp(pos) + if (t != null) { + mutableRow.setLong(i, DateUtils.fromJavaTimestamp(t)) + } else { + mutableRow.update(i, null) + } case BinaryConversion => mutableRow.update(i, rs.getBytes(pos)) case BinaryLongConversion => { val bytes = rs.getBytes(pos) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/json/JacksonParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/json/JacksonParser.scala index 0e223758051a696882c0721d0b6275402516fd7a..4e07cf36ae43466712a8aad5ef451b8c8e6ed5aa 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/json/JacksonParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/json/JacksonParser.scala @@ -18,7 +18,6 @@ package org.apache.spark.sql.json import java.io.ByteArrayOutputStream -import java.sql.Timestamp import scala.collection.Map @@ -65,10 +64,10 @@ private[sql] object JacksonParser { DateUtils.millisToDays(DateUtils.stringToTime(parser.getText).getTime) case (VALUE_STRING, TimestampType) => - new Timestamp(DateUtils.stringToTime(parser.getText).getTime) + DateUtils.stringToTime(parser.getText).getTime * 10000L case (VALUE_NUMBER_INT, TimestampType) => - new Timestamp(parser.getLongValue) + parser.getLongValue * 10000L case (_, StringType) => val writer = new ByteArrayOutputStream() diff --git a/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala index 7e1e21f5fbb999ab40b3fe4daafac1f0c54c7f4d..fb0d137bdbfdbf78ac100d742424a0ae856b7c8a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala @@ -17,8 +17,6 @@ package org.apache.spark.sql.json -import java.sql.Timestamp - import scala.collection.Map import scala.collection.convert.Wrappers.{JListWrapper, JMapWrapper} @@ -398,11 +396,11 @@ private[sql] object JsonRDD extends Logging { } } - private def toTimestamp(value: Any): Timestamp = { + private def toTimestamp(value: Any): Long = { value match { - case value: java.lang.Integer => new Timestamp(value.asInstanceOf[Int].toLong) - case value: java.lang.Long => new Timestamp(value) - case value: java.lang.String => toTimestamp(DateUtils.stringToTime(value).getTime) + case value: java.lang.Integer => value.asInstanceOf[Int].toLong * 10000L + case value: java.lang.Long => value * 10000L + case value: java.lang.String => DateUtils.stringToTime(value).getTime * 10000L } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala index 85c2ce740fe5207d313d2713861114eb36c15fac..ddc5097f88fb134054f4c11a6a62eebf5ba7b3b7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala @@ -28,6 +28,7 @@ import org.apache.parquet.io.api.{PrimitiveConverter, GroupConverter, Binary, Co import 
org.apache.parquet.schema.MessageType import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.util.DateUtils import org.apache.spark.sql.parquet.CatalystConverter.FieldType import org.apache.spark.sql.types._ import org.apache.spark.sql.parquet.timestamp.NanoTime @@ -266,8 +267,8 @@ private[parquet] abstract class CatalystConverter extends GroupConverter { /** * Read a Timestamp value from a Parquet Int96Value */ - protected[parquet] def readTimestamp(value: Binary): Timestamp = { - CatalystTimestampConverter.convertToTimestamp(value) + protected[parquet] def readTimestamp(value: Binary): Long = { + DateUtils.fromJavaTimestamp(CatalystTimestampConverter.convertToTimestamp(value)) } } @@ -401,7 +402,7 @@ private[parquet] class CatalystPrimitiveRowConverter( current.setInt(fieldIndex, value) override protected[parquet] def updateDate(fieldIndex: Int, value: Int): Unit = - current.update(fieldIndex, value) + current.setInt(fieldIndex, value) override protected[parquet] def updateLong(fieldIndex: Int, value: Long): Unit = current.setLong(fieldIndex, value) @@ -425,7 +426,7 @@ private[parquet] class CatalystPrimitiveRowConverter( current.update(fieldIndex, UTF8String(value)) override protected[parquet] def updateTimestamp(fieldIndex: Int, value: Binary): Unit = - current.update(fieldIndex, readTimestamp(value)) + current.setLong(fieldIndex, readTimestamp(value)) override protected[parquet] def updateDecimal( fieldIndex: Int, value: Binary, ctype: DecimalType): Unit = { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala index 89db408b1c3826dbfa2ee39844be6a9dafc355dc..e03dbdec0491d0ce5fccc005e526a22ac836f7e8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala @@ -29,6 +29,7 @@ import org.apache.parquet.schema.MessageType import org.apache.spark.Logging import org.apache.spark.sql.catalyst.expressions.{Attribute, Row} +import org.apache.spark.sql.catalyst.util.DateUtils import org.apache.spark.sql.types._ /** @@ -204,7 +205,7 @@ private[parquet] class RowWriteSupport extends WriteSupport[Row] with Logging { case IntegerType => writer.addInteger(value.asInstanceOf[Int]) case ShortType => writer.addInteger(value.asInstanceOf[Short]) case LongType => writer.addLong(value.asInstanceOf[Long]) - case TimestampType => writeTimestamp(value.asInstanceOf[java.sql.Timestamp]) + case TimestampType => writeTimestamp(value.asInstanceOf[Long]) case ByteType => writer.addInteger(value.asInstanceOf[Byte]) case DoubleType => writer.addDouble(value.asInstanceOf[Double]) case FloatType => writer.addFloat(value.asInstanceOf[Float]) @@ -311,8 +312,9 @@ private[parquet] class RowWriteSupport extends WriteSupport[Row] with Logging { writer.addBinary(Binary.fromByteArray(scratchBytes, 0, numBytes)) } - private[parquet] def writeTimestamp(ts: java.sql.Timestamp): Unit = { - val binaryNanoTime = CatalystTimestampConverter.convertFromTimestamp(ts) + private[parquet] def writeTimestamp(ts: Long): Unit = { + val binaryNanoTime = CatalystTimestampConverter.convertFromTimestamp( + DateUtils.toJavaTimestamp(ts)) writer.addBinary(binaryNanoTime) } } @@ -357,7 +359,7 @@ private[parquet] class MutableRowWriteSupport extends RowWriteSupport { case FloatType => writer.addFloat(record.getFloat(index)) case BooleanType => 
writer.addBoolean(record.getBoolean(index)) case DateType => writer.addInteger(record.getInt(index)) - case TimestampType => writeTimestamp(record(index).asInstanceOf[java.sql.Timestamp]) + case TimestampType => writeTimestamp(record(index).asInstanceOf[Long]) case d: DecimalType => if (d.precisionInfo == None || d.precisionInfo.get.precision > 18) { sys.error(s"Unsupported datatype $d, cannot write to consumer") diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala index 72e60d9aa75cbf4c2b5e2858f296bfa97c981f62..17a3cec48b856802301fad09d9e5979a23b9b4de 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala @@ -25,7 +25,7 @@ import org.scalatest.concurrent.Eventually._ import org.apache.spark.Accumulators import org.apache.spark.sql.TestData._ import org.apache.spark.sql.columnar._ -import org.apache.spark.storage.{RDDBlockId, StorageLevel} +import org.apache.spark.storage.{StorageLevel, RDDBlockId} case class BigData(s: String) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnStatsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnStatsSuite.scala index 339e719f39f160daae46c4d3624a88ea03d05f54..16836628cb73a6ddce224212d969c370efbcfbbd 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnStatsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnStatsSuite.scala @@ -31,7 +31,7 @@ class ColumnStatsSuite extends SparkFunSuite { testColumnStats(classOf[FixedDecimalColumnStats], FIXED_DECIMAL(15, 10), Row(null, null, 0)) testColumnStats(classOf[StringColumnStats], STRING, Row(null, null, 0)) testColumnStats(classOf[DateColumnStats], DATE, Row(Int.MaxValue, Int.MinValue, 0)) - testColumnStats(classOf[TimestampColumnStats], TIMESTAMP, Row(null, null, 0)) + testColumnStats(classOf[TimestampColumnStats], TIMESTAMP, Row(Long.MaxValue, Long.MinValue, 0)) def testColumnStats[T <: AtomicType, U <: ColumnStats]( columnStatsClass: Class[U], diff --git a/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnTypeSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnTypeSuite.scala index a1e76eaa982cc4561258b853df49bb8b8613b40c..8421e670ff05da0c98b86bc88b591f0e56346578 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnTypeSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnTypeSuite.scala @@ -18,17 +18,16 @@ package org.apache.spark.sql.columnar import java.nio.ByteBuffer -import java.sql.Timestamp -import com.esotericsoftware.kryo.{Serializer, Kryo} import com.esotericsoftware.kryo.io.{Input, Output} -import org.apache.spark.serializer.KryoRegistrator +import com.esotericsoftware.kryo.{Kryo, Serializer} -import org.apache.spark.{Logging, SparkConf, SparkFunSuite} +import org.apache.spark.serializer.KryoRegistrator import org.apache.spark.sql.catalyst.expressions.GenericMutableRow import org.apache.spark.sql.columnar.ColumnarTestUtils._ import org.apache.spark.sql.execution.SparkSqlSerializer import org.apache.spark.sql.types._ +import org.apache.spark.{Logging, SparkConf, SparkFunSuite} class ColumnTypeSuite extends SparkFunSuite with Logging { val DEFAULT_BUFFER_SIZE = 512 @@ -36,7 +35,7 @@ class ColumnTypeSuite extends SparkFunSuite with Logging { test("defaultSize") { val checks = Map( INT -> 4, SHORT -> 2, LONG -> 8, BYTE -> 1, DOUBLE -> 8, 
FLOAT -> 4, - FIXED_DECIMAL(15, 10) -> 8, BOOLEAN -> 1, STRING -> 8, DATE -> 4, TIMESTAMP -> 12, + FIXED_DECIMAL(15, 10) -> 8, BOOLEAN -> 1, STRING -> 8, DATE -> 4, TIMESTAMP -> 8, BINARY -> 16, GENERIC -> 16) checks.foreach { case (columnType, expectedSize) => @@ -69,7 +68,7 @@ class ColumnTypeSuite extends SparkFunSuite with Logging { checkActualSize(BOOLEAN, true, 1) checkActualSize(STRING, UTF8String("hello"), 4 + "hello".getBytes("utf-8").length) checkActualSize(DATE, 0, 4) - checkActualSize(TIMESTAMP, new Timestamp(0L), 12) + checkActualSize(TIMESTAMP, 0L, 8) val binary = Array.fill[Byte](4)(0: Byte) checkActualSize(BINARY, binary, 4 + 4) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnarTestUtils.scala b/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnarTestUtils.scala index 75d993e563e06bfd8d9aab313734c2b558e04a47..c5d38595c0bec65a18f8bfb92cc165583e8b094d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnarTestUtils.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnarTestUtils.scala @@ -17,14 +17,12 @@ package org.apache.spark.sql.columnar -import java.sql.Timestamp - import scala.collection.immutable.HashSet import scala.util.Random import org.apache.spark.sql.Row import org.apache.spark.sql.catalyst.expressions.GenericMutableRow -import org.apache.spark.sql.types.{UTF8String, DataType, Decimal, AtomicType} +import org.apache.spark.sql.types.{AtomicType, DataType, Decimal, UTF8String} object ColumnarTestUtils { def makeNullRow(length: Int): GenericMutableRow = { @@ -52,10 +50,7 @@ object ColumnarTestUtils { case BOOLEAN => Random.nextBoolean() case BINARY => randomBytes(Random.nextInt(32)) case DATE => Random.nextInt() - case TIMESTAMP => - val timestamp = new Timestamp(Random.nextLong()) - timestamp.setNanos(Random.nextInt(999999999)) - timestamp + case TIMESTAMP => Random.nextLong() case _ => // Using a random one-element map instead of an arbitrary object Map(Random.nextInt() -> Random.nextString(Random.nextInt(32))) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala index 49d348c3ed21b66aaad7b0abe693fe0bb48b280a..69ab1c292d221f9a6da3ab23400c5c14b0a49b47 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala @@ -326,7 +326,7 @@ class JDBCSuite extends SparkFunSuite with BeforeAndAfter { assert(cal.get(Calendar.HOUR) === 11) assert(cal.get(Calendar.MINUTE) === 22) assert(cal.get(Calendar.SECOND) === 33) - assert(rows(0).getAs[java.sql.Timestamp](2).getNanos === 543543543) + assert(rows(0).getAs[java.sql.Timestamp](2).getNanos === 543543500) } test("test DATE types") { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala index d889c7be17ce731b2294b2de3b62eda0cd08169c..fca24364fe6ec8a53350d27a1256204e05b91386 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala @@ -76,21 +76,25 @@ class JsonSuite extends QueryTest with TestJsonData { checkTypePromotion( Decimal(doubleNumber), enforceCorrectType(doubleNumber, DecimalType.Unlimited)) - checkTypePromotion(new Timestamp(intNumber), enforceCorrectType(intNumber, TimestampType)) - checkTypePromotion(new Timestamp(intNumber.toLong), + 
checkTypePromotion(DateUtils.fromJavaTimestamp(new Timestamp(intNumber)), + enforceCorrectType(intNumber, TimestampType)) + checkTypePromotion(DateUtils.fromJavaTimestamp(new Timestamp(intNumber.toLong)), enforceCorrectType(intNumber.toLong, TimestampType)) val strTime = "2014-09-30 12:34:56" - checkTypePromotion(Timestamp.valueOf(strTime), enforceCorrectType(strTime, TimestampType)) + checkTypePromotion(DateUtils.fromJavaTimestamp(Timestamp.valueOf(strTime)), + enforceCorrectType(strTime, TimestampType)) val strDate = "2014-10-15" checkTypePromotion( DateUtils.fromJavaDate(Date.valueOf(strDate)), enforceCorrectType(strDate, DateType)) val ISO8601Time1 = "1970-01-01T01:00:01.0Z" - checkTypePromotion(new Timestamp(3601000), enforceCorrectType(ISO8601Time1, TimestampType)) + checkTypePromotion(DateUtils.fromJavaTimestamp(new Timestamp(3601000)), + enforceCorrectType(ISO8601Time1, TimestampType)) checkTypePromotion(DateUtils.millisToDays(3601000), enforceCorrectType(ISO8601Time1, DateType)) val ISO8601Time2 = "1970-01-01T02:00:01-01:00" - checkTypePromotion(new Timestamp(10801000), enforceCorrectType(ISO8601Time2, TimestampType)) + checkTypePromotion(DateUtils.fromJavaTimestamp(new Timestamp(10801000)), + enforceCorrectType(ISO8601Time2, TimestampType)) checkTypePromotion(DateUtils.millisToDays(10801000), enforceCorrectType(ISO8601Time2, DateType)) } diff --git a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala index 0693c7ea5b3329cba0ca14b5826f7d3afc1f10b6..82c0b494598a8077cce10791eca52bf99bd80d41 100644 --- a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala +++ b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala @@ -252,7 +252,11 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter { "load_dyn_part14.*", // These work alone but fail when run with other tests... 
// the answer is sensitive for jdk version - "udf_java_method" + "udf_java_method", + + // Spark SQL use Long for TimestampType, lose the precision under 100ns + "timestamp_1", + "timestamp_2" ) /** @@ -795,8 +799,6 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter { "stats_publisher_error_1", "subq2", "tablename_with_select", - "timestamp_1", - "timestamp_2", "timestamp_3", "timestamp_comparison", "timestamp_lazy", diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala index c466203cd02206580b8315392fc33b52f363133a..1f14cba78f4798016c86421a9928de1e6f6f9815 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala @@ -250,7 +250,8 @@ private[hive] trait HiveInspectors { PrimitiveObjectInspectorFactory.javaHiveDecimalObjectInspector, poi.getWritableConstantValue.getHiveDecimal) case poi: WritableConstantTimestampObjectInspector => - poi.getWritableConstantValue.getTimestamp.clone() + val t = poi.getWritableConstantValue + t.getSeconds * 10000000L + t.getNanos / 100L case poi: WritableConstantIntObjectInspector => poi.getWritableConstantValue.get() case poi: WritableConstantDoubleObjectInspector => @@ -313,11 +314,11 @@ private[hive] trait HiveInspectors { case x: DateObjectInspector if x.preferWritable() => DateUtils.fromJavaDate(x.getPrimitiveWritableObject(data).get()) case x: DateObjectInspector => DateUtils.fromJavaDate(x.getPrimitiveJavaObject(data)) - // org.apache.hadoop.hive.serde2.io.TimestampWritable.set will reset current time object - // if next timestamp is null, so Timestamp object is cloned case x: TimestampObjectInspector if x.preferWritable() => - x.getPrimitiveWritableObject(data).getTimestamp.clone() - case ti: TimestampObjectInspector => ti.getPrimitiveJavaObject(data).clone() + val t = x.getPrimitiveWritableObject(data) + t.getSeconds * 10000000L + t.getNanos / 100 + case ti: TimestampObjectInspector => + DateUtils.fromJavaTimestamp(ti.getPrimitiveJavaObject(data)) case _ => pi.getPrimitiveJavaObject(data) } case li: ListObjectInspector => @@ -356,6 +357,9 @@ private[hive] trait HiveInspectors { case _: JavaDateObjectInspector => (o: Any) => DateUtils.toJavaDate(o.asInstanceOf[Int]) + case _: JavaTimestampObjectInspector => + (o: Any) => DateUtils.toJavaTimestamp(o.asInstanceOf[Long]) + case soi: StandardStructObjectInspector => val wrappers = soi.getAllStructFieldRefs.map(ref => wrapperFor(ref.getFieldObjectInspector)) (o: Any) => { @@ -465,7 +469,7 @@ private[hive] trait HiveInspectors { case _: DateObjectInspector if x.preferWritable() => getDateWritable(a) case _: DateObjectInspector => DateUtils.toJavaDate(a.asInstanceOf[Int]) case _: TimestampObjectInspector if x.preferWritable() => getTimestampWritable(a) - case _: TimestampObjectInspector => a.asInstanceOf[java.sql.Timestamp] + case _: TimestampObjectInspector => DateUtils.toJavaTimestamp(a.asInstanceOf[Long]) } case x: SettableStructObjectInspector => val fieldRefs = x.getAllStructFieldRefs @@ -727,7 +731,7 @@ private[hive] trait HiveInspectors { TypeInfoFactory.voidTypeInfo, null) private def getStringWritable(value: Any): hadoopIo.Text = - if (value == null) null else new hadoopIo.Text(value.asInstanceOf[UTF8String].toString) + if (value == null) null else new hadoopIo.Text(value.asInstanceOf[UTF8String].getBytes) private def getIntWritable(value: Any): hadoopIo.IntWritable = if 
(value == null) null else new hadoopIo.IntWritable(value.asInstanceOf[Int]) @@ -776,7 +780,7 @@ private[hive] trait HiveInspectors { if (value == null) { null } else { - new hiveIo.TimestampWritable(value.asInstanceOf[java.sql.Timestamp]) + new hiveIo.TimestampWritable(DateUtils.toJavaTimestamp(value.asInstanceOf[Long])) } private def getDecimalWritable(value: Any): hiveIo.HiveDecimalWritable = diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala index 334bfccc9d20047982eae1639e8e12222f00fedf..d3c82d8c2e32698ac3e3792177c857cde61a4801 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala @@ -363,10 +363,10 @@ private[hive] object HadoopTableReader extends HiveInspectors with Logging { row.update(ordinal, HiveShim.toCatalystDecimal(oi, value)) case oi: TimestampObjectInspector => (value: Any, row: MutableRow, ordinal: Int) => - row.update(ordinal, oi.getPrimitiveJavaObject(value).clone()) + row.setLong(ordinal, DateUtils.fromJavaTimestamp(oi.getPrimitiveJavaObject(value))) case oi: DateObjectInspector => (value: Any, row: MutableRow, ordinal: Int) => - row.update(ordinal, DateUtils.fromJavaDate(oi.getPrimitiveJavaObject(value))) + row.setInt(ordinal, DateUtils.fromJavaDate(oi.getPrimitiveJavaObject(value))) case oi: BinaryObjectInspector => (value: Any, row: MutableRow, ordinal: Int) => row.update(ordinal, oi.getPrimitiveJavaObject(value)) diff --git a/sql/hive/src/test/resources/golden/timestamp cast #5-0-dbd7bcd167d322d6617b884c02c7f247 b/sql/hive/src/test/resources/golden/timestamp cast #5-0-dbd7bcd167d322d6617b884c02c7f247 index 27de46fdf22ac29348b7512de44a8ea6035cbbd0..84a31a5a6970bc899dcde67c0ef418be43ce4fd3 100644 --- a/sql/hive/src/test/resources/golden/timestamp cast #5-0-dbd7bcd167d322d6617b884c02c7f247 +++ b/sql/hive/src/test/resources/golden/timestamp cast #5-0-dbd7bcd167d322d6617b884c02c7f247 @@ -1 +1 @@ --0.0010000000000000009 +-0.001
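The patch above stores TimestampType values internally as a Long counting 100ns intervals since the epoch (see DateUtils.toJavaTimestamp / fromJavaTimestamp). The following standalone sketch mirrors that encoding so the round trip can be checked outside Spark; the object and method names are illustrative only and not part of any Spark API.

import java.sql.Timestamp

object TimestampEncodingSketch {
  private val HUNDRED_NANOS_PER_SECOND = 10000000L

  // Encode a java.sql.Timestamp as 100ns intervals since the epoch.
  // getTime carries the milliseconds; the sub-millisecond part comes from getNanos.
  def toHundredNanos(t: Timestamp): Long =
    t.getTime * 10000L + (t.getNanos.toLong / 100) % 10000L

  // Decode 100ns intervals back to a java.sql.Timestamp.
  def fromHundredNanos(num100ns: Long): Timestamp = {
    var seconds = num100ns / HUNDRED_NANOS_PER_SECOND
    var nanos = num100ns % HUNDRED_NANOS_PER_SECOND
    if (nanos < 0) {                       // setNanos() rejects negative values
      nanos += HUNDRED_NANOS_PER_SECOND
      seconds -= 1
    }
    val t = new Timestamp(seconds * 1000)  // seconds only; setNanos() overwrites the millis part
    t.setNanos(nanos.toInt * 100)
    t
  }

  def main(args: Array[String]): Unit = {
    val now = new Timestamp(System.currentTimeMillis())
    now.setNanos(now.getNanos + 100)       // add a component below microsecond resolution
    assert(fromHundredNanos(toHundredNanos(now)) == now)
    // Anything finer than 100ns is dropped, which is why timestamp_1 and
    // timestamp_2 move to the exclusion list in HiveCompatibilitySuite.
  }
}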
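Outside the JVM, SparkR still exchanges timestamps as a single double of fractional seconds since the epoch (SerDe.writeTime / readTime above). A minimal sketch of that wire format, with helper and object names local to the example:

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}
import java.sql.Timestamp

object RSerDeTimeSketch {
  // Write whole seconds plus the nanosecond fraction as one double.
  def writeTime(out: DataOutputStream, value: Timestamp): Unit =
    out.writeDouble((value.getTime / 1000).toDouble + value.getNanos.toDouble / 1e9)

  // Split the double back into seconds and a nanosecond remainder.
  def readTime(in: DataInputStream): Timestamp = {
    val seconds = in.readDouble()
    val sec = math.floor(seconds).toLong
    val t = new Timestamp(sec * 1000L)
    t.setNanos(((seconds - sec) * 1e9).toInt)
    t
  }

  def main(args: Array[String]): Unit = {
    val buf = new ByteArrayOutputStream()
    val ts = Timestamp.valueOf("2015-06-01 12:34:56.789")
    writeTime(new DataOutputStream(buf), ts)
    val back = readTime(new DataInputStream(new ByteArrayInputStream(buf.toByteArray)))
    // A double keeps roughly microsecond precision at current epoch values,
    // so the lowest digits of the nanosecond field may shift on the round trip.
    println(back)
  }
}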