diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index 9332f773430e7b8a2b037390f550c23c72c5e171..ad6fc20df1f023448fcac910d600e8ddff8ef347 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -2357,18 +2357,9 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
             }.getMessage
             assert(e.contains("Found duplicate column(s)"))
           } else {
-            if (isUsingHiveMetastore) {
-              // hive catalog will still complains that c1 is duplicate column name because hive
-              // identifiers are case insensitive.
-              val e = intercept[AnalysisException] {
-                sql("ALTER TABLE t1 ADD COLUMNS (C1 string)")
-              }.getMessage
-              assert(e.contains("HiveException"))
-            } else {
-              sql("ALTER TABLE t1 ADD COLUMNS (C1 string)")
-              assert(spark.table("t1").schema
-                .equals(new StructType().add("c1", IntegerType).add("C1", StringType)))
-            }
+            sql("ALTER TABLE t1 ADD COLUMNS (C1 string)")
+            assert(spark.table("t1").schema ==
+              new StructType().add("c1", IntegerType).add("C1", StringType))
           }
         }
       }
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index 547447b31f0a1dbb1ce8e16dbf88991a7ea45338..bdbb8bccbc5cdd0898111372c69cf38901f20126 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -114,7 +114,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
    * should interpret these special data source properties and restore the original table metadata
    * before returning it.
    */
-  private def getRawTable(db: String, table: String): CatalogTable = withClient {
+  private[hive] def getRawTable(db: String, table: String): CatalogTable = withClient {
     client.getTable(db, table)
   }
 
@@ -386,6 +386,12 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
    * can be used as table properties later.
    */
   private def tableMetaToTableProps(table: CatalogTable): mutable.Map[String, String] = {
+    tableMetaToTableProps(table, table.schema)
+  }
+
+  private def tableMetaToTableProps(
+      table: CatalogTable,
+      schema: StructType): mutable.Map[String, String] = {
     val partitionColumns = table.partitionColumnNames
     val bucketSpec = table.bucketSpec
 
@@ -397,7 +403,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
     // property. In this case, we split the JSON string and store each part as a separate table
     // property.
     val threshold = conf.get(SCHEMA_STRING_LENGTH_THRESHOLD)
-    val schemaJsonString = table.schema.json
+    val schemaJsonString = schema.json
     // Split the JSON string.
     val parts = schemaJsonString.grouped(threshold).toSeq
     properties.put(DATASOURCE_SCHEMA_NUMPARTS, parts.size.toString)
@@ -615,20 +621,29 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
   override def alterTableSchema(db: String, table: String, schema: StructType): Unit = withClient {
     requireTableExists(db, table)
     val rawTable = getRawTable(db, table)
-    val withNewSchema = rawTable.copy(schema = schema)
-    verifyColumnNames(withNewSchema)
     // Add table metadata such as table schema, partition columns, etc. to table properties.
-    val updatedTable = withNewSchema.copy(
-      properties = withNewSchema.properties ++ tableMetaToTableProps(withNewSchema))
-    try {
-      client.alterTable(updatedTable)
-    } catch {
-      case NonFatal(e) =>
-        val warningMessage =
-          s"Could not alter schema of table  ${rawTable.identifier.quotedString} in a Hive " +
-            "compatible way. Updating Hive metastore in Spark SQL specific format."
-        logWarning(warningMessage, e)
-        client.alterTable(updatedTable.copy(schema = updatedTable.partitionSchema))
+    val updatedProperties = rawTable.properties ++ tableMetaToTableProps(rawTable, schema)
+    val withNewSchema = rawTable.copy(properties = updatedProperties, schema = schema)
+    verifyColumnNames(withNewSchema)
+
+    if (isDatasourceTable(rawTable)) {
+      // For data source tables, first try to write it with the schema set; if that does not work,
+      // try again with updated properties and the partition schema. This is a simplified version of
+      // what createDataSourceTable() does, and may leave the table in a state unreadable by Hive
+      // (for example, the schema does not match the data source schema, or does not match the
+      // storage descriptor).
+      try {
+        client.alterTable(withNewSchema)
+      } catch {
+        case NonFatal(e) =>
+          val warningMessage =
+            s"Could not alter schema of table  ${rawTable.identifier.quotedString} in a Hive " +
+              "compatible way. Updating Hive metastore in Spark SQL specific format."
+          logWarning(warningMessage, e)
+          client.alterTable(withNewSchema.copy(schema = rawTable.partitionSchema))
+      }
+    } else {
+      client.alterTable(withNewSchema)
     }
   }
 
@@ -1351,4 +1366,14 @@ object HiveExternalCatalog {
         getColumnNamesByType(metadata.properties, "sort", "sorting columns"))
     }
   }
+
+  /**
+   * Detects a data source table. This checks both the table provider and the table properties,
+   * unlike DDLUtils which just checks the former.
+   */
+  private[spark] def isDatasourceTable(table: CatalogTable): Boolean = {
+    val provider = table.provider.orElse(table.properties.get(DATASOURCE_PROVIDER))
+    provider.isDefined && provider != Some(DDLUtils.HIVE_PROVIDER)
+  }
+
 }
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index 995280e0e9416a2d6e2e6c30ff73f10fe8436956..7c0b9bf19bf30b8de97cd6a52208424f2b650d43 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -50,6 +50,7 @@ import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParseException}
 import org.apache.spark.sql.execution.QueryExecutionException
 import org.apache.spark.sql.execution.command.DDLUtils
+import org.apache.spark.sql.hive.HiveExternalCatalog
 import org.apache.spark.sql.hive.client.HiveClientImpl._
 import org.apache.spark.sql.types._
 import org.apache.spark.util.{CircularBuffer, Utils}
@@ -883,7 +884,7 @@ private[hive] object HiveClientImpl {
     }
     // after SPARK-19279, it is not allowed to create a hive table with an empty schema,
     // so here we should not add a default col schema
-    if (schema.isEmpty && DDLUtils.isDatasourceTable(table)) {
+    if (schema.isEmpty && HiveExternalCatalog.isDatasourceTable(table)) {
       // This is a hack to preserve existing behavior. Before Spark 2.0, we do not
       // set a default serde here (this was done in Hive), and so if the user provides
       // an empty schema Hive would automatically populate the schema with a single
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/Hive_2_1_DDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/Hive_2_1_DDLSuite.scala
new file mode 100644
index 0000000000000000000000000000000000000000..5c248b9acd04fd9b9d89ee9e6b6ec87145af7ec6
--- /dev/null
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/Hive_2_1_DDLSuite.scala
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.execution
+
+import scala.language.existentials
+
+import org.apache.hadoop.conf.Configuration
+import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
+
+import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.launcher.SparkLauncher
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.catalog._
+import org.apache.spark.sql.hive.{HiveExternalCatalog, HiveUtils}
+import org.apache.spark.sql.hive.test.TestHiveSingleton
+import org.apache.spark.sql.internal.StaticSQLConf._
+import org.apache.spark.sql.types._
+import org.apache.spark.tags.ExtendedHiveTest
+import org.apache.spark.util.Utils
+
+/**
+ * A separate set of DDL tests that uses Hive 2.1 libraries, which behave a little differently
+ * from the built-in ones.
+ */
+@ExtendedHiveTest
+class Hive_2_1_DDLSuite extends SparkFunSuite with TestHiveSingleton with BeforeAndAfterEach
+  with BeforeAndAfterAll {
+
+  // Create a custom HiveExternalCatalog instance with the desired configuration. We cannot
+  // use SparkSession here since there's already an active on managed by the TestHive object.
+  private var catalog = {
+    val warehouse = Utils.createTempDir()
+    val metastore = Utils.createTempDir()
+    metastore.delete()
+    val sparkConf = new SparkConf()
+      .set(SparkLauncher.SPARK_MASTER, "local")
+      .set(WAREHOUSE_PATH.key, warehouse.toURI().toString())
+      .set(CATALOG_IMPLEMENTATION.key, "hive")
+      .set(HiveUtils.HIVE_METASTORE_VERSION.key, "2.1")
+      .set(HiveUtils.HIVE_METASTORE_JARS.key, "maven")
+
+    val hadoopConf = new Configuration()
+    hadoopConf.set("hive.metastore.warehouse.dir", warehouse.toURI().toString())
+    hadoopConf.set("javax.jdo.option.ConnectionURL",
+      s"jdbc:derby:;databaseName=${metastore.getAbsolutePath()};create=true")
+    // These options are needed since the defaults in Hive 2.1 cause exceptions with an
+    // empty metastore db.
+    hadoopConf.set("datanucleus.schema.autoCreateAll", "true")
+    hadoopConf.set("hive.metastore.schema.verification", "false")
+
+    new HiveExternalCatalog(sparkConf, hadoopConf)
+  }
+
+  override def afterEach: Unit = {
+    catalog.listTables("default").foreach { t =>
+      catalog.dropTable("default", t, true, false)
+    }
+    spark.sessionState.catalog.reset()
+  }
+
+  override def afterAll(): Unit = {
+    catalog = null
+  }
+
+  test("SPARK-21617: ALTER TABLE for non-compatible DataSource tables") {
+    testAlterTable(
+      "t1",
+      "CREATE TABLE t1 (c1 int) USING json",
+      StructType(Array(StructField("c1", IntegerType), StructField("c2", IntegerType))),
+      hiveCompatible = false)
+  }
+
+  test("SPARK-21617: ALTER TABLE for Hive-compatible DataSource tables") {
+    testAlterTable(
+      "t1",
+      "CREATE TABLE t1 (c1 int) USING parquet",
+      StructType(Array(StructField("c1", IntegerType), StructField("c2", IntegerType))))
+  }
+
+  test("SPARK-21617: ALTER TABLE for Hive tables") {
+    testAlterTable(
+      "t1",
+      "CREATE TABLE t1 (c1 int) STORED AS parquet",
+      StructType(Array(StructField("c1", IntegerType), StructField("c2", IntegerType))))
+  }
+
+  test("SPARK-21617: ALTER TABLE with incompatible schema on Hive-compatible table") {
+    val exception = intercept[AnalysisException] {
+      testAlterTable(
+        "t1",
+        "CREATE TABLE t1 (c1 string) USING parquet",
+        StructType(Array(StructField("c2", IntegerType))))
+    }
+    assert(exception.getMessage().contains("types incompatible with the existing columns"))
+  }
+
+  private def testAlterTable(
+      tableName: String,
+      createTableStmt: String,
+      updatedSchema: StructType,
+      hiveCompatible: Boolean = true): Unit = {
+    spark.sql(createTableStmt)
+    val oldTable = spark.sessionState.catalog.externalCatalog.getTable("default", tableName)
+    catalog.createTable(oldTable, true)
+    catalog.alterTableSchema("default", tableName, updatedSchema)
+
+    val updatedTable = catalog.getTable("default", tableName)
+    assert(updatedTable.schema.fieldNames === updatedSchema.fieldNames)
+  }
+
+}