Commit 37686539 authored by hyukjinkwon, committed by Davies Liu

[SPARK-17388] [SQL] Support for inferring type date/timestamp/decimal for partition column

## What changes were proposed in this pull request?

Currently, Spark only supports inferring `IntegerType`, `LongType`, `DoubleType`, and `StringType` for partition column values.

`DecimalType` is attempted, but a value is never actually inferred as `DecimalType` because `DoubleType` is tried first. In addition, `DateType` and `TimestampType` could be inferred as well but currently are not.
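For example, a large integral value is silently corrupted when it wins the `DoubleType` try (a quick standalone check, not part of the patch):

```scala
// A 21-digit integer cannot be represented exactly as a Double, so letting
// DoubleType win before DecimalType silently loses precision.
val raw = "1" * 21
println(raw.toDouble)                   // 1.1111111111111111E20 (precision lost)
println(new java.math.BigDecimal(raw))  // 111111111111111111111 (exact)
```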

As far as I know, it is fairly common to use date and timestamp values for partition columns.

This PR fixes the ineffective `DecimalType` attempt and adds support for inferring `DateType` and `TimestampType` for partition column values.
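To illustrate the user-visible effect (a minimal sketch, not part of the patch; the local session setup and the temporary path are assumptions):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[*]")
  .appName("partition-type-inference")
  .getOrCreate()
import spark.implicits._

// Write a dataset partitioned by a date column; the value is encoded in the
// directory name, e.g. `date=2016-12-01`.
val df = Seq((1, java.sql.Date.valueOf("2016-12-01"))).toDF("i", "date")
df.write.partitionBy("date").parquet("/tmp/partitioned-by-date")

// Before this patch the `date` partition column was read back as StringType;
// with this patch it is inferred as DateType.
spark.read.parquet("/tmp/partitioned-by-date").printSchema()
```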

## How was this patch tested?

Unit tests in `ParquetPartitionDiscoverySuite`.

Author: hyukjinkwon <gurwls223@gmail.com>

Closes #14947 from HyukjinKwon/SPARK-17388.
parent e59df62e
@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.datasources
 
 import java.lang.{Double => JDouble, Long => JLong}
 import java.math.{BigDecimal => JBigDecimal}
+import java.sql.{Date => JDate, Timestamp => JTimestamp}
 
 import scala.collection.mutable.ArrayBuffer
 import scala.util.Try
@@ -307,20 +308,34 @@ object PartitioningUtils {
   /**
    * Converts a string to a [[Literal]] with automatic type inference. Currently only supports
-   * [[IntegerType]], [[LongType]], [[DoubleType]], [[DecimalType.SYSTEM_DEFAULT]], and
-   * [[StringType]].
+   * [[IntegerType]], [[LongType]], [[DoubleType]], [[DecimalType]], [[DateType]],
+   * [[TimestampType]], and [[StringType]].
    */
   private[datasources] def inferPartitionColumnValue(
       raw: String,
       defaultPartitionName: String,
       typeInference: Boolean): Literal = {
+    val decimalTry = Try {
+      // `BigDecimal` conversion can fail when the field is not in the form of a number.
+      val bigDecimal = new JBigDecimal(raw)
+      // This reduces the cases for decimals by disallowing values with a scale (e.g. `1.1`).
+      require(bigDecimal.scale <= 0)
+      // `DecimalType` conversion can still fail when
+      //   1. the precision is bigger than 38, or
+      //   2. the scale is bigger than the precision.
+      Literal(bigDecimal)
+    }
+
     if (typeInference) {
       // First tries integral types
       Try(Literal.create(Integer.parseInt(raw), IntegerType))
         .orElse(Try(Literal.create(JLong.parseLong(raw), LongType)))
+        .orElse(decimalTry)
         // Then falls back to fractional types
         .orElse(Try(Literal.create(JDouble.parseDouble(raw), DoubleType)))
-        .orElse(Try(Literal(new JBigDecimal(raw))))
+        // Then falls back to date/timestamp types
+        .orElse(Try(Literal(JDate.valueOf(raw))))
+        .orElse(Try(Literal(JTimestamp.valueOf(unescapePathName(raw)))))
         // Then falls back to string
         .getOrElse {
           if (raw == defaultPartitionName) {
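The `orElse` chain above reads as "try each parser in order and keep the first success". A self-contained sketch of the same fallback pattern in plain Scala (`inferValue` is a hypothetical name, not the Spark helper):

```scala
import scala.util.Try

// Try each parser in order, keeping the first success; fall back to the raw string.
def inferValue(raw: String): Any =
  Try(raw.toInt)
    .orElse(Try(raw.toLong))
    .orElse(Try {
      val d = new java.math.BigDecimal(raw)
      // Mirror the patch: only scale-0 values may become decimals (rejects e.g. "1.1").
      require(d.scale <= 0)
      d
    })
    .orElse(Try(raw.toDouble))
    .orElse(Try(java.sql.Date.valueOf(raw)))
    .orElse(Try(java.sql.Timestamp.valueOf(raw)))
    .getOrElse(raw)

assert(inferValue("10") == 10)                                          // integer
assert(inferValue("1" * 21) == new java.math.BigDecimal("1" * 21))      // decimal (too big for Long)
assert(inferValue("1.5") == 1.5)                                        // double
assert(inferValue("1990-02-24") == java.sql.Date.valueOf("1990-02-24")) // date
assert(inferValue("hello") == "hello")                                  // string fallback
```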
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.datasources.parquet
 
 import java.io.File
 import java.math.BigInteger
-import java.sql.Timestamp
+import java.sql.{Date, Timestamp}
 
 import scala.collection.mutable.ArrayBuffer
@@ -56,8 +56,14 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with SharedSQLContext {
     check("10", Literal.create(10, IntegerType))
     check("1000000000000000", Literal.create(1000000000000000L, LongType))
+    val decimal = Decimal("1" * 20)
+    check("1" * 20,
+      Literal.create(decimal, DecimalType(decimal.precision, decimal.scale)))
     check("1.5", Literal.create(1.5, DoubleType))
     check("hello", Literal.create("hello", StringType))
+    check("1990-02-24", Literal.create(Date.valueOf("1990-02-24"), DateType))
+    check("1990-02-24 12:00:30",
+      Literal.create(Timestamp.valueOf("1990-02-24 12:00:30"), TimestampType))
     check(defaultPartitionName, Literal.create(null, NullType))
   }
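Note that the timestamp literal is built from the unescaped value: `:` is not allowed in partition directory names, so it is percent-escaped on write, which is why the patch calls `unescapePathName` before `JTimestamp.valueOf`. A rough sketch of that round trip, using `java.net.URLDecoder` only as a stand-in for Spark's internal `unescapePathName` (the two differ, e.g. `URLDecoder` also turns `+` into a space):

```scala
import java.net.URLDecoder
import java.sql.Timestamp

// A timestamp partition value as it would appear in a directory name,
// with ':' percent-escaped.
val escaped = "1990-02-24 12%3A00%3A30"
val unescaped = URLDecoder.decode(escaped, "UTF-8")

println(Timestamp.valueOf(unescaped))  // 1990-02-24 12:00:30.0
```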
@@ -687,6 +693,40 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with SharedSQLContext {
     }
   }
 
+  test("Various inferred partition value types") {
+    val row =
+      Row(
+        Long.MaxValue,
+        4.5,
+        new java.math.BigDecimal(new BigInteger("1" * 20)),
+        java.sql.Date.valueOf("2015-05-23"),
+        java.sql.Timestamp.valueOf("1990-02-24 12:00:30"),
+        "This is a string, /[]?=:",
+        "This is not a partition column")
+
+    val partitionColumnTypes =
+      Seq(
+        LongType,
+        DoubleType,
+        DecimalType(20, 0),
+        DateType,
+        TimestampType,
+        StringType)
+
+    val partitionColumns = partitionColumnTypes.zipWithIndex.map {
+      case (t, index) => StructField(s"p_$index", t)
+    }
+
+    val schema = StructType(partitionColumns :+ StructField(s"i", StringType))
+    val df = spark.createDataFrame(sparkContext.parallelize(row :: Nil), schema)
+
+    withTempPath { dir =>
+      df.write.format("parquet").partitionBy(partitionColumns.map(_.name): _*).save(dir.toString)
+      val fields = schema.map(f => Column(f.name))
+      checkAnswer(spark.read.load(dir.toString).select(fields: _*), row)
+    }
+  }
+
   test("SPARK-8037: Ignores files whose name starts with dot") {
     withTempPath { dir =>
       val df = (1 to 3).map(i => (i, i, i, i)).toDF("a", "b", "c", "d")