Commit f7d21437 authored by hyukjinkwon, committed by Davies Liu

[SPARK-17354] [SQL] Partitioning by dates/timestamps should work with Parquet vectorized reader

## What changes were proposed in this pull request?

This PR fixes `ColumnVectorUtils.populate` so that the Parquet vectorized reader can read partitioned tables with date/timestamp partition columns. The normal (non-vectorized) Parquet reader already handles these correctly.

`ColumnVectorUtils.populate` is only called from [VectorizedParquetRecordReader.java#L185](https://github.com/apache/spark/blob/master/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java#L185).

When the partition column types are explicitly declared as `DateType` or `TimestampType` (rather than being inferred), the read fails with the exception below:

```
16/09/01 10:30:07 ERROR Executor: Exception in task 0.0 in stage 5.0 (TID 6)
java.lang.ClassCastException: java.lang.Integer cannot be cast to java.sql.Date
	at org.apache.spark.sql.execution.vectorized.ColumnVectorUtils.populate(ColumnVectorUtils.java:89)
	at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.initBatch(VectorizedParquetRecordReader.java:185)
	at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.initBatch(VectorizedParquetRecordReader.java:204)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$buildReader$1.apply(ParquetFileFormat.scala:362)
...
```
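For context, the failure is reproducible with plain SQL. Below is a minimal sketch (the table name `repro` is made up for illustration; it assumes a Hive-enabled `SparkSession` named `spark` with the vectorized Parquet reader enabled, which is the default):

```scala
// Minimal reproduction sketch; `repro` is a hypothetical table name.
spark.sql("SET hive.exec.dynamic.partition.mode=nonstrict")
spark.sql("CREATE TABLE repro(id INT) PARTITIONED BY (pd DATE) STORED AS PARQUET")
spark.sql(
  "INSERT INTO TABLE repro PARTITION(pd) SELECT 1 AS id, CAST('1990-02-24' AS DATE) AS pd")
// Before this fix, the scan below fails with:
//   java.lang.ClassCastException: java.lang.Integer cannot be cast to java.sql.Date
spark.sql("SELECT * FROM repro").show()
```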

## How was this patch tested?

Unit tests in `ParquetIOSuite` and `SQLQuerySuite`.

Author: hyukjinkwon <gurwls223@gmail.com>

Closes #14919 from HyukjinKwon/SPARK-17354.
Parent commit: a3981c28
**ColumnVectorUtils.java**

```diff
@@ -86,8 +86,9 @@ public class ColumnVectorUtils {
       col.getChildColumn(0).putInts(0, capacity, c.months);
       col.getChildColumn(1).putLongs(0, capacity, c.microseconds);
     } else if (t instanceof DateType) {
-      Date date = (Date)row.get(fieldIdx, t);
-      col.putInts(0, capacity, DateTimeUtils.fromJavaDate(date));
+      col.putInts(0, capacity, row.getInt(fieldIdx));
+    } else if (t instanceof TimestampType) {
+      col.putLongs(0, capacity, row.getLong(fieldIdx));
     }
   }
 }
```
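The root cause is that partition values reach `populate` already in Catalyst's internal representation: a `DateType` value is an `Int` (days since the epoch) and a `TimestampType` value is a `Long` (microseconds since the epoch), so the old cast to `java.sql.Date` could never succeed. A short sketch of the conversions involved, using the same `DateTimeUtils` helpers the new test relies on:

```scala
import org.apache.spark.sql.catalyst.util.DateTimeUtils

// A DateType value travels through Catalyst as an Int: days since 1970-01-01.
val days: Int = DateTimeUtils.fromJavaDate(java.sql.Date.valueOf("2015-01-01"))

// A TimestampType value travels as a Long: microseconds since the epoch.
val micros: Long =
  DateTimeUtils.fromJavaTimestamp(java.sql.Timestamp.valueOf("2015-01-01 23:50:59.123"))
```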
**ColumnarBatch.java**

```diff
@@ -137,6 +137,10 @@ public final class ColumnarBatch {
         DataType dt = columns[i].dataType();
         if (dt instanceof BooleanType) {
           row.setBoolean(i, getBoolean(i));
+        } else if (dt instanceof ByteType) {
+          row.setByte(i, getByte(i));
+        } else if (dt instanceof ShortType) {
+          row.setShort(i, getShort(i));
         } else if (dt instanceof IntegerType) {
           row.setInt(i, getInt(i));
         } else if (dt instanceof LongType) {
@@ -154,6 +158,8 @@ public final class ColumnarBatch {
           row.setDecimal(i, getDecimal(i, t.precision(), t.scale()), t.precision());
         } else if (dt instanceof DateType) {
           row.setInt(i, getInt(i));
+        } else if (dt instanceof TimestampType) {
+          row.setLong(i, getLong(i));
         } else {
           throw new RuntimeException("Not implemented. " + dt);
         }
```
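These branches sit in the row-materialization path used by `copy()`: without them, copying a row whose schema contains a byte, short, or timestamp column falls through to the `RuntimeException("Not implemented. " + dt)` branch. The new `ParquetIOSuite` test below exercises exactly this path via `row.copy().get(1, dt)`.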
**ParquetIOSuite.scala**

```diff
@@ -38,11 +38,12 @@ import org.apache.parquet.schema.{MessageType, MessageTypeParser}
 import org.apache.spark.SparkException
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.{InternalRow, ScalaReflection}
-import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.sql.catalyst.expressions.{GenericMutableRow, UnsafeRow}
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.UTF8String
 
 // Write support class for nested groups: ParquetWriter initializes GroupWriteSupport
 // with an empty configuration (it is after all not intended to be used in this way?)
@@ -689,6 +690,52 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
       }
     }
   }
+
+  test("VectorizedParquetRecordReader - partition column types") {
+    withTempPath { dir =>
+      Seq(1).toDF().repartition(1).write.parquet(dir.getCanonicalPath)
+
+      val dataTypes =
+        Seq(StringType, BooleanType, ByteType, ShortType, IntegerType, LongType,
+          FloatType, DoubleType, DecimalType(25, 5), DateType, TimestampType)
+
+      val constantValues =
+        Seq(
+          UTF8String.fromString("a string"),
+          true,
+          1.toByte,
+          2.toShort,
+          3,
+          Long.MaxValue,
+          0.25.toFloat,
+          0.75D,
+          Decimal("1234.23456"),
+          DateTimeUtils.fromJavaDate(java.sql.Date.valueOf("2015-01-01")),
+          DateTimeUtils.fromJavaTimestamp(java.sql.Timestamp.valueOf("2015-01-01 23:50:59.123")))
+
+      dataTypes.zip(constantValues).foreach { case (dt, v) =>
+        val schema = StructType(StructField("pcol", dt) :: Nil)
+        val vectorizedReader = new VectorizedParquetRecordReader
+        val partitionValues = new GenericMutableRow(Array(v))
+        val file = SpecificParquetRecordReaderBase.listDirectory(dir).get(0)
+
+        try {
+          vectorizedReader.initialize(file, null)
+          vectorizedReader.initBatch(schema, partitionValues)
+          vectorizedReader.nextKeyValue()
+          val row = vectorizedReader.getCurrentValue.asInstanceOf[InternalRow]
+
+          // Use `GenericMutableRow` by explicitly copying rather than `ColumnarBatch`
+          // in order to use get(...) method which is not implemented in `ColumnarBatch`.
+          val actual = row.copy().get(1, dt)
+          val expected = v
+          assert(actual == expected)
+        } finally {
+          vectorizedReader.close()
+        }
+      }
+    }
+  }
 }
 
 class JobCommitFailureParquetOutputCommitter(outputPath: Path, context: TaskAttemptContext)
```
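This test drives `VectorizedParquetRecordReader.initBatch` directly with a one-column partition schema for each supported type; `DateType` and `TimestampType` are the two cases that previously crashed in `ColumnVectorUtils.populate`.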
**SQLQuerySuite.scala**

```diff
@@ -1787,6 +1787,27 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
     }
   }
 
+  test("SPARK-17354: Partitioning by dates/timestamps works with Parquet vectorized reader") {
+    withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "true") {
+      sql(
+        """CREATE TABLE order(id INT)
+          |PARTITIONED BY (pd DATE, pt TIMESTAMP)
+          |STORED AS PARQUET
+        """.stripMargin)
+
+      sql("set hive.exec.dynamic.partition.mode=nonstrict")
+      sql(
+        """INSERT INTO TABLE order PARTITION(pd, pt)
+          |SELECT 1 AS id, CAST('1990-02-24' AS DATE) AS pd, CAST('1990-02-24' AS TIMESTAMP) AS pt
+        """.stripMargin)
+
+      val actual = sql("SELECT * FROM order")
+      val expected = sql(
+        "SELECT 1 AS id, CAST('1990-02-24' AS DATE) AS pd, CAST('1990-02-24' AS TIMESTAMP) AS pt")
+      checkAnswer(actual, expected)
+      sql("DROP TABLE order")
+    }
+  }
 
   def testCommandAvailable(command: String): Boolean = {
     val attempt = Try(Process(command).run(ProcessLogger(_ => ())).exitValue())
     attempt.isSuccess && attempt.get == 0
```