diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
index 5abd5794765044ae86e97d3edf9f4675bec189d9..d835b521166a8d500caabe6ba4292ecfb73bc64f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
@@ -141,7 +141,9 @@ case class CreateDataSourceTableAsSelectCommand(
       }
 
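+      // By this point the table exists and every save mode other than Append has
+      // been handled above, so the append is made explicit here.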
       saveDataIntoTable(
-        sparkSession, table, table.storage.locationUri, query, mode, tableExists = true)
+        sparkSession, table, table.storage.locationUri, query, SaveMode.Append, tableExists = true)
     } else {
       assert(table.schema.isEmpty)
 
@@ -151,7 +153,9 @@ case class CreateDataSourceTableAsSelectCommand(
         table.storage.locationUri
       }
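+      // The table does not exist yet, so the write overwrites any stale files that
+      // may already sit at the (possibly pre-existing) target location.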
       val result = saveDataIntoTable(
-        sparkSession, table, tableLocation, query, mode, tableExists = false)
+        sparkSession, table, tableLocation, query, SaveMode.Overwrite, tableExists = false)
       val newTable = table.copy(
         storage = table.storage.copy(locationUri = tableLocation),
         // We will use the schema of resolved.relation as the schema of the table (instead of
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index b44f20e367f0ac9f1a21fa2aeb027721a57d5e89..8b8cd0fdf4db2acdead5fd53467ca47d6e515d42 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -1836,18 +1836,17 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
   test("insert data to a data source table which has a not existed location should succeed") {
     withTable("t") {
       withTempDir { dir =>
-        val path = dir.toURI.toString.stripSuffix("/")
         spark.sql(
           s"""
              |CREATE TABLE t(a string, b int)
              |USING parquet
-             |OPTIONS(path "$path")
+             |OPTIONS(path "$dir")
            """.stripMargin)
         val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
-        assert(table.location == path)
+        assert(table.location == dir.getAbsolutePath)
 
         dir.delete
-        val tableLocFile = new File(table.location.stripPrefix("file:"))
+        val tableLocFile = new File(table.location)
         assert(!tableLocFile.exists)
         spark.sql("INSERT INTO TABLE t SELECT 'c', 1")
         assert(tableLocFile.exists)
@@ -1878,16 +1877,15 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
   test("insert into a data source table with no existed partition location should succeed") {
     withTable("t") {
       withTempDir { dir =>
-        val path = dir.toURI.toString.stripSuffix("/")
         spark.sql(
           s"""
              |CREATE TABLE t(a int, b int, c int, d int)
              |USING parquet
              |PARTITIONED BY(a, b)
-             |LOCATION "$path"
+             |LOCATION "$dir"
            """.stripMargin)
         val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
-        assert(table.location == path)
+        assert(table.location == dir.getAbsolutePath)
 
         spark.sql("INSERT INTO TABLE t PARTITION(a=1, b=2) SELECT 3, 4")
         checkAnswer(spark.table("t"), Row(3, 4, 1, 2) :: Nil)
@@ -1906,15 +1904,14 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
   test("read data from a data source table which has a not existed location should succeed") {
     withTable("t") {
       withTempDir { dir =>
-        val path = dir.toURI.toString.stripSuffix("/")
         spark.sql(
           s"""
              |CREATE TABLE t(a string, b int)
              |USING parquet
-             |OPTIONS(path "$path")
+             |OPTIONS(path "$dir")
            """.stripMargin)
         val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
-        assert(table.location == path)
+        assert(table.location == dir.getAbsolutePath)
 
         dir.delete()
         checkAnswer(spark.table("t"), Nil)
@@ -1939,7 +1936,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
              |CREATE TABLE t(a int, b int, c int, d int)
              |USING parquet
              |PARTITIONED BY(a, b)
-             |LOCATION "${dir.toURI}"
+             |LOCATION "$dir"
            """.stripMargin)
         spark.sql("INSERT INTO TABLE t PARTITION(a=1, b=2) SELECT 3, 4")
         checkAnswer(spark.table("t"), Row(3, 4, 1, 2) :: Nil)
@@ -1952,4 +1949,51 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
       }
     }
   }
+
+  Seq(true, false).foreach { shouldDelete =>
+    val tcName = if (shouldDelete) "a non-existent" else "an existing"
+    test(s"CTAS for external data source table with $tcName location") {
+      withTable("t", "t1") {
+        withTempDir { dir =>
+          if (shouldDelete) {
+            dir.delete()
+          }
+          spark.sql(
+            s"""
+               |CREATE TABLE t
+               |USING parquet
+               |LOCATION '$dir'
+               |AS SELECT 3 as a, 4 as b, 1 as c, 2 as d
+             """.stripMargin)
+          val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
+          assert(table.location == dir.getAbsolutePath)
+
+          checkAnswer(spark.table("t"), Row(3, 4, 1, 2))
+        }
+        // partitioned table
+        withTempDir { dir =>
+          if (shouldDelete) {
+            dir.delete()
+          }
+          spark.sql(
+            s"""
+               |CREATE TABLE t1
+               |USING parquet
+               |PARTITIONED BY(a, b)
+               |LOCATION '$dir'
+               |AS SELECT 3 as a, 4 as b, 1 as c, 2 as d
+             """.stripMargin)
+          val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t1"))
+          assert(table.location == dir.getAbsolutePath)
+
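+          // The output is partitioned by (a, b), so the writer is expected to lay out
+          // a partition directory for a=3 directly under the table location.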
+          val partDir = new File(dir, "a=3")
+          assert(partDir.exists())
+
+          checkAnswer(spark.table("t1"), Row(1, 2, 3, 4))
+        }
+      }
+    }
+  }
 }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index 792ac1e259494b1ad6b9f3a16ee6274181cb11f0..81ae5b7bdb6728eebaf7112b5717891e0bbba9e0 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -1587,4 +1587,103 @@ class HiveDDLSuite
       }
     }
   }
+
+  Seq(true, false).foreach { shouldDelete =>
+    val tcName = if (shouldDelete) "a non-existent" else "an existing"
+    test(s"CTAS for external data source table with $tcName location") {
+      withTable("t", "t1") {
+        withTempDir { dir =>
+          if (shouldDelete) {
+            dir.delete()
+          }
+          spark.sql(
+            s"""
+               |CREATE TABLE t
+               |USING parquet
+               |LOCATION '$dir'
+               |AS SELECT 3 as a, 4 as b, 1 as c, 2 as d
+             """.stripMargin)
+
+          val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
+          assert(table.location == dir.getAbsolutePath)
+
+          checkAnswer(spark.table("t"), Row(3, 4, 1, 2))
+        }
+        // partitioned table
+        withTempDir { dir =>
+          if (shouldDelete) {
+            dir.delete()
+          }
+          spark.sql(
+            s"""
+               |CREATE TABLE t1
+               |USING parquet
+               |PARTITIONED BY(a, b)
+               |LOCATION '$dir'
+               |AS SELECT 3 as a, 4 as b, 1 as c, 2 as d
+             """.stripMargin)
+
+          val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t1"))
+          assert(table.location == dir.getAbsolutePath)
+
+          val partDir = new File(dir, "a=3")
+          assert(partDir.exists())
+
+          checkAnswer(spark.table("t1"), Row(1, 2, 3, 4))
+        }
+      }
+    }
+
+    test(s"CTAS for external hive table with $tcName location") {
+      withTable("t", "t1") {
+        withSQLConf("hive.exec.dynamic.partition.mode" -> "nonstrict") {
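+          // nonstrict mode is needed because the partitioned CTAS below supplies all
+          // partition values dynamically from the query.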
+          withTempDir { dir =>
+            if (shouldDelete) {
+              dir.delete()
+            }
+            spark.sql(
+              s"""
+                 |CREATE TABLE t
+                 |USING hive
+                 |LOCATION '$dir'
+                 |AS SELECT 3 as a, 4 as b, 1 as c, 2 as d
+               """.stripMargin)
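+            // Hive qualifies the table location with the filesystem scheme, so the
+            // assertion compares against the qualified form of the local path.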
+            val dirPath = new Path(dir.getAbsolutePath)
+            val fs = dirPath.getFileSystem(spark.sessionState.newHadoopConf())
+            val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
+            assert(new Path(table.location) == fs.makeQualified(dirPath))
+
+            checkAnswer(spark.table("t"), Row(3, 4, 1, 2))
+          }
+          // partitioned table
+          withTempDir { dir =>
+            if (shouldDelete) {
+              dir.delete()
+            }
+            spark.sql(
+              s"""
+                 |CREATE TABLE t1
+                 |USING hive
+                 |PARTITIONED BY(a, b)
+                 |LOCATION '$dir'
+                 |AS SELECT 3 as a, 4 as b, 1 as c, 2 as d
+               """.stripMargin)
+            val dirPath = new Path(dir.getAbsolutePath)
+            val fs = dirPath.getFileSystem(spark.sessionState.newHadoopConf())
+            val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t1"))
+            assert(new Path(table.location) == fs.makeQualified(dirPath))
+
+            val partDir = new File(dir, "a=3")
+            assert(partDir.exists())
+
+            checkAnswer(spark.table("t1"), Row(1, 2, 3, 4))
+          }
+        }
+      }
+    }
+  }
 }