diff --git a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
index dc11e536efc45930215816c3513e076e2769c3cc..547013c23fd78c1f21fcf6baf00f352a14bcdf93 100644
--- a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
+++ b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
@@ -472,7 +472,7 @@ identifierComment
     ;
 
 relationPrimary
-    : tableIdentifier sample? (AS? strictIdentifier)?      #tableName
+    : tableIdentifier sample? tableAlias                   #tableName
     | '(' queryNoWith ')' sample? (AS? strictIdentifier)   #aliasedQuery
     | '(' relation ')' sample? (AS? strictIdentifier)?     #aliasedRelation
     | inlineTable                                          #inlineTableDefault2
@@ -711,7 +711,7 @@ nonReserved
     | ADD
     | OVER | PARTITION | RANGE | ROWS | PRECEDING | FOLLOWING | CURRENT | ROW | LAST | FIRST | AFTER
     | MAP | ARRAY | STRUCT
-    | LATERAL | WINDOW | REDUCE | TRANSFORM | USING | SERDE | SERDEPROPERTIES | RECORDREADER
+    | LATERAL | WINDOW | REDUCE | TRANSFORM | SERDE | SERDEPROPERTIES | RECORDREADER
     | DELIMITED | FIELDS | TERMINATED | COLLECTION | ITEMS | KEYS | ESCAPED | LINES | SEPARATED
     | EXTENDED | REFRESH | CLEAR | CACHE | UNCACHE | LAZY | GLOBAL | TEMPORARY | OPTIONS
     | GROUPING | CUBE | ROLLUP
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 85cf8ddbaacf44c6b5124b7bd8efabee031a88d6..8818404094eb16aa9e9deae91fd31dcbea808f98 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -593,7 +593,26 @@ class Analyzer(
     def resolveRelation(plan: LogicalPlan): LogicalPlan = plan match {
       case u: UnresolvedRelation if !isRunningDirectlyOnFiles(u.tableIdentifier) =>
         val defaultDatabase = AnalysisContext.get.defaultDatabase
-        val relation = lookupTableFromCatalog(u, defaultDatabase)
+        val foundRelation = lookupTableFromCatalog(u, defaultDatabase)
+
+        // Add a `Project` to rename the output columns if the query specifies column aliases:
+        // e.g., SELECT col1, col2 FROM testData AS t(col1, col2)
+        val relation = if (u.outputColumnNames.nonEmpty) {
+          val outputAttrs = foundRelation.output
+          // Check that the number of aliases matches the number of columns in the table.
+          if (u.outputColumnNames.size != outputAttrs.size) {
+            u.failAnalysis(s"Number of column aliases does not match number of columns. " +
+              s"Table name: ${u.tableName}; number of column aliases: " +
+              s"${u.outputColumnNames.size}; number of columns: ${outputAttrs.size}.")
+          }
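+          // Wrap each output attribute in an `Alias` carrying its user-specified name.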
+          val aliases = outputAttrs.zip(u.outputColumnNames).map {
+            case (attr, name) => Alias(attr, name)()
+          }
+          Project(aliases, foundRelation)
+        } else {
+          foundRelation
+        }
         resolveRelation(relation)
       // The view's child should be a logical plan parsed from the `desc.viewText`, the variable
       // `viewText` should be defined, or else we throw an error on the generation of the View
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTableValuedFunctions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTableValuedFunctions.scala
index 40675359bec4730dce6509c3b5bcc0c3b8a40dc9..a214e59302cd96a938b22633221ea9faada9c93a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTableValuedFunctions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTableValuedFunctions.scala
@@ -131,8 +131,9 @@ object ResolveTableValuedFunctions extends Rule[LogicalPlan] {
         val outputAttrs = resolvedFunc.output
-        // Checks if the number of the aliases is equal to expected one
+        // Check that the number of aliases matches the number of output columns
         if (u.outputNames.size != outputAttrs.size) {
-          u.failAnalysis(s"expected ${outputAttrs.size} columns but " +
-            s"found ${u.outputNames.size} columns")
+          u.failAnalysis(s"Number of given aliases does not match number of output columns. " +
+            s"Function name: ${u.functionName}; number of aliases: " +
+            s"${u.outputNames.size}; number of output columns: ${outputAttrs.size}.")
         }
         val aliases = outputAttrs.zip(u.outputNames).map {
           case (attr, name) => Alias(attr, name)()
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
index 51bef6e20b9facb283880a26f677627e0f393c43..42b9641bef27671d1878038e7b14334236707170 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
@@ -36,8 +36,21 @@ class UnresolvedException[TreeType <: TreeNode[_]](tree: TreeType, function: Str
 
 /**
  * Holds the name of a relation that has yet to be looked up in a catalog.
+ * Column aliases can be assigned to the columns of a relation:
+ * {{{
+ *   // Assign alias names
+ *   SELECT col1, col2 FROM testData AS t(col1, col2);
+ * }}}
+ *
+ * @param tableIdentifier table name
+ * @param outputColumnNames alias names of columns. If these names are given, the analyzer
+ *                          adds a [[Project]] to rename the columns.
  */
-case class UnresolvedRelation(tableIdentifier: TableIdentifier) extends LeafNode {
+case class UnresolvedRelation(
+    tableIdentifier: TableIdentifier,
+    outputColumnNames: Seq[String] = Seq.empty)
+  extends LeafNode {
+
   /** Returns a `.` separated name for this relation. */
   def tableName: String = tableIdentifier.unquotedString
 
@@ -71,6 +84,11 @@ case class UnresolvedInlineTable(
  *   // Assign alias names
  *   select t.a from range(10) t(a);
  * }}}
+ *
+ * @param functionName name of this table-valued function
+ * @param functionArgs list of function arguments
+ * @param outputNames alias names of function output columns. If these names are given, the
+ *                    analyzer adds a [[Project]] to rename the output columns.
  */
 case class UnresolvedTableValuedFunction(
     functionName: String,
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index 7d2e3a6fe7580b9ab3238a83f823078496742965..5f34d0777d5a19eedbb8f5f103fe1ed75f00a916 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -676,12 +676,18 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
    * Create an aliased table reference. This is typically used in FROM clauses.
    */
   override def visitTableName(ctx: TableNameContext): LogicalPlan = withOrigin(ctx) {
-    val table = UnresolvedRelation(visitTableIdentifier(ctx.tableIdentifier))
-
-    val tableWithAlias = Option(ctx.strictIdentifier).map(_.getText) match {
-      case Some(strictIdentifier) =>
-        SubqueryAlias(strictIdentifier, table)
-      case _ => table
+    val tableId = visitTableIdentifier(ctx.tableIdentifier)
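+    // An identifier list, e.g., `t(col1, col2)`, assigns column aliases to the relation.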
+    val table = if (ctx.tableAlias.identifierList != null) {
+      UnresolvedRelation(tableId, visitIdentifierList(ctx.tableAlias.identifierList))
+    } else {
+      UnresolvedRelation(tableId)
+    }
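+    // A trailing alias, e.g., `AS t`, wraps the relation in a `SubqueryAlias`.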
+    val tableWithAlias = if (ctx.tableAlias.strictIdentifier != null) {
+      SubqueryAlias(ctx.tableAlias.strictIdentifier.getText, table)
+    } else {
+      table
     }
     tableWithAlias.optionalMap(ctx.sample)(withSample)
   }
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
index 7eccca2e85649cccac1c38a6223a4384b30bcf1e..5393786891e075727f86a9e388b957e5638ec554 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
@@ -465,6 +465,24 @@ class AnalysisSuite extends AnalysisTest with ShouldMatchers {
     assertAnalysisSuccess(rangeWithAliases(2 :: 6 :: 2 :: Nil, "c" :: Nil))
     assertAnalysisError(
       rangeWithAliases(3 :: Nil, "a" :: "b" :: Nil),
-      Seq("expected 1 columns but found 2 columns"))
+      Seq("Number of given aliases does not match number of output columns. "
+        + "Function name: range; number of aliases: 2; number of output columns: 1."))
+  }
+
+  test("SPARK-20841 Support table column aliases in FROM clause") {
+    def tableColumnsWithAliases(outputNames: Seq[String]): LogicalPlan = {
+      SubqueryAlias("t", UnresolvedRelation(TableIdentifier("TaBlE3"), outputNames))
+        .select(star())
+    }
+    assertAnalysisSuccess(tableColumnsWithAliases("col1" :: "col2" :: "col3" :: "col4" :: Nil))
+    assertAnalysisError(
+      tableColumnsWithAliases("col1" :: Nil),
+      Seq("Number of column aliases does not match number of columns. Table name: TaBlE3; " +
+        "number of column aliases: 1; number of columns: 4."))
+    assertAnalysisError(
+      tableColumnsWithAliases("col1" :: "col2" :: "col3" :: "col4" :: "col5" :: Nil),
+      Seq("Number of column aliases does not match number of columns. Table name: TaBlE3; " +
+        "number of column aliases: 5; number of columns: 4."))
   }
 }
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisTest.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisTest.scala
index 82015b1e0671c666458c416d7dbcc0bb039d3fbc..afc7ce4195a8b11ab9a4a508367c1308e8ad28fc 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisTest.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisTest.scala
@@ -35,6 +35,8 @@ trait AnalysisTest extends PlanTest {
     val catalog = new SessionCatalog(new InMemoryCatalog, EmptyFunctionRegistry, conf)
     catalog.createTempView("TaBlE", TestRelations.testRelation, overrideIfExists = true)
     catalog.createTempView("TaBlE2", TestRelations.testRelation2, overrideIfExists = true)
+    catalog.createTempView("TaBlE3", TestRelations.testRelation3, overrideIfExists = true)
     new Analyzer(catalog, conf) {
       override val extendedResolutionRules = EliminateSubqueryAliases :: Nil
     }
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala
index 134e7614608819eff1f53a5d3880a14029f670d1..7a5357eef8f94622a5af2a277743ef4c27868e96 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala
@@ -17,8 +17,8 @@
 
 package org.apache.spark.sql.catalyst.parser
 
-import org.apache.spark.sql.catalyst.FunctionIdentifier
-import org.apache.spark.sql.catalyst.analysis.{UnresolvedGenerator, UnresolvedInlineTable, UnresolvedTableValuedFunction}
+import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
+import org.apache.spark.sql.catalyst.analysis.{UnresolvedGenerator, UnresolvedInlineTable, UnresolvedRelation, UnresolvedTableValuedFunction}
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical._
@@ -493,6 +493,13 @@ class PlanParserSuite extends PlanTest {
         .select(star()))
   }
 
+  test("SPARK-20841 Support table column aliases in FROM clause") {
+    assertEqual(
+      "SELECT * FROM testData AS t(col1, col2)",
+      SubqueryAlias("t", UnresolvedRelation(TableIdentifier("testData"), Seq("col1", "col2")))
+        .select(star()))
+  }
+
   test("inline table") {
     assertEqual("values 1, 2, 3, 4",
       UnresolvedInlineTable(Seq("col1"), Seq(1, 2, 3, 4).map(x => Seq(Literal(x)))))
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/TableIdentifierParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/TableIdentifierParserSuite.scala
index 170c469197e73319ef686ec1af1d52b33a3f291d..f33abc5b2e049a7ffaec9f390a85f9c03ab7d980 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/TableIdentifierParserSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/TableIdentifierParserSuite.scala
@@ -49,7 +49,7 @@ class TableIdentifierParserSuite extends SparkFunSuite {
     "insert", "int", "into", "is", "lateral", "like", "local", "none", "null",
     "of", "order", "out", "outer", "partition", "percent", "procedure", "range", "reads", "revoke",
     "rollup", "row", "rows", "set", "smallint", "table", "timestamp", "to", "trigger",
-    "true", "truncate", "update", "user", "using", "values", "with", "regexp", "rlike",
+    "true", "truncate", "update", "user", "values", "with", "regexp", "rlike",
     "bigint", "binary", "boolean", "current_date", "current_timestamp", "date", "double", "float",
     "int", "smallint", "timestamp", "at")
 
diff --git a/sql/core/src/test/resources/sql-tests/inputs/table-aliases.sql b/sql/core/src/test/resources/sql-tests/inputs/table-aliases.sql
new file mode 100644
index 0000000000000000000000000000000000000000..c90a9c7f85587f1abf831ef9ed72ea55c9bb0406
--- /dev/null
+++ b/sql/core/src/test/resources/sql-tests/inputs/table-aliases.sql
@@ -0,0 +1,17 @@
+-- Test data.
+CREATE OR REPLACE TEMPORARY VIEW testData AS SELECT * FROM VALUES (1, 1), (1, 2), (2, 1) AS testData(a, b);
+
+-- Table column aliases in FROM clause
+SELECT * FROM testData AS t(col1, col2) WHERE col1 = 1;
+
+SELECT * FROM testData AS t(col1, col2) WHERE col1 = 2;
+
+SELECT col1 AS k, SUM(col2) FROM testData AS t(col1, col2) GROUP BY k;
+
+-- Aliasing the wrong number of columns in the FROM clause
+SELECT * FROM testData AS t(col1, col2, col3);
+
+SELECT * FROM testData AS t(col1);
+
+-- Column aliases hide the original column names, so referencing `a` or `b` should fail
+SELECT a AS col1, b AS col2 FROM testData AS t(c, d);
diff --git a/sql/core/src/test/resources/sql-tests/results/table-aliases.sql.out b/sql/core/src/test/resources/sql-tests/results/table-aliases.sql.out
new file mode 100644
index 0000000000000000000000000000000000000000..c318018dced293e0c196f8c2d27f80d0837cc915
--- /dev/null
+++ b/sql/core/src/test/resources/sql-tests/results/table-aliases.sql.out
@@ -0,0 +1,63 @@
+-- Automatically generated by SQLQueryTestSuite
+-- Number of queries: 7
+
+
+-- !query 0
+CREATE OR REPLACE TEMPORARY VIEW testData AS SELECT * FROM VALUES (1, 1), (1, 2), (2, 1) AS testData(a, b)
+-- !query 0 schema
+struct<>
+-- !query 0 output
+
+
+
+-- !query 1
+SELECT * FROM testData AS t(col1, col2) WHERE col1 = 1
+-- !query 1 schema
+struct<col1:int,col2:int>
+-- !query 1 output
+1	1
+1	2
+
+
+-- !query 2
+SELECT * FROM testData AS t(col1, col2) WHERE col1 = 2
+-- !query 2 schema
+struct<col1:int,col2:int>
+-- !query 2 output
+2	1
+
+
+-- !query 3
+SELECT col1 AS k, SUM(col2) FROM testData AS t(col1, col2) GROUP BY k
+-- !query 3 schema
+struct<k:int,sum(col2):bigint>
+-- !query 3 output
+1	3
+2	1
+
+
+-- !query 4
+SELECT * FROM testData AS t(col1, col2, col3)
+-- !query 4 schema
+struct<>
+-- !query 4 output
+org.apache.spark.sql.AnalysisException
+Number of column aliases does not match number of columns. Table name: testData; number of column aliases: 3; number of columns: 2.; line 1 pos 14
+
+
+-- !query 5
+SELECT * FROM testData AS t(col1)
+-- !query 5 schema
+struct<>
+-- !query 5 output
+org.apache.spark.sql.AnalysisException
+Number of column aliases does not match number of columns. Table name: testData; number of column aliases: 1; number of columns: 2.; line 1 pos 14
+
+
+-- !query 6
+SELECT a AS col1, b AS col2 FROM testData AS t(c, d)
+-- !query 6 schema
+struct<>
+-- !query 6 output
+org.apache.spark.sql.AnalysisException
+cannot resolve '`a`' given input columns: [c, d]; line 1 pos 7
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/TPCDSQueryBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/TPCDSQueryBenchmark.scala
index a6249ce0214007e48792e322512f8e5409a22175..6a5b74b01df80f08be384b7e2ef9503a96a1f000 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/TPCDSQueryBenchmark.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/TPCDSQueryBenchmark.scala
@@ -74,13 +74,14 @@ object TPCDSQueryBenchmark {
       // per-row processing time for those cases.
       val queryRelations = scala.collection.mutable.HashSet[String]()
       spark.sql(queryString).queryExecution.logical.map {
-        case ur @ UnresolvedRelation(t: TableIdentifier) =>
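+        // The wildcard skips the new column-alias field; only the table name is collected.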
+        case UnresolvedRelation(t: TableIdentifier, _) =>
           queryRelations.add(t.table)
         case lp: LogicalPlan =>
           lp.expressions.foreach { _ foreach {
             case subquery: SubqueryExpression =>
               subquery.plan.foreach {
-                case ur @ UnresolvedRelation(t: TableIdentifier) =>
+                case UnresolvedRelation(t: TableIdentifier, _) =>
                   queryRelations.add(t.table)
                 case _ =>
               }
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
index ee9ac21a738dc1baed859c6f90368498e417ef05..e1534c797d55bcc1e59112922081c3c5d76c4d9b 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
@@ -544,7 +544,7 @@ private[hive] class TestHiveQueryExecution(
     // Make sure any test tables referenced are loaded.
     val referencedTables =
       describedTables ++
-        logical.collect { case UnresolvedRelation(tableIdent) => tableIdent.table }
+        logical.collect { case UnresolvedRelation(tableIdent, _) => tableIdent.table }
     val referencedTestTables = referencedTables.filter(sparkSession.testTables.contains)
     logDebug(s"Query references test tables: ${referencedTestTables.mkString(", ")}")
     referencedTestTables.foreach(sparkSession.loadTestTable)