diff --git a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
index cc3b8fd3b46894bb7c0b06c027c5264efdcb3f9a..c4a590ec6916b1d6301ff11713c78b0d4966b5e2 100644
--- a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
+++ b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
@@ -85,6 +85,8 @@ statement
         LIKE source=tableIdentifier locationSpec?                      #createTableLike
     | ANALYZE TABLE tableIdentifier partitionSpec? COMPUTE STATISTICS
         (identifier | FOR COLUMNS identifierSeq)?                      #analyze
+    | ALTER TABLE tableIdentifier
+        ADD COLUMNS '(' columns=colTypeList ')'                        #addTableColumns
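+        // e.g. ALTER TABLE t ADD COLUMNS (c1 INT COMMENT 'new column', c2 STRING)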
     | ALTER (TABLE | VIEW) from=tableIdentifier
         RENAME TO to=tableIdentifier                                   #renameTable
     | ALTER (TABLE | VIEW) tableIdentifier
@@ -198,7 +200,6 @@ unsupportedHiveNativeCommands
     | kw1=ALTER kw2=TABLE tableIdentifier partitionSpec? kw3=COMPACT
     | kw1=ALTER kw2=TABLE tableIdentifier partitionSpec? kw3=CONCATENATE
     | kw1=ALTER kw2=TABLE tableIdentifier partitionSpec? kw3=SET kw4=FILEFORMAT
-    | kw1=ALTER kw2=TABLE tableIdentifier partitionSpec? kw3=ADD kw4=COLUMNS
     | kw1=ALTER kw2=TABLE tableIdentifier partitionSpec? kw3=REPLACE kw4=COLUMNS
     | kw1=START kw2=TRANSACTION
     | kw1=COMMIT
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index b134fd44a311f7ba97452354f818f681c7a93184..a469d1245164336601f1d9b51925353c20bcf878 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -35,6 +35,7 @@ import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionInfo}
 import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParserInterface}
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias, View}
 import org.apache.spark.sql.catalyst.util.StringUtils
+import org.apache.spark.sql.types.{StructField, StructType}
 
 object SessionCatalog {
   val DEFAULT_DATABASE = "default"
@@ -161,6 +162,20 @@ class SessionCatalog(
       throw new TableAlreadyExistsException(db = db, table = name.table)
     }
   }
+
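+  /**
+   * Checks that `fields` contains no duplicate column names, respecting the
+   * `spark.sql.caseSensitive` setting; throws an [[AnalysisException]] otherwise.
+   */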
+  private def checkDuplication(fields: Seq[StructField]): Unit = {
+    val columnNames = if (conf.caseSensitiveAnalysis) {
+      fields.map(_.name)
+    } else {
+      fields.map(_.name.toLowerCase)
+    }
+    if (columnNames.distinct.length != columnNames.length) {
+      val duplicateColumns = columnNames.groupBy(identity).collect {
+        case (x, ys) if ys.length > 1 => x
+      }
+      throw new AnalysisException(s"Found duplicate column(s): ${duplicateColumns.mkString(", ")}")
+    }
+  }
+
   // ----------------------------------------------------------------------------
   // Databases
   // ----------------------------------------------------------------------------
@@ -295,6 +310,47 @@ class SessionCatalog(
     externalCatalog.alterTable(newTableDefinition)
   }
 
+  /**
+   * Alter the schema of a table identified by the provided table identifier. The new schema
+   * should still contain the existing bucket columns and partition columns used by the table. This
+   * method will also update any Spark SQL-related parameters stored as Hive table properties (such
+   * as the schema itself).
+   *
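+   * For example, a minimal sketch assuming a table `t` whose only data column is `a`
+   * and whose partition column is `p`; a new column `b` goes before `p`:
+   * {{{
+   *   catalog.alterTableSchema(
+   *     TableIdentifier("t"),
+   *     StructType(Seq(
+   *       StructField("a", IntegerType),
+   *       StructField("b", StringType),
+   *       StructField("p", IntegerType))))
+   * }}}
+   *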
+   * @param identifier TableIdentifier
+   * @param newSchema Updated schema to be used for the table (must contain existing partition and
+   *                  bucket columns, and partition columns need to be at the end)
+   */
+  def alterTableSchema(
+      identifier: TableIdentifier,
+      newSchema: StructType): Unit = {
+    val db = formatDatabaseName(identifier.database.getOrElse(getCurrentDatabase))
+    val table = formatTableName(identifier.table)
+    val tableIdentifier = TableIdentifier(table, Some(db))
+    requireDbExists(db)
+    requireTableExists(tableIdentifier)
+    checkDuplication(newSchema)
+
+    val catalogTable = externalCatalog.getTable(db, table)
+    val oldSchema = catalogTable.schema
+
+    // Dropping columns is not supported yet.
+    val nonExistentColumnNames = oldSchema.map(_.name).filterNot(columnNameResolved(newSchema, _))
+    if (nonExistentColumnNames.nonEmpty) {
+      throw new AnalysisException(
+        s"""
+           |Some existing schema fields (${nonExistentColumnNames.mkString("[", ",", "]")}) are
+           |not present in the new schema. We don't support dropping columns yet.
+         """.stripMargin)
+    }
+
+    // assuming the newSchema has all partition columns at the end as required
+    externalCatalog.alterTableSchema(db, table, newSchema)
+  }
+
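+  /** Returns true if `colName` matches a field name of `schema` under the session's resolver. */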
+  private def columnNameResolved(schema: StructType, colName: String): Boolean = {
+    schema.fields.map(_.name).exists(conf.resolver(_, colName))
+  }
+
   /**
    * Return whether a table/view with the specified name exists. If no database is specified, check
    * with current database.
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
index fd9e5d6bb13edfc9d53b62bc1b4ecc4ed6ea043e..ca4ce1c11707a39c5d6acb1f595ae6b24ef7cbba 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
@@ -26,6 +26,7 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
 import org.apache.spark.sql.catalyst.plans.PlanTest
 import org.apache.spark.sql.catalyst.plans.logical.{Range, SubqueryAlias, View}
+import org.apache.spark.sql.types._
 
 class InMemorySessionCatalogSuite extends SessionCatalogSuite {
   protected val utils = new CatalogTestUtils {
@@ -448,6 +449,34 @@ abstract class SessionCatalogSuite extends PlanTest {
     }
   }
 
+  test("alter table add columns") {
+    withBasicCatalog { sessionCatalog =>
+      sessionCatalog.createTable(newTable("t1", "default"), ignoreIfExists = false)
+      val oldTab = sessionCatalog.externalCatalog.getTable("default", "t1")
+      sessionCatalog.alterTableSchema(
+        TableIdentifier("t1", Some("default")),
+        StructType(oldTab.dataSchema.add("c3", IntegerType) ++ oldTab.partitionSchema))
+
+      val newTab = sessionCatalog.externalCatalog.getTable("default", "t1")
+      // construct the expected table schema
+      val expectedTableSchema = StructType(oldTab.dataSchema.fields ++
+        Seq(StructField("c3", IntegerType)) ++ oldTab.partitionSchema)
+      assert(newTab.schema == expectedTableSchema)
+    }
+  }
+
+  test("alter table drop columns") {
+    withBasicCatalog { sessionCatalog =>
+      sessionCatalog.createTable(newTable("t1", "default"), ignoreIfExists = false)
+      val oldTab = sessionCatalog.externalCatalog.getTable("default", "t1")
+      val e = intercept[AnalysisException] {
+        sessionCatalog.alterTableSchema(
+          TableIdentifier("t1", Some("default")), StructType(oldTab.schema.drop(1)))
+      }.getMessage
+      assert(e.contains("We don't support dropping columns yet."))
+    }
+  }
+
   test("get table") {
     withBasicCatalog { catalog =>
       assert(catalog.getTableMetadata(TableIdentifier("tbl1", Some("db2")))
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index abea7a3bcf1463882959d6700b8c9a316ecbc780..d4f23f9dd5185ee5cf3df858866acf3c5f47f781 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -741,6 +741,22 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
       ctx.VIEW != null)
   }
 
+  /**
+   * Create an [[AlterTableAddColumnsCommand]] command.
+   *
+   * For example:
+   * {{{
+   *   ALTER TABLE table1
+   *   ADD COLUMNS (col_name data_type [COMMENT col_comment], ...);
+   * }}}
+   */
+  override def visitAddTableColumns(ctx: AddTableColumnsContext): LogicalPlan = withOrigin(ctx) {
+    AlterTableAddColumnsCommand(
+      visitTableIdentifier(ctx.tableIdentifier),
+      visitColTypeList(ctx.columns)
+    )
+  }
+
   /**
    * Create an [[AlterTableSetPropertiesCommand]] command.
    *
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
index beb3dcafd64f9f3036ca9669379a443f0448fc2e..93307fc8835654fc4c674f1158010a91bf2c40c0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
@@ -37,7 +37,10 @@ import org.apache.spark.sql.catalyst.catalog.CatalogTableType._
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
 import org.apache.spark.sql.catalyst.util.quoteIdentifier
-import org.apache.spark.sql.execution.datasources.PartitioningUtils
+import org.apache.spark.sql.execution.datasources.{DataSource, FileFormat, PartitioningUtils}
+import org.apache.spark.sql.execution.datasources.csv.CSVFileFormat
+import org.apache.spark.sql.execution.datasources.json.JsonFileFormat
+import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
 import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils
 
@@ -174,6 +177,77 @@ case class AlterTableRenameCommand(
 
 }
 
+/**
+ * A command that adds columns to an existing table.
+ * The syntax of this command in SQL is:
+ * {{{
+ *   ALTER TABLE table_identifier
+ *   ADD COLUMNS (col_name data_type [COMMENT col_comment], ...);
+ * }}}
+ */
+case class AlterTableAddColumnsCommand(
+    table: TableIdentifier,
+    columns: Seq[StructField]) extends RunnableCommand {
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+    val catalog = sparkSession.sessionState.catalog
+    val catalogTable = verifyAlterTableAddColumn(catalog, table)
+
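+    // Drop any cached data for this table and invalidate its cached metadata, so that
+    // plans compiled after this command pick up the new schema.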
+    try {
+      sparkSession.catalog.uncacheTable(table.quotedString)
+    } catch {
+      case NonFatal(e) =>
+        log.warn(s"Exception when attempting to uncache table ${table.quotedString}", e)
+    }
+    catalog.refreshTable(table)
+
+    // make sure any partition columns are at the end of the fields
+    val reorderedSchema = catalogTable.dataSchema ++ columns ++ catalogTable.partitionSchema
+    catalog.alterTableSchema(
+      table, catalogTable.schema.copy(fields = reorderedSchema.toArray))
+
+    Seq.empty[Row]
+  }
+
+  /**
+   * ALTER TABLE ADD COLUMNS does not support temporary views, permanent views, or
+   * datasource tables backed by the text or ORC file formats or by an external provider.
+   * For datasource tables, it currently supports only parquet, json, and csv.
+   */
+  private def verifyAlterTableAddColumn(
+      catalog: SessionCatalog,
+      table: TableIdentifier): CatalogTable = {
+    val catalogTable = catalog.getTempViewOrPermanentTableMetadata(table)
+
+    if (catalogTable.tableType == CatalogTableType.VIEW) {
+      throw new AnalysisException(
+        s"""
+          |ALTER ADD COLUMNS does not support views.
+          |You must drop and re-create the view to add the new columns. View: $table
+         """.stripMargin)
+    }
+
+    if (DDLUtils.isDatasourceTable(catalogTable)) {
+      DataSource.lookupDataSource(catalogTable.provider.get).newInstance() match {
+        // For datasource tables, this command supports only the following file formats.
+        // TextFileFormat defaults to a single column named "value".
+        // OrcFileFormat cannot yet handle a mismatch between the user-specified schema
+        // and the inferred schema. TODO: once this issue is resolved, we can add ORC back.
+        // Hive serde tables are handled as a separate table type, so they never reach
+        // this branch.
+        case _: JsonFileFormat | _: CSVFileFormat | _: ParquetFileFormat =>
+        case s =>
+          throw new AnalysisException(
+            s"""
+              |ALTER ADD COLUMNS does not support datasource table with type $s.
+              |You must drop and re-create the table to add the new columns. Table: $table
+             """.stripMargin)
+      }
+    }
+    catalogTable
+  }
+}
+
 /**
  * A command that loads data into a Hive table.
  *
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
index 4b73b078da38ef9526a86abbde08d23616c17f96..13202a57851e1c5b9511bc1eb658e205cacc2800 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
@@ -780,13 +780,7 @@ class DDLCommandSuite extends PlanTest {
     assertUnsupported("ALTER TABLE table_name SKEWED BY (key) ON (1,5,6) STORED AS DIRECTORIES")
   }
 
-  test("alter table: add/replace columns (not allowed)") {
-    assertUnsupported(
-      """
-       |ALTER TABLE table_name PARTITION (dt='2008-08-08', country='us')
-       |ADD COLUMNS (new_col1 INT COMMENT 'test_comment', new_col2 LONG
-       |COMMENT 'test_comment2') CASCADE
-      """.stripMargin)
+  test("alter table: replace columns (not allowed)") {
     assertUnsupported(
       """
        |ALTER TABLE table_name REPLACE COLUMNS (new_col1 INT
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index 235c6bf6ad5923abd3bdff508dc9975112046172..648b1798c66e07094a1492e5b221917ffeb2a40e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -2185,4 +2185,126 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
       }
     }
   }
+
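+  // Datasource file formats for which ALTER TABLE ADD COLUMNS is supported
+  // (see AlterTableAddColumnsCommand.verifyAlterTableAddColumn).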
+  val supportedNativeFileFormatsForAlterTableAddColumns = Seq("parquet", "json", "csv")
+
+  supportedNativeFileFormatsForAlterTableAddColumns.foreach { provider =>
+    test(s"alter datasource table add columns - $provider") {
+      withTable("t1") {
+        sql(s"CREATE TABLE t1 (c1 int) USING $provider")
+        sql("INSERT INTO t1 VALUES (1)")
+        sql("ALTER TABLE t1 ADD COLUMNS (c2 int)")
+        checkAnswer(
+          spark.table("t1"),
+          Seq(Row(1, null))
+        )
+        checkAnswer(
+          sql("SELECT * FROM t1 WHERE c2 is null"),
+          Seq(Row(1, null))
+        )
+
+        sql("INSERT INTO t1 VALUES (3, 2)")
+        checkAnswer(
+          sql("SELECT * FROM t1 WHERE c2 = 2"),
+          Seq(Row(3, 2))
+        )
+      }
+    }
+  }
+
+  supportedNativeFileFormatsForAlterTableAddColumns.foreach { provider =>
+    test(s"alter datasource table add columns - partitioned - $provider") {
+      withTable("t1") {
+        sql(s"CREATE TABLE t1 (c1 int, c2 int) USING $provider PARTITIONED BY (c2)")
+        sql("INSERT INTO t1 PARTITION(c2 = 2) VALUES (1)")
+        sql("ALTER TABLE t1 ADD COLUMNS (c3 int)")
+        checkAnswer(
+          spark.table("t1"),
+          Seq(Row(1, null, 2))
+        )
+        checkAnswer(
+          sql("SELECT * FROM t1 WHERE c3 is null"),
+          Seq(Row(1, null, 2))
+        )
+        sql("INSERT INTO t1 PARTITION(c2 =1) VALUES (2, 3)")
+        checkAnswer(
+          sql("SELECT * FROM t1 WHERE c3 = 3"),
+          Seq(Row(2, 3, 1))
+        )
+        checkAnswer(
+          sql("SELECT * FROM t1 WHERE c2 = 1"),
+          Seq(Row(2, 3, 1))
+        )
+      }
+    }
+  }
+
+  test("alter datasource table add columns - text format not supported") {
+    withTable("t1") {
+      sql("CREATE TABLE t1 (c1 int) USING text")
+      val e = intercept[AnalysisException] {
+        sql("ALTER TABLE t1 ADD COLUMNS (c2 int)")
+      }.getMessage
+      assert(e.contains("ALTER ADD COLUMNS does not support datasource table with type"))
+    }
+  }
+
+  test("alter table add columns -- not support temp view") {
+    withTempView("tmp_v") {
+      sql("CREATE TEMPORARY VIEW tmp_v AS SELECT 1 AS c1, 2 AS c2")
+      val e = intercept[AnalysisException] {
+        sql("ALTER TABLE tmp_v ADD COLUMNS (c3 INT)")
+      }
+      assert(e.message.contains("ALTER ADD COLUMNS does not support views"))
+    }
+  }
+
+  test("alter table add columns -- not support view") {
+    withView("v1") {
+      sql("CREATE VIEW v1 AS SELECT 1 AS c1, 2 AS c2")
+      val e = intercept[AnalysisException] {
+        sql("ALTER TABLE v1 ADD COLUMNS (c3 INT)")
+      }
+      assert(e.message.contains("ALTER ADD COLUMNS does not support views"))
+    }
+  }
+
+  test("alter table add columns with existing column name") {
+    withTable("t1") {
+      sql("CREATE TABLE t1 (c1 int) USING PARQUET")
+      val e = intercept[AnalysisException] {
+        sql("ALTER TABLE t1 ADD COLUMNS (c1 string)")
+      }.getMessage
+      assert(e.contains("Found duplicate column(s)"))
+    }
+  }
+
+  Seq(true, false).foreach { caseSensitive =>
+    test(s"alter table add columns with existing column name - caseSensitive $caseSensitive") {
+      withSQLConf(SQLConf.CASE_SENSITIVE.key -> s"$caseSensitive") {
+        withTable("t1") {
+          sql("CREATE TABLE t1 (c1 int) USING PARQUET")
+          if (!caseSensitive) {
+            val e = intercept[AnalysisException] {
+              sql("ALTER TABLE t1 ADD COLUMNS (C1 string)")
+            }.getMessage
+            assert(e.contains("Found duplicate column(s)"))
+          } else {
+            if (isUsingHiveMetastore) {
+              // The Hive catalog will still complain that c1 is a duplicate column name
+              // because Hive identifiers are case-insensitive.
+              val e = intercept[AnalysisException] {
+                sql("ALTER TABLE t1 ADD COLUMNS (C1 string)")
+              }.getMessage
+              assert(e.contains("HiveException"))
+            } else {
+              sql("ALTER TABLE t1 ADD COLUMNS (C1 string)")
+              assert(spark.table("t1").schema
+                .equals(new StructType().add("c1", IntegerType).add("C1", StringType)))
+            }
+          }
+        }
+      }
+    }
+  }
 }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index d752c415c1ed834e375af682a9ef1fbf8851c8f6..04bc79d43032496f362939a0208795bee19218cd 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -35,7 +35,7 @@ import org.apache.spark.sql.hive.test.TestHiveSingleton
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION
 import org.apache.spark.sql.test.SQLTestUtils
-import org.apache.spark.sql.types.{MetadataBuilder, StructType}
+import org.apache.spark.sql.types._
 
 // TODO(gatorsmile): combine HiveCatalogedDDLSuite and HiveDDLSuite
 class HiveCatalogedDDLSuite extends DDLSuite with TestHiveSingleton with BeforeAndAfterEach {
@@ -112,6 +112,7 @@ class HiveCatalogedDDLSuite extends DDLSuite with TestHiveSingleton with BeforeA
 class HiveDDLSuite
   extends QueryTest with SQLTestUtils with TestHiveSingleton with BeforeAndAfterEach {
   import testImplicits._
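+  // Hive storage formats exercised by the ALTER TABLE ADD COLUMNS tests below.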
+  val hiveFormats = Seq("PARQUET", "ORC", "TEXTFILE", "SEQUENCEFILE", "RCFILE", "AVRO")
 
   override def afterEach(): Unit = {
     try {
@@ -1860,4 +1861,101 @@ class HiveDDLSuite
       }
     }
   }
+
+  hiveFormats.foreach { tableType =>
+    test(s"alter hive serde table add columns -- partitioned - $tableType") {
+      withTable("tab") {
+        sql(
+          s"""
+             |CREATE TABLE tab (c1 int, c2 int)
+             |PARTITIONED BY (c3 int) STORED AS $tableType
+          """.stripMargin)
+
+        sql("INSERT INTO tab PARTITION (c3=1) VALUES (1, 2)")
+        sql("ALTER TABLE tab ADD COLUMNS (c4 int)")
+
+        checkAnswer(
+          sql("SELECT * FROM tab WHERE c3 = 1"),
+          Seq(Row(1, 2, null, 1))
+        )
+        assert(spark.table("tab").schema
+          .contains(StructField("c4", IntegerType)))
+        sql("INSERT INTO tab PARTITION (c3=2) VALUES (2, 3, 4)")
+        checkAnswer(
+          spark.table("tab"),
+          Seq(Row(1, 2, null, 1), Row(2, 3, 4, 2))
+        )
+        checkAnswer(
+          sql("SELECT * FROM tab WHERE c3 = 2 AND c4 IS NOT NULL"),
+          Seq(Row(2, 3, 4, 2))
+        )
+
+        sql("ALTER TABLE tab ADD COLUMNS (c5 char(10))")
+        assert(spark.table("tab").schema.find(_.name == "c5")
+          .get.metadata.getString("HIVE_TYPE_STRING") == "char(10)")
+      }
+    }
+  }
+
+  hiveFormats.foreach { tableType =>
+    test(s"alter hive serde table add columns -- with predicate - $tableType ") {
+      withTable("tab") {
+        sql(s"CREATE TABLE tab (c1 int, c2 int) STORED AS $tableType")
+        sql("INSERT INTO tab VALUES (1, 2)")
+        sql("ALTER TABLE tab ADD COLUMNS (c4 int)")
+        checkAnswer(
+          sql("SELECT * FROM tab WHERE c4 IS NULL"),
+          Seq(Row(1, 2, null))
+        )
+        assert(spark.table("tab").schema
+          .contains(StructField("c4", IntegerType)))
+        sql("INSERT INTO tab VALUES (2, 3, 4)")
+        checkAnswer(
+          sql("SELECT * FROM tab WHERE c4 = 4 "),
+          Seq(Row(2, 3, 4))
+        )
+        checkAnswer(
+          spark.table("tab"),
+          Seq(Row(1, 2, null), Row(2, 3, 4))
+        )
+      }
+    }
+  }
+
+  Seq(true, false).foreach { caseSensitive =>
+    test(s"alter add columns with existing column name - caseSensitive $caseSensitive") {
+      withSQLConf(SQLConf.CASE_SENSITIVE.key -> s"$caseSensitive") {
+        withTable("tab") {
+          sql("CREATE TABLE tab (c1 int) PARTITIONED BY (c2 int) STORED AS PARQUET")
+          if (!caseSensitive) {
+            // duplicates the partition column name
+            val e1 = intercept[AnalysisException] {
+              sql("ALTER TABLE tab ADD COLUMNS (C2 string)")
+            }.getMessage
+            assert(e1.contains("Found duplicate column(s)"))
+
+            // duplicates a data column name
+            val e2 = intercept[AnalysisException] {
+              sql("ALTER TABLE tab ADD COLUMNS (C1 string)")
+            }.getMessage
+            assert(e2.contains("Found duplicate column(s)"))
+          } else {
+            // The Hive catalog will still complain that c2 is a duplicate column name
+            // because Hive identifiers are case-insensitive.
+            val e1 = intercept[AnalysisException] {
+              sql("ALTER TABLE tab ADD COLUMNS (C2 string)")
+            }.getMessage
+            assert(e1.contains("HiveException"))
+
+            // Likewise, the Hive catalog complains that c1 is a duplicate column name.
+            val e2 = intercept[AnalysisException] {
+              sql("ALTER TABLE tab ADD COLUMNS (C1 string)")
+            }.getMessage
+            assert(e2.contains("HiveException"))
+          }
+        }
+      }
+    }
+  }
 }