From 98bcc188f98e44c1675d8b3a28f44f4f900abc43 Mon Sep 17 00:00:00 2001
From: Liang-Chi Hsieh <viirya@gmail.com>
Date: Fri, 3 Mar 2017 07:14:37 -0800
Subject: [PATCH] [SPARK-19758][SQL] Resolving timezone aware expressions with
 time zone when resolving inline table

## What changes were proposed in this pull request?

When we resolve inline tables in analyzer, we will evaluate the expressions of inline tables.

When it evaluates a `TimeZoneAwareExpression` expression, an error will happen because the `TimeZoneAwareExpression` is not associated with timezone yet.

So we need to resolve these `TimeZoneAwareExpression`s with time zone when resolving inline tables.

## How was this patch tested?

Jenkins tests.

Please review http://spark.apache.org/contributing.html before opening a pull request.

Author: Liang-Chi Hsieh <viirya@gmail.com>

Closes #17114 from viirya/resolve-timeawareexpr-inline-table.
---
 .../sql/catalyst/analysis/Analyzer.scala      |  2 +-
 .../analysis/ResolveInlineTables.scala        | 16 +++++---
 .../analysis/ResolveInlineTablesSuite.scala   | 40 ++++++++++++-------
 .../sql-tests/inputs/inline-table.sql         |  3 ++
 .../sql-tests/results/inline-table.sql.out    | 10 ++++-
 5 files changed, 48 insertions(+), 23 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index c477cb48d0..6d569b612d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -146,7 +146,7 @@ class Analyzer(
       GlobalAggregates ::
       ResolveAggregateFunctions ::
       TimeWindowing ::
-      ResolveInlineTables ::
+      ResolveInlineTables(conf) ::
       TypeCoercion.typeCoercionRules ++
       extendedResolutionRules : _*),
     Batch("Post-Hoc Resolution", Once, postHocResolutionRules: _*),
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveInlineTables.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveInlineTables.scala
index 7323197b10..d5b3ea8c37 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveInlineTables.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveInlineTables.scala
@@ -19,8 +19,8 @@ package org.apache.spark.sql.catalyst.analysis
 
 import scala.util.control.NonFatal
 
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.Cast
+import org.apache.spark.sql.catalyst.{CatalystConf, InternalRow}
+import org.apache.spark.sql.catalyst.expressions.{Cast, TimeZoneAwareExpression}
 import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.types.{StructField, StructType}
@@ -28,7 +28,7 @@ import org.apache.spark.sql.types.{StructField, StructType}
 /**
  * An analyzer rule that replaces [[UnresolvedInlineTable]] with [[LocalRelation]].
  */
-object ResolveInlineTables extends Rule[LogicalPlan] {
+case class ResolveInlineTables(conf: CatalystConf) extends Rule[LogicalPlan] {
   override def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
     case table: UnresolvedInlineTable if table.expressionsResolved =>
       validateInputDimension(table)
@@ -95,11 +95,15 @@ object ResolveInlineTables extends Rule[LogicalPlan] {
       InternalRow.fromSeq(row.zipWithIndex.map { case (e, ci) =>
         val targetType = fields(ci).dataType
         try {
-          if (e.dataType.sameType(targetType)) {
-            e.eval()
+          val castedExpr = if (e.dataType.sameType(targetType)) {
+            e
           } else {
-            Cast(e, targetType).eval()
+            Cast(e, targetType)
           }
+          castedExpr.transform {
+            case e: TimeZoneAwareExpression if e.timeZoneId.isEmpty =>
+              e.withTimeZone(conf.sessionLocalTimeZone)
+          }.eval()
         } catch {
           case NonFatal(ex) =>
             table.failAnalysis(s"failed to evaluate expression ${e.sql}: ${ex.getMessage}")
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveInlineTablesSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveInlineTablesSuite.scala
index 920c6ea50f..f45a826869 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveInlineTablesSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveInlineTablesSuite.scala
@@ -20,68 +20,67 @@ package org.apache.spark.sql.catalyst.analysis
 import org.scalatest.BeforeAndAfter
 
 import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.expressions.{Literal, Rand}
+import org.apache.spark.sql.catalyst.expressions.{Cast, Literal, Rand}
 import org.apache.spark.sql.catalyst.expressions.aggregate.Count
-import org.apache.spark.sql.catalyst.plans.PlanTest
-import org.apache.spark.sql.types.{LongType, NullType}
+import org.apache.spark.sql.types.{LongType, NullType, TimestampType}
 
 /**
  * Unit tests for [[ResolveInlineTables]]. Note that there are also test cases defined in
  * end-to-end tests (in sql/core module) for verifying the correct error messages are shown
  * in negative cases.
  */
-class ResolveInlineTablesSuite extends PlanTest with BeforeAndAfter {
+class ResolveInlineTablesSuite extends AnalysisTest with BeforeAndAfter {
 
   private def lit(v: Any): Literal = Literal(v)
 
   test("validate inputs are foldable") {
-    ResolveInlineTables.validateInputEvaluable(
+    ResolveInlineTables(conf).validateInputEvaluable(
       UnresolvedInlineTable(Seq("c1", "c2"), Seq(Seq(lit(1)))))
 
     // nondeterministic (rand) should not work
     intercept[AnalysisException] {
-      ResolveInlineTables.validateInputEvaluable(
+      ResolveInlineTables(conf).validateInputEvaluable(
         UnresolvedInlineTable(Seq("c1"), Seq(Seq(Rand(1)))))
     }
 
     // aggregate should not work
     intercept[AnalysisException] {
-      ResolveInlineTables.validateInputEvaluable(
+      ResolveInlineTables(conf).validateInputEvaluable(
         UnresolvedInlineTable(Seq("c1"), Seq(Seq(Count(lit(1))))))
     }
 
     // unresolved attribute should not work
     intercept[AnalysisException] {
-      ResolveInlineTables.validateInputEvaluable(
+      ResolveInlineTables(conf).validateInputEvaluable(
         UnresolvedInlineTable(Seq("c1"), Seq(Seq(UnresolvedAttribute("A")))))
     }
   }
 
   test("validate input dimensions") {
-    ResolveInlineTables.validateInputDimension(
+    ResolveInlineTables(conf).validateInputDimension(
       UnresolvedInlineTable(Seq("c1"), Seq(Seq(lit(1)), Seq(lit(2)))))
 
     // num alias != data dimension
     intercept[AnalysisException] {
-      ResolveInlineTables.validateInputDimension(
+      ResolveInlineTables(conf).validateInputDimension(
         UnresolvedInlineTable(Seq("c1", "c2"), Seq(Seq(lit(1)), Seq(lit(2)))))
     }
 
     // num alias == data dimension, but data themselves are inconsistent
     intercept[AnalysisException] {
-      ResolveInlineTables.validateInputDimension(
+      ResolveInlineTables(conf).validateInputDimension(
         UnresolvedInlineTable(Seq("c1"), Seq(Seq(lit(1)), Seq(lit(21), lit(22)))))
     }
   }
 
   test("do not fire the rule if not all expressions are resolved") {
     val table = UnresolvedInlineTable(Seq("c1", "c2"), Seq(Seq(UnresolvedAttribute("A"))))
-    assert(ResolveInlineTables(table) == table)
+    assert(ResolveInlineTables(conf)(table) == table)
   }
 
   test("convert") {
     val table = UnresolvedInlineTable(Seq("c1"), Seq(Seq(lit(1)), Seq(lit(2L))))
-    val converted = ResolveInlineTables.convert(table)
+    val converted = ResolveInlineTables(conf).convert(table)
 
     assert(converted.output.map(_.dataType) == Seq(LongType))
     assert(converted.data.size == 2)
@@ -89,13 +88,24 @@ class ResolveInlineTablesSuite extends PlanTest with BeforeAndAfter {
     assert(converted.data(1).getLong(0) == 2L)
   }
 
+  test("convert TimeZoneAwareExpression") {
+    val table = UnresolvedInlineTable(Seq("c1"),
+      Seq(Seq(Cast(lit("1991-12-06 00:00:00.0"), TimestampType))))
+    val converted = ResolveInlineTables(conf).convert(table)
+    val correct = Cast(lit("1991-12-06 00:00:00.0"), TimestampType)
+      .withTimeZone(conf.sessionLocalTimeZone).eval().asInstanceOf[Long]
+    assert(converted.output.map(_.dataType) == Seq(TimestampType))
+    assert(converted.data.size == 1)
+    assert(converted.data(0).getLong(0) == correct)
+  }
+
   test("nullability inference in convert") {
     val table1 = UnresolvedInlineTable(Seq("c1"), Seq(Seq(lit(1)), Seq(lit(2L))))
-    val converted1 = ResolveInlineTables.convert(table1)
+    val converted1 = ResolveInlineTables(conf).convert(table1)
     assert(!converted1.schema.fields(0).nullable)
 
     val table2 = UnresolvedInlineTable(Seq("c1"), Seq(Seq(lit(1)), Seq(Literal(null, NullType))))
-    val converted2 = ResolveInlineTables.convert(table2)
+    val converted2 = ResolveInlineTables(conf).convert(table2)
     assert(converted2.schema.fields(0).nullable)
   }
 }
diff --git a/sql/core/src/test/resources/sql-tests/inputs/inline-table.sql b/sql/core/src/test/resources/sql-tests/inputs/inline-table.sql
index 5107fa4d55..b3ec956cd1 100644
--- a/sql/core/src/test/resources/sql-tests/inputs/inline-table.sql
+++ b/sql/core/src/test/resources/sql-tests/inputs/inline-table.sql
@@ -46,3 +46,6 @@ select * from values ("one", random_not_exist_func(1)), ("two", 2) as data(a, b)
 
 -- error reporting: aggregate expression
 select * from values ("one", count(1)), ("two", 2) as data(a, b);
+
+-- string to timestamp
+select * from values (timestamp('1991-12-06 00:00:00.0'), array(timestamp('1991-12-06 01:00:00.0'), timestamp('1991-12-06 12:00:00.0'))) as data(a, b);
diff --git a/sql/core/src/test/resources/sql-tests/results/inline-table.sql.out b/sql/core/src/test/resources/sql-tests/results/inline-table.sql.out
index de6f01b8de..4e80f0bda5 100644
--- a/sql/core/src/test/resources/sql-tests/results/inline-table.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/inline-table.sql.out
@@ -1,5 +1,5 @@
 -- Automatically generated by SQLQueryTestSuite
--- Number of queries: 16
+-- Number of queries: 17
 
 
 -- !query 0
@@ -143,3 +143,11 @@ struct<>
 -- !query 15 output
 org.apache.spark.sql.AnalysisException
 cannot evaluate expression count(1) in inline table definition; line 1 pos 29
+
+
+-- !query 16
+select * from values (timestamp('1991-12-06 00:00:00.0'), array(timestamp('1991-12-06 01:00:00.0'), timestamp('1991-12-06 12:00:00.0'))) as data(a, b)
+-- !query 16 schema
+struct<a:timestamp,b:array<timestamp>>
+-- !query 16 output
+1991-12-06 00:00:00	[1991-12-06 01:00:00.0,1991-12-06 12:00:00.0]
-- 
GitLab