Commit 2ca8ae9a authored by Cheng Lian, committed by Yin Huai

[SPARK-18186] Migrate HiveUDAFFunction to TypedImperativeAggregate for partial aggregation support

## What changes were proposed in this pull request?

When evaluated in Spark SQL, Hive UDAFs don't support partial aggregation. This PR migrates `HiveUDAFFunction` to `TypedImperativeAggregate`, which already provides partial aggregation support for aggregate functions that may use arbitrary Java objects as aggregation states.

The following snippet shows the effect of this PR:

```scala
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFMax
sql(s"CREATE FUNCTION hive_max AS '${classOf[GenericUDAFMax].getName}'")

spark.range(100).createOrReplaceTempView("t")

// A query using both Spark SQL native `max` and Hive `max`
sql(s"SELECT max(id), hive_max(id) FROM t").explain()
```

Before this PR:

```
== Physical Plan ==
SortAggregate(key=[], functions=[max(id#1L), default.hive_max(default.hive_max, HiveFunctionWrapper(org.apache.hadoop.hive.ql.udf.generic.GenericUDAFMax,org.apache.hadoop.hive.ql.udf.generic.GenericUDAFMax7475f57e), id#1L, false, 0, 0)])
+- Exchange SinglePartition
   +- *Range (0, 100, step=1, splits=Some(1))
```

After this PR:

```
== Physical Plan ==
SortAggregate(key=[], functions=[max(id#1L), default.hive_max(default.hive_max, HiveFunctionWrapper(org.apache.hadoop.hive.ql.udf.generic.GenericUDAFMax,org.apache.hadoop.hive.ql.udf.generic.GenericUDAFMax5e18a6a7), id#1L, false, 0, 0)])
+- Exchange SinglePartition
   +- SortAggregate(key=[], functions=[partial_max(id#1L), partial_default.hive_max(default.hive_max, HiveFunctionWrapper(org.apache.hadoop.hive.ql.udf.generic.GenericUDAFMax,org.apache.hadoop.hive.ql.udf.generic.GenericUDAFMax5e18a6a7), id#1L, false, 0, 0)])
      +- *Range (0, 100, step=1, splits=Some(1))
```
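
For context, `TypedImperativeAggregate` lets an aggregate function keep its state as an arbitrary JVM object and only requires that the state can be serialized to a byte array when partial results are shuffled. The simplified trait below is an illustrative sketch of that contract; the method names mirror Spark's, but the signatures are paraphrased rather than copied from the Spark source:

```scala
// Illustrative sketch of the TypedImperativeAggregate contract (paraphrased,
// not the exact Spark signatures). `T` is an arbitrary JVM object that holds
// the aggregation state.
trait TypedAggregateSketch[T] {
  // Creates an empty aggregation state.
  def createAggregationBuffer(): T

  // Folds one input value into the state (partial, map-side aggregation).
  def update(buffer: T, input: Any): T

  // Merges a shuffled partial state into the current state (final aggregation).
  def merge(buffer: T, other: T): T

  // Produces the final result from the state.
  def eval(buffer: T): Any

  // Partial states are shuffled as byte arrays, so the state must be
  // convertible to and from bytes.
  def serialize(buffer: T): Array[Byte]
  def deserialize(bytes: Array[Byte]): T
}
```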

The tricky part of the PR is mostly about updating and passing around the aggregation states of `HiveUDAFFunction`s, since the aggregation state of a Hive UDAF may appear in three different forms. Let's take the `MockUDAF` added in this PR for testing as an example. This UDAF computes the count of non-null values together with the count of nulls of a given column. Its aggregation state may appear in the following forms at different times (a code sketch of the conversions follows the list):

1. A `MockUDAFBuffer`, which is a concrete subclass of `GenericUDAFEvaluator.AggregationBuffer`

   The form used by the Hive UDAF API. This form is required in the following scenarios:

   - Calling `GenericUDAFEvaluator.iterate()` to update an existing aggregation state with new input values.
   - Calling `GenericUDAFEvaluator.terminate()` to get the final aggregated value from an existing aggregation state.
   - Calling `GenericUDAFEvaluator.merge()` to merge other aggregation states into an existing aggregation state.

     The existing aggregation state to be updated must be in this form.

   Conversions:

   - To form 2:

     `GenericUDAFEvaluator.terminatePartial()`

   - To form 3:

     Convert to form 2 first, and then to 3.

2. An `Object[]` array containing two `java.lang.Long` values.

   The form used to interact with Hive's `ObjectInspector`s. This form is required by the following scenarios:

   - Calling `GenericUDAFEvaluator.terminatePartial()` to convert an existing aggregation state in form 1 to form 2.
   - Calling `GenericUDAFEvaluator.merge()` to merge other aggregation states into an existing aggregation state.

     The input aggregation state must be in this form.

   Conversions:

   - To form 1:

     No direct method. You have to create an empty `AggregationBuffer` and merge the form 2 value into it with `GenericUDAFEvaluator.merge()`.

   - To form 3:

     The `unwrapperFor()`/`unwrap()` methods of `HiveInspectors`

3. A byte array holding the data of an `UnsafeRow` with two `LongType` fields.

   The form used by Spark SQL to shuffle partial aggregation results. This form is required because `TypedImperativeAggregate` always asks its subclasses to serialize their aggregation states into a byte array.

   Conversions:

   - To form 1:

     Convert to form 2 first, and then to 1.

   - To form 2:

     The `wrapperFor()`/`wrap()` methods of `HiveInspectors`
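
Below is a sketch of how these conversions could be wired together for a `MockUDAF`-style state whose partial result is two `Long` counters. The `GenericUDAFEvaluator` calls are the real Hive UDAF API; the `toCatalyst`/`toHive` parameters stand in for the `unwrapperFor()`/`wrapperFor()` machinery of `HiveInspectors`, and the class itself is an illustration rather than the exact code added in this PR:

```scala
// Assumption-laden sketch of the conversions between the three state forms for
// a MockUDAF-style state with two Long counters. `toCatalyst` and `toHive` are
// hypothetical stand-ins for HiveInspectors' unwrapperFor()/wrapperFor() logic.
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator.AggregationBuffer
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{UnsafeProjection, UnsafeRow}
import org.apache.spark.sql.types.{LongType, StructField, StructType}

class HiveStateConversions(
    evaluator: GenericUDAFEvaluator,
    toCatalyst: AnyRef => InternalRow,  // hypothetical: Object[] -> Catalyst row
    toHive: InternalRow => AnyRef) {    // hypothetical: Catalyst row -> Object[]

  // Schema of the partial result: two LongType fields (non-null count, null count).
  private val partialSchema = StructType(
    StructField("nonNullCount", LongType) :: StructField("nullCount", LongType) :: Nil)
  private val toUnsafeRow = UnsafeProjection.create(partialSchema)

  // Form 1 -> form 2: ask Hive for its partial-result representation.
  def toObjectArray(buffer: AggregationBuffer): AnyRef =
    evaluator.terminatePartial(buffer)

  // Form 2 -> form 3: unwrap into a Catalyst row and take the UnsafeRow bytes.
  def toBytes(partial: AnyRef): Array[Byte] =
    toUnsafeRow(toCatalyst(partial)).getBytes

  // Form 3 -> form 2 -> form 1: rebuild the UnsafeRow from the bytes, wrap it
  // back into Hive's representation, and merge it into a fresh, empty buffer.
  def toAggregationBuffer(bytes: Array[Byte]): AggregationBuffer = {
    val row = new UnsafeRow(partialSchema.length)
    row.pointTo(bytes, bytes.length)
    val fresh = evaluator.getNewAggregationBuffer()
    evaluator.merge(fresh, toHive(row))
    fresh
  }
}
```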

Here are some micro-benchmark results comparing the most recent master with this PR branch.

Master:

```
Java HotSpot(TM) 64-Bit Server VM 1.8.0_92-b14 on Mac OS X 10.10.5
Intel(R) Core(TM) i7-4960HQ CPU @ 2.60GHz

hive udaf vs spark af:                   Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
w/o groupBy                                    339 /  372          3.1         323.2       1.0X
w/ groupBy                                     503 /  529          2.1         479.7       0.7X
```

This PR:

```
Java HotSpot(TM) 64-Bit Server VM 1.8.0_92-b14 on Mac OS X 10.10.5
Intel(R) Core(TM) i7-4960HQ CPU @ 2.60GHz

hive udaf vs spark af:                   Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
w/o groupBy                                    116 /  126          9.0         110.8       1.0X
w/ groupBy                                     151 /  159          6.9         144.0       0.8X
```

Benchmark code snippet:

```scala
  test("Hive UDAF benchmark") {
    val N = 1 << 20

    sparkSession.sql(s"CREATE TEMPORARY FUNCTION hive_max AS '${classOf[GenericUDAFMax].getName}'")

    val benchmark = new Benchmark(
      name = "hive udaf vs spark af",
      valuesPerIteration = N,
      minNumIters = 5,
      warmupTime = 5.seconds,
      minTime = 5.seconds,
      outputPerIteration = true
    )

    benchmark.addCase("w/o groupBy") { _ =>
      sparkSession.range(N).agg("id" -> "hive_max").collect()
    }

    benchmark.addCase("w/ groupBy") { _ =>
      sparkSession.range(N).groupBy($"id" % 10).agg("id" -> "hive_max").collect()
    }

    benchmark.run()

    sparkSession.sql(s"DROP TEMPORARY FUNCTION IF EXISTS hive_max")
  }
```

## How was this patch tested?

New test suite `HiveUDAFSuite` is added.
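
A test in that suite would plausibly register a Hive UDAF as a temporary function and check that queries relying on partial aggregation still return correct results. The sketch below is only a guess at the shape of such a test, written against a `QueryTest`-style Hive test suite; the test name, expected rows, and helper usage are assumptions, not the actual suite contents:

```scala
// Hypothetical sketch of a HiveUDAFSuite-style test (assumes a QueryTest-based
// suite with a Hive-enabled SparkSession in scope). Names and expected values
// are illustrative, not copied from the actual suite.
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFMax
import org.apache.spark.sql.Row

test("Hive UDAF with partial aggregation") {
  withTempView("t") {
    spark.range(100).createOrReplaceTempView("t")
    sql(s"CREATE TEMPORARY FUNCTION hive_max AS '${classOf[GenericUDAFMax].getName}'")
    try {
      // Global aggregation: max over 0..99 is 99.
      checkAnswer(sql("SELECT hive_max(id) FROM t"), Row(99L))
      // Grouped aggregation exercises the partial/final split introduced here.
      checkAnswer(
        sql("SELECT id % 2, hive_max(id) FROM t GROUP BY id % 2"),
        Row(0L, 98L) :: Row(1L, 99L) :: Nil)
    } finally {
      sql("DROP TEMPORARY FUNCTION IF EXISTS hive_max")
    }
  }
}
```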

Author: Cheng Lian <lian@databricks.com>

Closes #15703 from liancheng/partial-agg-hive-udaf.