diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md
index 448251cfdc69733b67935ac44bf8732ac24331ac..e838a13af72d06f334b4181ceadd19f762294575 100644
--- a/docs/sql-programming-guide.md
+++ b/docs/sql-programming-guide.md
@@ -1376,6 +1376,18 @@ Configuration of Parquet can be done using the `setConf` method on `SparkSession
     </p>
   </td>
 </tr>
+<tr>
+  <td><code>spark.sql.optimizer.metadataOnly</code></td>
+  <td>true</td>
+  <td>
+    <p>
+      When true, enables the metadata-only query optimization that uses the table's metadata to
+      produce the partition columns instead of table scans. It applies when all the columns scanned
+      are partition columns and the query has an aggregate operator that satisfies distinct
+      semantics.
+    </p>
+  </td>
+</tr>
 </table>
 
 ## JSON Datasets
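For readers of the guide, a hedged sketch of the kind of query this new setting affects (the table and column names here are hypothetical, not part of the patch):

```scala
// Hypothetical table `logs`, partitioned by `dt`. With
// spark.sql.optimizer.metadataOnly=true (the default), this DISTINCT over the
// partition column can be answered from catalog metadata without scanning data files.
spark.sql("SELECT DISTINCT dt FROM logs").show()
```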
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala
new file mode 100644
index 0000000000000000000000000000000000000000..1b7fedca8484c1a3f1bb59fc6764ec5b11d744a8
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, SessionCatalog}
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.aggregate._
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
+import org.apache.spark.sql.internal.SQLConf
+
+/**
+ * This rule optimizes the execution of queries that can be answered by looking only at
+ * partition-level metadata. This applies when all the columns scanned are partition columns, and
+ * the query has an aggregate operator that satisfies the following conditions:
+ * 1. the aggregate expressions are partition columns,
+ *  e.g. SELECT col FROM tbl GROUP BY col.
+ * 2. aggregate functions on partition columns with DISTINCT,
+ *  e.g. SELECT col1, count(DISTINCT col2) FROM tbl GROUP BY col1.
+ * 3. aggregate functions on partition columns which have the same result with or without the
+ *  DISTINCT keyword, e.g. SELECT col1, max(col2) FROM tbl GROUP BY col1.
+ */
+case class OptimizeMetadataOnlyQuery(
+    catalog: SessionCatalog,
+    conf: SQLConf) extends Rule[LogicalPlan] {
+
+  def apply(plan: LogicalPlan): LogicalPlan = {
+    if (!conf.optimizerMetadataOnly) {
+      return plan
+    }
+
+    plan.transform {
+      case a @ Aggregate(_, aggExprs, child @ PartitionedRelation(partAttrs, relation)) =>
+        // We only apply this optimization when only partition attributes are scanned.
+        if (a.references.subsetOf(partAttrs)) {
+          val aggFunctions = aggExprs.flatMap(_.collect {
+            case agg: AggregateExpression => agg
+          })
+          val isAllDistinctAgg = aggFunctions.forall { agg =>
+            agg.isDistinct || (agg.aggregateFunction match {
+              // `Max`, `Min`, `First` and `Last` behave like distinct aggregate functions whether
+              // or not they have the DISTINCT keyword, as the result is the same either way.
+              case _: Max => true
+              case _: Min => true
+              case _: First => true
+              case _: Last => true
+              case _ => false
+            })
+          }
+          if (isAllDistinctAgg) {
+            a.withNewChildren(Seq(replaceTableScanWithPartitionMetadata(child, relation)))
+          } else {
+            a
+          }
+        } else {
+          a
+        }
+    }
+  }
+
+  /**
+   * Returns the partition attributes of the table relation plan.
+   */
+  private def getPartitionAttrs(
+      partitionColumnNames: Seq[String],
+      relation: LogicalPlan): Seq[Attribute] = {
+    val partColumns = partitionColumnNames.map(_.toLowerCase).toSet
+    relation.output.filter(a => partColumns.contains(a.name.toLowerCase))
+  }
+
+  /**
+   * Transforms the given plan: finds the table scan node that matches the given relation, and
+   * replaces it with a local relation containing the corresponding partition values.
+   */
+  private def replaceTableScanWithPartitionMetadata(
+      child: LogicalPlan,
+      relation: LogicalPlan): LogicalPlan = {
+    child transform {
+      case plan if plan eq relation =>
+        relation match {
+          case l @ LogicalRelation(fsRelation: HadoopFsRelation, _, _) =>
+            val partAttrs = getPartitionAttrs(fsRelation.partitionSchema.map(_.name), l)
+            val partitionData = fsRelation.location.listFiles(filters = Nil)
+            LocalRelation(partAttrs, partitionData.map(_.values))
+
+          case relation: CatalogRelation =>
+            val partAttrs = getPartitionAttrs(relation.catalogTable.partitionColumnNames, relation)
+            val partitionData = catalog.listPartitions(relation.catalogTable.identifier).map { p =>
+              InternalRow.fromSeq(partAttrs.map { attr =>
+                Cast(Literal(p.spec(attr.name)), attr.dataType).eval()
+              })
+            }
+            LocalRelation(partAttrs, partitionData)
+
+          case _ =>
+            throw new IllegalStateException(s"unrecognized table scan node: $relation, " +
+              s"please turn off ${SQLConf.OPTIMIZER_METADATA_ONLY.key} and try again.")
+        }
+    }
+  }
+
+  /**
+   * A pattern that finds the partitioned table relation node inside the given plan, and returns a
+   * pair of the partition attributes and the table relation node.
+   *
+   * It keeps traversing down the given plan tree if there is a [[Project]] or [[Filter]] with
+   * deterministic expressions, and returns the result after reaching the partitioned table
+   * relation node.
+   */
+  object PartitionedRelation {
+
+    def unapply(plan: LogicalPlan): Option[(AttributeSet, LogicalPlan)] = plan match {
+      case l @ LogicalRelation(fsRelation: HadoopFsRelation, _, _)
+        if fsRelation.partitionSchema.nonEmpty =>
+        val partAttrs = getPartitionAttrs(fsRelation.partitionSchema.map(_.name), l)
+        Some((AttributeSet(partAttrs), l))
+
+      case relation: CatalogRelation if relation.catalogTable.partitionColumnNames.nonEmpty =>
+        val partAttrs = getPartitionAttrs(relation.catalogTable.partitionColumnNames, relation)
+        Some((AttributeSet(partAttrs), relation))
+
+      case p @ Project(projectList, child) if projectList.forall(_.deterministic) =>
+        unapply(child).flatMap { case (partAttrs, relation) =>
+          if (p.references.subsetOf(partAttrs)) Some((p.outputSet, relation)) else None
+        }
+
+      case f @ Filter(condition, child) if condition.deterministic =>
+        unapply(child).flatMap { case (partAttrs, relation) =>
+          if (f.references.subsetOf(partAttrs)) Some((partAttrs, relation)) else None
+        }
+
+      case _ => None
+    }
+  }
+}
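A hedged sketch of the rule's end-to-end effect, mirroring the assertions in the test suites added below (the table name `tbl` is hypothetical):

```scala
import org.apache.spark.sql.catalyst.plans.logical.LocalRelation

// Write a small partitioned datasource table, then inspect the optimized plan.
spark.range(10)
  .selectExpr("id AS col", "id % 2 AS part")
  .write.partitionBy("part").saveAsTable("tbl")

val plan = spark.sql("SELECT DISTINCT part FROM tbl").queryExecution.optimizedPlan
// With spark.sql.optimizer.metadataOnly=true, the scan of `tbl` should have been
// replaced by a LocalRelation that holds only the partition values.
assert(plan.collect { case l: LocalRelation => l }.nonEmpty)
```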
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala
index 12a10cba20fe9f884ded20be0b0bfc7b6d8782e4..8b762b5d6c5f209075cf6fa4bb2de09784597380 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala
@@ -30,6 +30,7 @@ class SparkOptimizer(
   extends Optimizer(catalog, conf) {
 
   override def batches: Seq[Batch] = super.batches :+
+    Batch("Optimize Metadata Only Query", Once, OptimizeMetadataOnlyQuery(catalog, conf)) :+
     Batch("Extract Python UDF from Aggregate", Once, ExtractPythonUDFFromAggregate) :+
     Batch("User Provided Optimizers", fixedPoint, experimentalMethods.extraOptimizations: _*)
 }
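For orientation, the new batch runs once after the standard optimizer batches and before the Python UDF extraction and user-provided batches; a minimal, hypothetical example of a rule plugged in through the existing extension point shown on the last line:

```scala
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule

// Hypothetical no-op rule, only to illustrate where user rules execute:
// in the "User Provided Optimizers" batch, after "Optimize Metadata Only Query".
object NoopRule extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = plan
}

spark.experimental.extraOptimizations = Seq(NoopRule)
```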
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 5ab0c1d4c41eb0fa22e801b4f48c1dce0177bf59..14a1680fafa3ab4219403a191f202146e6a84c9f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -258,6 +258,14 @@ object SQLConf {
       .booleanConf
       .createWithDefault(false)
 
+  val OPTIMIZER_METADATA_ONLY = SQLConfigBuilder("spark.sql.optimizer.metadataOnly")
+    .doc("When true, enable the metadata-only query optimization that use the table's metadata " +
+      "to produce the partition columns instead of table scans. It applies when all the columns " +
+      "scanned are partition columns and the query has an aggregate operator that satisfies " +
+      "distinct semantics.")
+    .booleanConf
+    .createWithDefault(true)
+
   val COLUMN_NAME_OF_CORRUPT_RECORD = SQLConfigBuilder("spark.sql.columnNameOfCorruptRecord")
     .doc("The name of internal column for storing raw/un-parsed JSON records that fail to parse.")
     .stringConf
@@ -594,6 +602,8 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {
 
   def metastorePartitionPruning: Boolean = getConf(HIVE_METASTORE_PARTITION_PRUNING)
 
+  def optimizerMetadataOnly: Boolean = getConf(OPTIMIZER_METADATA_ONLY)
+
   def wholeStageEnabled: Boolean = getConf(WHOLESTAGE_CODEGEN_ENABLED)
 
   def wholeStageMaxNumFields: Int = getConf(WHOLESTAGE_MAX_NUM_FIELDS)
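The new flag can also be toggled per session through the runtime conf API; a small sketch:

```scala
// Disable or re-enable the metadata-only optimization for the current session.
spark.conf.set("spark.sql.optimizer.metadataOnly", "false")
spark.conf.set("spark.sql.optimizer.metadataOnly", "true")  // the default
```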
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index ede7d9a0c95b98486cef7bc0b82fda3965f53c1f..eeaa0103a08e6556c643d8b6608a8256bf611a3d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -2929,4 +2929,40 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
       sql(s"SELECT '$literal' AS DUMMY"),
       Row(s"$expected") :: Nil)
   }
+
+  test("SPARK-15752 optimize metadata only query for datasource table") {
+    withSQLConf(SQLConf.OPTIMIZER_METADATA_ONLY.key -> "true") {
+      withTable("srcpart_15752") {
+        val data = (1 to 10).map(i => (i, s"data-$i", i % 2, if ((i % 2) == 0) "a" else "b"))
+          .toDF("col1", "col2", "partcol1", "partcol2")
+        data.write.partitionBy("partcol1", "partcol2").mode("append").saveAsTable("srcpart_15752")
+        checkAnswer(
+          sql("select partcol1 from srcpart_15752 group by partcol1"),
+          Row(0) :: Row(1) :: Nil)
+        checkAnswer(
+          sql("select partcol1 from srcpart_15752 where partcol1 = 1 group by partcol1"),
+          Row(1))
+        checkAnswer(
+          sql("select partcol1, count(distinct partcol2) from srcpart_15752 group by partcol1"),
+          Row(0, 1) :: Row(1, 1) :: Nil)
+        checkAnswer(
+          sql("select partcol1, count(distinct partcol2) from srcpart_15752  where partcol1 = 1 " +
+            "group by partcol1"),
+          Row(1, 1) :: Nil)
+        checkAnswer(sql("select distinct partcol1 from srcpart_15752"), Row(0) :: Row(1) :: Nil)
+        checkAnswer(sql("select distinct partcol1 from srcpart_15752 where partcol1 = 1"), Row(1))
+        checkAnswer(
+          sql("select distinct col from (select partcol1 + 1 as col from srcpart_15752 " +
+            "where partcol1 = 1) t"),
+          Row(2))
+        checkAnswer(sql("select max(partcol1) from srcpart_15752"), Row(1))
+        checkAnswer(sql("select max(partcol1) from srcpart_15752 where partcol1 = 1"), Row(1))
+        checkAnswer(sql("select max(partcol1) from (select partcol1 from srcpart_15752) t"), Row(1))
+        checkAnswer(
+          sql("select max(col) from (select partcol1 + 1 as col from srcpart_15752 " +
+            "where partcol1 = 1) t"),
+          Row(2))
+      }
+    }
+  }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuerySuite.scala
new file mode 100644
index 0000000000000000000000000000000000000000..58c310596ca6d768808c235fe18a7bb1ca1aa6b8
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuerySuite.scala
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.test.SharedSQLContext
+
+class OptimizeMetadataOnlyQuerySuite extends QueryTest with SharedSQLContext {
+  import testImplicits._
+
+  override def beforeAll(): Unit = {
+    super.beforeAll()
+    val data = (1 to 10).map(i => (i, s"data-$i", i % 2, if ((i % 2) == 0) "even" else "odd"))
+      .toDF("col1", "col2", "partcol1", "partcol2")
+    data.write.partitionBy("partcol1", "partcol2").mode("append").saveAsTable("srcpart")
+  }
+
+  override protected def afterAll(): Unit = {
+    try {
+      sql("DROP TABLE IF EXISTS srcpart")
+    } finally {
+      super.afterAll()
+    }
+  }
+
+  private def assertMetadataOnlyQuery(df: DataFrame): Unit = {
+    val localRelations = df.queryExecution.optimizedPlan.collect {
+      case l @ LocalRelation(_, _) => l
+    }
+    assert(localRelations.size == 1)
+  }
+
+  private def assertNotMetadataOnlyQuery(df: DataFrame): Unit = {
+    val localRelations = df.queryExecution.optimizedPlan.collect {
+      case l @ LocalRelation(_, _) => l
+    }
+    assert(localRelations.size == 0)
+  }
+
+  private def testMetadataOnly(name: String, sqls: String*): Unit = {
+    test(name) {
+      withSQLConf(SQLConf.OPTIMIZER_METADATA_ONLY.key -> "true") {
+        sqls.foreach { q => assertMetadataOnlyQuery(sql(q)) }
+      }
+      withSQLConf(SQLConf.OPTIMIZER_METADATA_ONLY.key -> "false") {
+        sqls.foreach { q => assertNotMetadataOnlyQuery(sql(q)) }
+      }
+    }
+  }
+
+  private def testNotMetadataOnly(name: String, sqls: String*): Unit = {
+    test(name) {
+      withSQLConf(SQLConf.OPTIMIZER_METADATA_ONLY.key -> "true") {
+        sqls.foreach { q => assertNotMetadataOnlyQuery(sql(q)) }
+      }
+      withSQLConf(SQLConf.OPTIMIZER_METADATA_ONLY.key -> "false") {
+        sqls.foreach { q => assertNotMetadataOnlyQuery(sql(q)) }
+      }
+    }
+  }
+
+  testMetadataOnly(
+    "Aggregate expression is partition columns",
+    "select partcol1 from srcpart group by partcol1",
+    "select partcol2 from srcpart where partcol1 = 0 group by partcol2")
+
+  testMetadataOnly(
+    "Distinct aggregate function on partition columns",
+    "SELECT partcol1, count(distinct partcol2) FROM srcpart group by partcol1",
+    "SELECT partcol1, count(distinct partcol2) FROM srcpart where partcol1 = 0 group by partcol1")
+
+  testMetadataOnly(
+    "Distinct on partition columns",
+    "select distinct partcol1, partcol2 from srcpart",
+    "select distinct c1 from (select partcol1 + 1 as c1 from srcpart where partcol1 = 0) t")
+
+  testMetadataOnly(
+    "Aggregate function on partition columns which have same result w or w/o DISTINCT keyword",
+    "select max(partcol1) from srcpart",
+    "select min(partcol1) from srcpart where partcol1 = 0",
+    "select first(partcol1) from srcpart",
+    "select last(partcol1) from srcpart where partcol1 = 0",
+    "select partcol2, min(partcol1) from srcpart where partcol1 = 0 group by partcol2",
+    "select max(c1) from (select partcol1 + 1 as c1 from srcpart where partcol1 = 0) t")
+
+  testNotMetadataOnly(
+    "Don't optimize metadata only query for non-partition columns",
+    "select col1 from srcpart group by col1",
+    "select partcol1, max(col1) from srcpart group by partcol1",
+    "select partcol1, count(distinct col1) from srcpart group by partcol1",
+    "select distinct partcol1, col1 from srcpart")
+
+  testNotMetadataOnly(
+    "Don't optimize metadata only query for non-distinct aggregate function on partition columns",
+    "select partcol1, sum(partcol2) from srcpart group by partcol1",
+    "select partcol1, count(partcol2) from srcpart group by partcol1")
+
+  testNotMetadataOnly(
+    "Don't optimize metadata only query for GroupingSet/Union operator",
+    "select partcol1, max(partcol2) from srcpart where partcol1 = 0 group by rollup (partcol1)",
+    "select partcol2 from (select partcol2 from srcpart where partcol1 = 0 union all " +
+      "select partcol2 from srcpart where partcol1 = 1) t group by partcol2")
+}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index e8af4fbe876e1deddf7edb7f6f0d9cf98ed1c808..a43f0d0d7e97cdfe820bb7904c80f767a18e45c5 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -1689,4 +1689,93 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
       )
     }
   }
+
+  test("SPARK-15752 optimize metadata only query for hive table") {
+    withSQLConf(SQLConf.OPTIMIZER_METADATA_ONLY.key -> "true") {
+      withTable("data_15752", "srcpart_15752", "srctext_15752") {
+        val df = Seq((1, "2"), (3, "4")).toDF("key", "value")
+        df.createOrReplaceTempView("data_15752")
+        sql(
+          """
+            |CREATE TABLE srcpart_15752 (col1 INT, col2 STRING)
+            |PARTITIONED BY (partcol1 INT, partcol2 STRING) STORED AS parquet
+          """.stripMargin)
+        for (partcol1 <- Seq(0, 1); partcol2 <- Seq("a", "b")) {
+          sql(
+            s"""
+              |INSERT OVERWRITE TABLE srcpart_15752
+              |PARTITION (partcol1='$partcol1', partcol2='$partcol2')
+              |select key, value from data_15752
+            """.stripMargin)
+        }
+        checkAnswer(
+          sql("select partcol1 from srcpart_15752 group by partcol1"),
+          Row(0) :: Row(1) :: Nil)
+        checkAnswer(
+          sql("select partcol1 from srcpart_15752 where partcol1 = 1 group by partcol1"),
+          Row(1))
+        checkAnswer(
+          sql("select partcol1, count(distinct partcol2) from srcpart_15752 group by partcol1"),
+          Row(0, 2) :: Row(1, 2) :: Nil)
+        checkAnswer(
+          sql("select partcol1, count(distinct partcol2) from srcpart_15752 where partcol1 = 1 " +
+            "group by partcol1"),
+          Row(1, 2) :: Nil)
+        checkAnswer(sql("select distinct partcol1 from srcpart_15752"), Row(0) :: Row(1) :: Nil)
+        checkAnswer(sql("select distinct partcol1 from srcpart_15752 where partcol1 = 1"), Row(1))
+        checkAnswer(
+          sql("select distinct col from (select partcol1 + 1 as col from srcpart_15752 " +
+            "where partcol1 = 1) t"),
+          Row(2))
+        checkAnswer(sql("select distinct partcol1 from srcpart_15752 where partcol1 = 1"), Row(1))
+        checkAnswer(sql("select max(partcol1) from srcpart_15752"), Row(1))
+        checkAnswer(sql("select max(partcol1) from srcpart_15752 where partcol1 = 1"), Row(1))
+        checkAnswer(sql("select max(partcol1) from (select partcol1 from srcpart_15752) t"), Row(1))
+        checkAnswer(
+          sql("select max(col) from (select partcol1 + 1 as col from srcpart_15752 " +
+            "where partcol1 = 1) t"),
+          Row(2))
+
+        sql(
+          """
+            |CREATE TABLE srctext_15752 (col1 INT, col2 STRING)
+            |PARTITIONED BY (partcol1 INT, partcol2 STRING) STORED AS textfile
+          """.stripMargin)
+        for (partcol1 <- Seq(0, 1); partcol2 <- Seq("a", "b")) {
+          sql(
+            s"""
+              |INSERT OVERWRITE TABLE srctext_15752
+              |PARTITION (partcol1='$partcol1', partcol2='$partcol2')
+              |select key, value from data_15752
+            """.stripMargin)
+        }
+        checkAnswer(
+          sql("select partcol1 from srctext_15752 group by partcol1"),
+          Row(0) :: Row(1) :: Nil)
+        checkAnswer(
+          sql("select partcol1 from srctext_15752 where partcol1 = 1 group by partcol1"),
+          Row(1))
+        checkAnswer(
+          sql("select partcol1, count(distinct partcol2) from srctext_15752 group by partcol1"),
+          Row(0, 2) :: Row(1, 2) :: Nil)
+        checkAnswer(
+          sql("select partcol1, count(distinct partcol2) from srctext_15752  where partcol1 = 1 " +
+            "group by partcol1"),
+          Row(1, 2) :: Nil)
+        checkAnswer(sql("select distinct partcol1 from srctext_15752"), Row(0) :: Row(1) :: Nil)
+        checkAnswer(sql("select distinct partcol1 from srctext_15752 where partcol1 = 1"), Row(1))
+        checkAnswer(
+          sql("select distinct col from (select partcol1 + 1 as col from srctext_15752 " +
+            "where partcol1 = 1) t"),
+          Row(2))
+        checkAnswer(sql("select max(partcol1) from srctext_15752"), Row(1))
+        checkAnswer(sql("select max(partcol1) from srctext_15752 where partcol1 = 1"), Row(1))
+        checkAnswer(sql("select max(partcol1) from (select partcol1 from srctext_15752) t"), Row(1))
+        checkAnswer(
+          sql("select max(col) from (select partcol1 + 1 as col from srctext_15752 " +
+            "where partcol1 = 1) t"),
+          Row(2))
+      }
+    }
+  }
 }