Skip to content
Snippets Groups Projects
Commit 940f3756 authored by Michael Davies's avatar Michael Davies Committed by Michael Armbrust
Browse files

[SPARK-5309][SQL] Add support for dictionaries in PrimitiveConverter for Strings.

Parquet Converters allow developers to take advantage of dictionary encoding of column data to reduce Column Binary decoding.

The Spark PrimitiveConverter was not using that API and consequently, for String columns that used dictionary compression, it repeated Binary to String conversions for the same String.

In measurements this could account for over 25% of entire query time.
For example, a 500M row table split across 16 blocks was aggregated and summed in a little under 30s before this change and a little under 20s after the change.

Author: Michael Davies <Michael.BellDavies@gmail.com>

Closes #4187 from MickDavies/SPARK-5309-2 and squashes the following commits:

327287e [Michael Davies] SPARK-5309: Add support for dictionaries in PrimitiveConverter for Strings.
33c002c [Michael Davies] SPARK-5309: Add support for dictionaries in PrimitiveConverter for Strings.
parent bce0ba1f
No related branches found
No related tags found
No related merge requests found
......@@ -19,6 +19,7 @@ package org.apache.spark.sql.parquet
import scala.collection.mutable.{Buffer, ArrayBuffer, HashMap}
import parquet.column.Dictionary
import parquet.io.api.{PrimitiveConverter, GroupConverter, Binary, Converter}
import parquet.schema.MessageType
......@@ -102,12 +103,8 @@ private[sql] object CatalystConverter {
}
// Strings, Shorts and Bytes do not have a corresponding type in Parquet
// so we need to treat them separately
case StringType => {
new CatalystPrimitiveConverter(parent, fieldIndex) {
override def addBinary(value: Binary): Unit =
parent.updateString(fieldIndex, value)
}
}
case StringType =>
new CatalystPrimitiveStringConverter(parent, fieldIndex)
case ShortType => {
new CatalystPrimitiveConverter(parent, fieldIndex) {
override def addInt(value: Int): Unit =
......@@ -197,8 +194,8 @@ private[parquet] abstract class CatalystConverter extends GroupConverter {
protected[parquet] def updateBinary(fieldIndex: Int, value: Binary): Unit =
updateField(fieldIndex, value.getBytes)
protected[parquet] def updateString(fieldIndex: Int, value: Binary): Unit =
updateField(fieldIndex, value.toStringUsingUTF8)
protected[parquet] def updateString(fieldIndex: Int, value: String): Unit =
updateField(fieldIndex, value)
protected[parquet] def updateDecimal(fieldIndex: Int, value: Binary, ctype: DecimalType): Unit = {
updateField(fieldIndex, readDecimal(new Decimal(), value, ctype))
......@@ -384,8 +381,8 @@ private[parquet] class CatalystPrimitiveRowConverter(
override protected[parquet] def updateBinary(fieldIndex: Int, value: Binary): Unit =
current.update(fieldIndex, value.getBytes)
override protected[parquet] def updateString(fieldIndex: Int, value: Binary): Unit =
current.setString(fieldIndex, value.toStringUsingUTF8)
override protected[parquet] def updateString(fieldIndex: Int, value: String): Unit =
current.setString(fieldIndex, value)
override protected[parquet] def updateDecimal(
fieldIndex: Int, value: Binary, ctype: DecimalType): Unit = {
......@@ -426,6 +423,33 @@ private[parquet] class CatalystPrimitiveConverter(
parent.updateLong(fieldIndex, value)
}
/**
 * A `parquet.io.api.PrimitiveConverter` that converts Parquet Binary values to Catalyst Strings.
 *
 * Follows the pattern used elsewhere in Parquet of exploiting dictionary encoding, where the
 * column supports it, so each distinct Binary value is decoded to a String only once.
 *
 * @param parent The parent group converter.
 * @param fieldIndex The index inside the record.
 */
private[parquet] class CatalystPrimitiveStringConverter(parent: CatalystConverter, fieldIndex: Int)
  extends CatalystPrimitiveConverter(parent, fieldIndex) {

  // Decoded dictionary entries indexed by dictionary id; remains null for non-dictionary pages.
  private[this] var dict: Array[String] = null

  override def hasDictionarySupport: Boolean = true

  override def setDictionary(dictionary: Dictionary): Unit = {
    // Eagerly decode every entry once so each later lookup is a plain array access.
    dict = Array.tabulate(dictionary.getMaxId + 1) { id =>
      dictionary.decodeToBinary(id).toStringUsingUTF8
    }
  }

  override def addValueFromDictionary(dictionaryId: Int): Unit =
    parent.updateString(fieldIndex, dict(dictionaryId))

  // Fallback for values that arrive without dictionary encoding.
  override def addBinary(value: Binary): Unit =
    parent.updateString(fieldIndex, value.toStringUsingUTF8)
}
/** Shared constants for the Catalyst array converters. */
private[parquet] object CatalystArrayConverter {
  // Initial capacity of the element buffer before the first resize.
val INITIAL_ARRAY_SIZE = 20
}
......@@ -583,9 +607,9 @@ private[parquet] class CatalystNativeArrayConverter(
elements += 1
}
override protected[parquet] def updateString(fieldIndex: Int, value: Binary): Unit = {
override protected[parquet] def updateString(fieldIndex: Int, value: String): Unit = {
checkGrowBuffer()
buffer(elements) = value.toStringUsingUTF8.asInstanceOf[NativeType]
buffer(elements) = value.asInstanceOf[NativeType]
elements += 1
}
......
......@@ -85,4 +85,15 @@ class ParquetQuerySuite extends QueryTest with ParquetTest {
checkAnswer(sql(s"SELECT _1 FROM t WHERE _1 < 10"), (1 to 9).map(Row.apply(_)))
}
}
test("SPARK-5309 strings stored using dictionary compression in parquet") {
  // 1000 rows: _1 is a single repeated value; _2 cycles through ten "run_N" groups of 100 rows,
  // making both string columns good candidates for Parquet dictionary encoding.
  val rows = (0 until 1000).map(i => ("same", "run_" + i / 100, 1))
  withParquetTable(rows, "t") {
    // Grouped aggregation must see the decoded dictionary strings correctly.
    val grouped = (0 until 10).map(i => Row("same", "run_" + i, 100))
    checkAnswer(sql("SELECT _1, _2, SUM(_3) FROM t GROUP BY _1, _2"), grouped)
    // Filtering on a dictionary-encoded column must also work.
    checkAnswer(
      sql("SELECT _1, _2, SUM(_3) FROM t WHERE _2 = 'run_5' GROUP BY _1, _2"),
      List(Row("same", "run_5", 100)))
  }
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment