diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala index f238b9a4f7f6f14d90f9bf66122469feca0aad32..cc8907a0bbc93453999ff6a530d2ed2b851d1fc7 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala @@ -39,8 +39,10 @@ import org.apache.spark.internal.Logging import org.apache.spark.rdd.{EmptyRDD, HadoopRDD, RDD, UnionRDD} import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.analysis.CastSupport import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.util.DateTimeUtils +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.unsafe.types.UTF8String import org.apache.spark.util.{SerializableConfiguration, Utils} @@ -65,7 +67,7 @@ class HadoopTableReader( @transient private val tableDesc: TableDesc, @transient private val sparkSession: SparkSession, hadoopConf: Configuration) - extends TableReader with Logging { + extends TableReader with CastSupport with Logging { // Hadoop honors "mapreduce.job.maps" as hint, // but will ignore when mapreduce.jobtracker.address is "local". @@ -86,6 +88,8 @@ class HadoopTableReader( private val _broadcastedHadoopConf = sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf)) + override def conf: SQLConf = sparkSession.sessionState.conf + override def makeRDDForTable(hiveTable: HiveTable): RDD[InternalRow] = makeRDDForTable( hiveTable, @@ -227,7 +231,7 @@ class HadoopTableReader( def fillPartitionKeys(rawPartValues: Array[String], row: InternalRow): Unit = { partitionKeyAttrs.foreach { case (attr, ordinal) => val partOrdinal = partitionKeys.indexOf(attr) - row(ordinal) = Cast(Literal(rawPartValues(partOrdinal)), attr.dataType).eval(null) + row(ordinal) = cast(Literal(rawPartValues(partOrdinal)), attr.dataType).eval(null) } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala index 896f24f2e223d0e352338dc51ba8e530387680bc..48d0b4a63e54a086b328c883f52baa07f6490444 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala @@ -30,6 +30,7 @@ import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils import org.apache.spark.rdd.RDD import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.analysis.CastSupport import org.apache.spark.sql.catalyst.catalog.HiveTableRelation import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.QueryPlan @@ -37,6 +38,7 @@ import org.apache.spark.sql.execution._ import org.apache.spark.sql.execution.metric.SQLMetrics import org.apache.spark.sql.hive._ import org.apache.spark.sql.hive.client.HiveClientImpl +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.{BooleanType, DataType} import org.apache.spark.util.Utils @@ -53,11 +55,13 @@ case class HiveTableScanExec( relation: HiveTableRelation, partitionPruningPred: Seq[Expression])( @transient private val sparkSession: SparkSession) - extends LeafExecNode { + extends LeafExecNode with CastSupport { require(partitionPruningPred.isEmpty || relation.isPartitioned, "Partition pruning predicates only supported for partitioned tables.") + override def conf: SQLConf = sparkSession.sessionState.conf + override lazy val metrics = Map( "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows")) @@ -104,7 +108,7 @@ case class HiveTableScanExec( hadoopConf) private def castFromString(value: String, dataType: DataType) = { - Cast(Literal(value), dataType).eval(null) + cast(Literal(value), dataType).eval(null) } private def addColumnMetadataToConf(hiveConf: Configuration): Unit = { diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/QueryPartitionSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/QueryPartitionSuite.scala index 43b6bf5feeb603b81dd0fa3d6b77bbb8f9740f27..b2dc401ce1efca730d7c0867408c6fe45c515161 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/QueryPartitionSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/QueryPartitionSuite.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.hive import java.io.File +import java.sql.Timestamp import com.google.common.io.Files import org.apache.hadoop.fs.FileSystem @@ -68,4 +69,20 @@ class QueryPartitionSuite extends QueryTest with SQLTestUtils with TestHiveSingl sql("DROP TABLE IF EXISTS createAndInsertTest") } } + + test("SPARK-21739: Cast expression should initialize timezoneId") { + withTable("table_with_timestamp_partition") { + sql("CREATE TABLE table_with_timestamp_partition(value int) PARTITIONED BY (ts TIMESTAMP)") + sql("INSERT OVERWRITE TABLE table_with_timestamp_partition " + + "PARTITION (ts = '2010-01-01 00:00:00.000') VALUES (1)") + + // test for Cast expression in TableReader + checkAnswer(sql("SELECT * FROM table_with_timestamp_partition"), + Seq(Row(1, Timestamp.valueOf("2010-01-01 00:00:00.000")))) + + // test for Cast expression in HiveTableScanExec + checkAnswer(sql("SELECT value FROM table_with_timestamp_partition " + + "WHERE ts = '2010-01-01 00:00:00.000'"), Row(1)) + } + } }