Commit 94566885, authored by Tejas Patil, committed by Xiao Li

[SPARK-17495][SQL] Support date, timestamp and interval types in Hive hash

## What changes were proposed in this pull request?

- Timestamp hashing is done as per [TimestampWritable.hashCode()](https://github.com/apache/hive/blob/ff67cdda1c538dc65087878eeba3e165cf3230f4/serde/src/java/org/apache/hadoop/hive/serde2/io/TimestampWritable.java#L406) in Hive
- Interval hashing is done as per [HiveIntervalDayTime.hashCode()](https://github.com/apache/hive/blob/ff67cdda1c538dc65087878eeba3e165cf3230f4/storage-api/src/java/org/apache/hadoop/hive/common/type/HiveIntervalDayTime.java#L178). Note that there are inherent differences in how Hive and Spark store intervals under the hood, which limits the ability to stay completely in sync with Hive's hashing function. I have explained this in the method doc.
- Date type was already supported. This PR adds tests for it (a short usage sketch of the new hashing paths follows this list).
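
A minimal usage sketch of the new code paths (expected values are taken from the unit tests added in this patch; the timestamp is parsed in UTC):

```scala
import java.util.TimeZone

import org.apache.spark.sql.catalyst.expressions.HiveHashFunction
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.types.TimestampType
import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String}

// Timestamps are hashed from Spark's internal form (microseconds since epoch),
// matching Hive's TimestampWritable.hashCode() for the same instant.
val micros = DateTimeUtils.stringToTimestamp(
  UTF8String.fromString("2017-02-24 10:56:29"), TimeZone.getTimeZone("UTC")).get
assert(HiveHashFunction.hash(micros, TimestampType, 0) == 1445725271)

// Day-time intervals are hashed per HiveIntervalDayTime.hashCode(); note that
// the months field of CalendarInterval does not participate in the hash.
val interval = CalendarInterval.fromString("interval 1 day")
assert(HiveHashFunction.hashCalendarInterval(interval) == 3220073)
```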

## How was this patch tested?

Added unit tests

Author: Tejas Patil <tejasp@fb.com>

Closes #17062 from tejasapatil/SPARK-17495_time_related_types.
parent 0a4d06a7
@@ -335,6 +335,8 @@ abstract class HashExpression[E] extends Expression {
}
}
protected def genHashTimestamp(t: String, result: String): String = genHashLong(t, result)
protected def genHashCalendarInterval(input: String, result: String): String = {
val microsecondsHash = s"$hasherClassName.hashLong($input.microseconds, $result)"
s"$result = $hasherClassName.hashInt($input.months, $microsecondsHash);"
@@ -400,7 +402,8 @@ abstract class HashExpression[E] extends Expression {
case NullType => ""
case BooleanType => genHashBoolean(input, result)
case ByteType | ShortType | IntegerType | DateType => genHashInt(input, result)
- case LongType | TimestampType => genHashLong(input, result)
+ case LongType => genHashLong(input, result)
+ case TimestampType => genHashTimestamp(input, result)
case FloatType => genHashFloat(input, result)
case DoubleType => genHashDouble(input, result)
case d: DecimalType => genHashDecimal(ctx, d, input, result)
@@ -433,6 +436,10 @@ abstract class InterpretedHashFunction {
protected def hashUnsafeBytes(base: AnyRef, offset: Long, length: Int, seed: Long): Long
/**
* Computes hash of a given `value` of type `dataType`. The caller needs to check the validity
* of input `value`.
*/
def hash(value: Any, dataType: DataType, seed: Long): Long = {
value match {
case null => seed
@@ -580,8 +587,6 @@ object XxHash64Function extends InterpretedHashFunction {
*
* We should use this hash function for both shuffle and bucketing of Hive tables, so that
* we can guarantee shuffle and bucketing have the same data distribution.
- *
- * TODO: Support date related types
*/
@ExpressionDescription(
usage = "_FUNC_(expr1, expr2, ...) - Returns a hash value of the arguments.")
@@ -648,11 +653,16 @@ case class HiveHash(children: Seq[Expression]) extends HashExpression[Int] {
override protected def genHashCalendarInterval(input: String, result: String): String = {
s"""
- $result = (31 * $hasherClassName.hashInt($input.months)) +
-   $hasherClassName.hashLong($input.microseconds);
+ $result = (int)
+   ${HiveHashFunction.getClass.getName.stripSuffix("$")}.hashCalendarInterval($input);
"""
}
override protected def genHashTimestamp(input: String, result: String): String =
s"""
$result = (int) ${HiveHashFunction.getClass.getName.stripSuffix("$")}.hashTimestamp($input);
"""
override protected def genHashString(input: String, result: String): String = {
val baseObject = s"$input.getBaseObject()"
val baseOffset = s"$input.getBaseOffset()"
@@ -781,6 +791,49 @@ object HiveHashFunction extends InterpretedHashFunction {
result
}
/**
* Mimics TimestampWritable.hashCode() in Hive
*/
def hashTimestamp(timestamp: Long): Long = {
val timestampInSeconds = timestamp / 1000000
val nanoSecondsPortion = (timestamp % 1000000) * 1000
var result = timestampInSeconds
result <<= 30 // the nanosecond part fits in 30 bits
result |= nanoSecondsPortion
((result >>> 32) ^ result).toInt
}
/**
* Hive allows input intervals to be defined using units below, but the intervals
* have to be from the same category:
* - year, month (stored as HiveIntervalYearMonth)
* - day, hour, minute, second, nanosecond (stored as HiveIntervalDayTime)
*
* e.g. (INTERVAL '30' YEAR + INTERVAL '-23' DAY) fails in Hive
*
* This method mimics HiveIntervalDayTime.hashCode() in Hive.
*
* Two differences arise w.r.t. Hive due to how intervals are stored in Spark vs. Hive:
*
* - If the `INTERVAL` is backed by HiveIntervalYearMonth in Hive, then this method will not
* produce a Hive-compatible result, because Spark's calendar-interval representation is
* unified and does not have such categories based on the interval type.
*
* - Spark's [[CalendarInterval]] has precision up to microseconds, but Hive's
* HiveIntervalDayTime can store data with precision up to nanoseconds. So any input interval
* with a nanosecond component will lead to wrong output hashes (i.e., not adherent to
* Hive's output).
*/
def hashCalendarInterval(calendarInterval: CalendarInterval): Long = {
val totalSeconds = calendarInterval.microseconds / CalendarInterval.MICROS_PER_SECOND.toInt
val result: Int = (17 * 37) + (totalSeconds ^ totalSeconds >> 32).toInt
val nanoSeconds =
(calendarInterval.microseconds -
(totalSeconds * CalendarInterval.MICROS_PER_SECOND.toInt)).toInt * 1000
(result * 37) + nanoSeconds
}
override def hash(value: Any, dataType: DataType, seed: Long): Long = {
value match {
case null => 0
@@ -834,10 +887,10 @@ object HiveHashFunction extends InterpretedHashFunction {
}
result
- case d: Decimal =>
-   normalizeDecimal(d.toJavaBigDecimal).hashCode()
- case _ => super.hash(value, dataType, seed)
+ case d: Decimal => normalizeDecimal(d.toJavaBigDecimal).hashCode()
+ case timestamp: Long if dataType.isInstanceOf[TimestampType] => hashTimestamp(timestamp)
+ case calendarInterval: CalendarInterval => hashCalendarInterval(calendarInterval)
+ case _ => super.hash(value, dataType, 0)
}
}
}
@@ -18,18 +18,20 @@
package org.apache.spark.sql.catalyst.expressions
import java.nio.charset.StandardCharsets
+ import java.util.TimeZone
import scala.collection.mutable.ArrayBuffer
import org.apache.commons.codec.digest.DigestUtils
+ import org.scalatest.exceptions.TestFailedException
import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.{RandomDataGenerator, Row}
import org.apache.spark.sql.catalyst.encoders.{ExamplePointUDT, RowEncoder}
import org.apache.spark.sql.catalyst.expressions.codegen.GenerateMutableProjection
- import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, GenericArrayData}
+ import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, DateTimeUtils, GenericArrayData}
import org.apache.spark.sql.types.{ArrayType, StructType, _}
- import org.apache.spark.unsafe.types.UTF8String
+ import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String}
class HashExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
val random = new scala.util.Random
@@ -168,6 +170,208 @@ class HashExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
// scalastyle:on nonascii
}
test("hive-hash for date type") {
def checkHiveHashForDateType(dateString: String, expected: Long): Unit = {
checkHiveHash(
DateTimeUtils.stringToDate(UTF8String.fromString(dateString)).get,
DateType,
expected)
}
// basic case
checkHiveHashForDateType("2017-01-01", 17167)
// boundary cases
checkHiveHashForDateType("0000-01-01", -719530)
checkHiveHashForDateType("9999-12-31", 2932896)
// epoch
checkHiveHashForDateType("1970-01-01", 0)
// before epoch
checkHiveHashForDateType("1800-01-01", -62091)
// Invalid input: bad date string. Hive returns 0 for such cases
intercept[NoSuchElementException](checkHiveHashForDateType("0-0-0", 0))
intercept[NoSuchElementException](checkHiveHashForDateType("-1212-01-01", 0))
intercept[NoSuchElementException](checkHiveHashForDateType("2016-99-99", 0))
// Invalid input: Empty string. Hive returns 0 for this case
intercept[NoSuchElementException](checkHiveHashForDateType("", 0))
// Invalid input: February 30th for a leap year. Hive supports this but Spark doesn't
intercept[NoSuchElementException](checkHiveHashForDateType("2016-02-30", 16861))
}
test("hive-hash for timestamp type") {
def checkHiveHashForTimestampType(
timestamp: String,
expected: Long,
timeZone: TimeZone = TimeZone.getTimeZone("UTC")): Unit = {
checkHiveHash(
DateTimeUtils.stringToTimestamp(UTF8String.fromString(timestamp), timeZone).get,
TimestampType,
expected)
}
// basic case
checkHiveHashForTimestampType("2017-02-24 10:56:29", 1445725271)
// with higher precision
checkHiveHashForTimestampType("2017-02-24 10:56:29.111111", 1353936655)
// with different timezone
checkHiveHashForTimestampType("2017-02-24 10:56:29", 1445732471,
TimeZone.getTimeZone("US/Pacific"))
// boundary cases
checkHiveHashForTimestampType("0001-01-01 00:00:00", 1645926784)
checkHiveHashForTimestampType("9999-01-01 00:00:00", -1081818240)
// epoch
checkHiveHashForTimestampType("1970-01-01 00:00:00", 0)
// before epoch
checkHiveHashForTimestampType("1800-01-01 03:12:45", -267420885)
// Invalid input: bad timestamp string. Hive returns 0 for such cases
intercept[NoSuchElementException](checkHiveHashForTimestampType("0-0-0 0:0:0", 0))
intercept[NoSuchElementException](checkHiveHashForTimestampType("-99-99-99 99:99:45", 0))
intercept[NoSuchElementException](checkHiveHashForTimestampType("555555-55555-5555", 0))
// Invalid input: Empty string. Hive returns 0 for this case
intercept[NoSuchElementException](checkHiveHashForTimestampType("", 0))
// Invalid input: February 30th for a leap year. Hive supports this but Spark doesn't
intercept[NoSuchElementException](checkHiveHashForTimestampType("2016-02-30 00:00:00", 0))
// Invalid input: Hive accepts up to 9 decimal places of precision but Spark uses up to 6
intercept[TestFailedException](checkHiveHashForTimestampType("2017-02-24 10:56:29.11111111", 0))
}
test("hive-hash for CalendarInterval type") {
def checkHiveHashForIntervalType(interval: String, expected: Long): Unit = {
checkHiveHash(CalendarInterval.fromString(interval), CalendarIntervalType, expected)
}
// ----- MICROSEC -----
// basic case
checkHiveHashForIntervalType("interval 1 microsecond", 24273)
// negative
checkHiveHashForIntervalType("interval -1 microsecond", 22273)
// edge / boundary cases
checkHiveHashForIntervalType("interval 0 microsecond", 23273)
checkHiveHashForIntervalType("interval 999 microsecond", 1022273)
checkHiveHashForIntervalType("interval -999 microsecond", -975727)
// ----- MILLISEC -----
// basic case
checkHiveHashForIntervalType("interval 1 millisecond", 1023273)
// negative
checkHiveHashForIntervalType("interval -1 millisecond", -976727)
// edge / boundary cases
checkHiveHashForIntervalType("interval 0 millisecond", 23273)
checkHiveHashForIntervalType("interval 999 millisecond", 999023273)
checkHiveHashForIntervalType("interval -999 millisecond", -998976727)
// ----- SECOND -----
// basic case
checkHiveHashForIntervalType("interval 1 second", 23310)
// negative
checkHiveHashForIntervalType("interval -1 second", 23273)
// edge / boundary cases
checkHiveHashForIntervalType("interval 0 second", 23273)
checkHiveHashForIntervalType("interval 2147483647 second", -2147460412)
checkHiveHashForIntervalType("interval -2147483648 second", -2147460412)
// Out of range for both Hive and Spark
// Hive throws an exception. Spark overflows and returns wrong output
// checkHiveHashForIntervalType("interval 9999999999 second", 0)
// ----- MINUTE -----
// basic cases
checkHiveHashForIntervalType("interval 1 minute", 25493)
// negative
checkHiveHashForIntervalType("interval -1 minute", 25456)
// edge / boundary cases
checkHiveHashForIntervalType("interval 0 minute", 23273)
checkHiveHashForIntervalType("interval 2147483647 minute", 21830)
checkHiveHashForIntervalType("interval -2147483648 minute", 22163)
// Out of range for both Hive and Spark
// Hive throws an exception. Spark overflows and returns wrong output
// checkHiveHashForIntervalType("interval 9999999999 minute", 0)
// ----- HOUR -----
// basic case
checkHiveHashForIntervalType("interval 1 hour", 156473)
// negative
checkHiveHashForIntervalType("interval -1 hour", 156436)
// edge / boundary cases
checkHiveHashForIntervalType("interval 0 hour", 23273)
checkHiveHashForIntervalType("interval 2147483647 hour", -62308)
checkHiveHashForIntervalType("interval -2147483648 hour", -43327)
// Out of range for both Hive and Spark
// Hive throws an exception. Spark overflows and returns wrong output
// checkHiveHashForIntervalType("interval 9999999999 hour", 0)
// ----- DAY -----
// basic cases
checkHiveHashForIntervalType("interval 1 day", 3220073)
// negative
checkHiveHashForIntervalType("interval -1 day", 3220036)
// edge / boundary cases
checkHiveHashForIntervalType("interval 0 day", 23273)
checkHiveHashForIntervalType("interval 106751991 day", -451506760)
checkHiveHashForIntervalType("interval -106751991 day", -451514123)
// Hive supports `day` for a longer range but Spark's range is smaller
// The check for range is done at the parser level so this does not fail in Spark
// checkHiveHashForIntervalType("interval -2147483648 day", -1575127)
// checkHiveHashForIntervalType("interval 2147483647 day", -4767228)
// Out of range for both Hive and Spark
// Hive throws an exception. Spark overflows and returns wrong output
// checkHiveHashForIntervalType("interval 9999999999 day", 0)
// ----- MIX -----
checkHiveHashForIntervalType("interval 0 day 0 hour", 23273)
checkHiveHashForIntervalType("interval 0 day 0 hour 0 minute", 23273)
checkHiveHashForIntervalType("interval 0 day 0 hour 0 minute 0 second", 23273)
checkHiveHashForIntervalType("interval 0 day 0 hour 0 minute 0 second 0 millisecond", 23273)
checkHiveHashForIntervalType(
"interval 0 day 0 hour 0 minute 0 second 0 millisecond 0 microsecond", 23273)
checkHiveHashForIntervalType("interval 6 day 15 hour", 21202073)
checkHiveHashForIntervalType("interval 5 day 4 hour 8 minute", 16557833)
checkHiveHashForIntervalType("interval -23 day 56 hour -1111113 minute 9898989 second",
-2128468593)
checkHiveHashForIntervalType("interval 66 day 12 hour 39 minute 23 second 987 millisecond",
1199697904)
checkHiveHashForIntervalType(
"interval 66 day 12 hour 39 minute 23 second 987 millisecond 123 microsecond", 1199820904)
}
test("hive-hash for array") {
// empty array
checkHiveHash(
......