diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index 6cfc4a432131642ab6a8c74010c5fcf0552a39ac..bfcdb70fe47c1020b85870b16eba8ce1b5c8c725 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -259,7 +259,19 @@ class SessionCatalog(
     val db = formatDatabaseName(tableDefinition.identifier.database.getOrElse(getCurrentDatabase))
     val table = formatTableName(tableDefinition.identifier.table)
     validateName(table)
-    val newTableDefinition = tableDefinition.copy(identifier = TableIdentifier(table, Some(db)))
+
+    val newTableDefinition = if (tableDefinition.storage.locationUri.isDefined
+      && !tableDefinition.storage.locationUri.get.isAbsolute) {
+      // Make the location of the table fully qualified.
+      val qualifiedTableLocation =
+        makeQualifiedPath(tableDefinition.storage.locationUri.get)
+      tableDefinition.copy(
+        storage = tableDefinition.storage.copy(locationUri = Some(qualifiedTableLocation)),
+        identifier = TableIdentifier(table, Some(db)))
+    } else {
+      tableDefinition.copy(identifier = TableIdentifier(table, Some(db)))
+    }
+
     requireDbExists(db)
     externalCatalog.createTable(newTableDefinition, ignoreIfExists)
   }
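Note: the new branch delegates to SessionCatalog's makeQualifiedPath helper. A minimal
standalone sketch of the same resolution, assuming only Hadoop's FileSystem API (the
function name and the Configuration below are stand-ins, not the helper itself):

    import java.net.URI
    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.Path

    // Resolve a possibly relative location URI against the default file
    // system, attaching the scheme (and authority) it was missing.
    def qualify(location: URI, hadoopConf: Configuration): URI = {
      val hadoopPath = new Path(location)
      val fs = hadoopPath.getFileSystem(hadoopConf)
      fs.makeQualified(hadoopPath).toUri
    }

    // e.g. on a local FS, qualify(new URI("spark-warehouse/t"), new Configuration())
    // resolves to file:/<working-dir>/spark-warehouse/t

This is why a table created with a relative LOCATION is now persisted with an absolute,
scheme-qualified URI in the catalog, as the test updates below verify.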
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index aa335c4453dd11f6f537f576da0d00b5afc05578..5f70a8ce8918badfa8a1202214a82ab6c7eb8c99 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -230,8 +230,8 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
   }
 
   private def getDBPath(dbName: String): URI = {
-    val warehousePath = s"file:${spark.sessionState.conf.warehousePath.stripPrefix("file:")}"
-    new Path(warehousePath, s"$dbName.db").toUri
+    val warehousePath = makeQualifiedPath(spark.sessionState.conf.warehousePath)
+    new Path(CatalogUtils.URIToString(warehousePath), s"$dbName.db").toUri
   }
 
   test("the qualified path of a database is stored in the catalog") {
@@ -1360,7 +1360,7 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
     val partitionLocation = if (isUsingHiveMetastore) {
       val tableLocation = catalog.getTableMetadata(tableIdent).storage.locationUri
       assert(tableLocation.isDefined)
-      makeQualifiedPath(new Path(tableLocation.get.toString, "paris"))
+      makeQualifiedPath(new Path(tableLocation.get.toString, "paris").toString)
     } else {
       new URI("paris")
     }
@@ -1909,7 +1909,7 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
              |OPTIONS(path "$dir")
            """.stripMargin)
         val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
-        assert(table.location == new URI(dir.getAbsolutePath))
+        assert(table.location == makeQualifiedPath(dir.getAbsolutePath))
 
         dir.delete
         assert(!dir.exists)
@@ -1950,7 +1950,7 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
              |LOCATION "$dir"
            """.stripMargin)
         val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
-        assert(table.location == new URI(dir.getAbsolutePath))
+        assert(table.location == makeQualifiedPath(dir.getAbsolutePath))
 
         spark.sql("INSERT INTO TABLE t PARTITION(a=1, b=2) SELECT 3, 4")
         checkAnswer(spark.table("t"), Row(3, 4, 1, 2) :: Nil)
@@ -1976,7 +1976,7 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
              |OPTIONS(path "$dir")
            """.stripMargin)
         val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
-        assert(table.location == new URI(dir.getAbsolutePath))
+        assert(table.location == makeQualifiedPath(dir.getAbsolutePath))
 
         dir.delete()
         checkAnswer(spark.table("t"), Nil)
@@ -2032,7 +2033,7 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
                  |AS SELECT 3 as a, 4 as b, 1 as c, 2 as d
                """.stripMargin)
             val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
-            assert(table.location == new URI(dir.getAbsolutePath))
+            assert(table.location == makeQualifiedPath(dir.getAbsolutePath))
 
             checkAnswer(spark.table("t"), Row(3, 4, 1, 2))
         }
@@ -2051,7 +2052,7 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
                  |AS SELECT 3 as a, 4 as b, 1 as c, 2 as d
                """.stripMargin)
             val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t1"))
-            assert(table.location == new URI(dir.getAbsolutePath))
+            assert(table.location == makeQualifiedPath(dir.getAbsolutePath))
 
             val partDir = new File(dir, "a=3")
             assert(partDir.exists())
@@ -2099,7 +2100,7 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
              """.stripMargin)
 
           val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
-          assert(table.location == new Path(loc.getAbsolutePath).toUri)
+          assert(table.location == makeQualifiedPath(loc.getAbsolutePath))
           assert(new Path(table.location).toString.contains(specialChars))
 
           assert(loc.listFiles().isEmpty)
@@ -2120,7 +2121,7 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
              """.stripMargin)
 
           val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t1"))
-          assert(table.location == new Path(loc.getAbsolutePath).toUri)
+          assert(table.location == makeQualifiedPath(loc.getAbsolutePath))
           assert(new Path(table.location).toString.contains(specialChars))
 
           assert(loc.listFiles().isEmpty)
@@ -2162,4 +2163,33 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
       }
     }
   }
+
+  test("the qualified path of a datasource table is stored in the catalog") {
+    withTable("t", "t1") {
+      withTempDir { dir =>
+        assert(!dir.getAbsolutePath.startsWith("file:/"))
+        spark.sql(
+          s"""
+             |CREATE TABLE t(a string)
+             |USING parquet
+             |LOCATION '$dir'
+           """.stripMargin)
+        val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
+        assert(table.location.toString.startsWith("file:/"))
+      }
+
+      withTempDir { dir =>
+        assert(!dir.getAbsolutePath.startsWith("file:/"))
+        spark.sql(
+          s"""
+             |CREATE TABLE t1(a string, b string)
+             |USING parquet
+             |PARTITIONED BY(b)
+             |LOCATION '$dir'
+           """.stripMargin)
+        val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t1"))
+        assert(table.location.toString.startsWith("file:/"))
+      }
+    }
+  }
 }
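Note: the repeated assertion updates above are mechanical. new URI(dir.getAbsolutePath)
yields a scheme-less URI, while the catalog now stores scheme-qualified locations, so the
old expectations no longer compare equal. Illustrative values (paths invented):

    import java.net.URI

    val raw = new URI("/tmp/some/dir")            // old expectation: no scheme
    val qualified = new URI("file:/tmp/some/dir") // what the catalog now stores
    assert(raw.getScheme == null)
    assert(qualified.getScheme == "file")
    assert(raw != qualified)                      // hence the move to makeQualifiedPath(...)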
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala
index fcb8ffbc6edd01c1f10b87ca523f19b21a78c331..9742b3b2d5c2924310e46f2d60ca9fa2b703fe11 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala
@@ -18,7 +18,6 @@
 package org.apache.spark.sql.internal
 
 import java.io.File
-import java.net.URI
 
 import org.scalatest.BeforeAndAfterEach
 
@@ -459,7 +458,7 @@ class CatalogSuite
           options = Map("path" -> dir.getAbsolutePath))
         val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
         assert(table.tableType == CatalogTableType.EXTERNAL)
-        assert(table.storage.locationUri.get == new URI(dir.getAbsolutePath))
+        assert(table.storage.locationUri.get == makeQualifiedPath(dir.getAbsolutePath))
 
         Seq((1)).toDF("i").write.insertInto("t")
         assert(dir.exists() && dir.listFiles().nonEmpty)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/PathOptionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/PathOptionSuite.scala
index 7ab339e005295d0d1f760fbd7f8db2eadfc33138..60adee4599b0bfac00f493348196bf69e3090ff0 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/PathOptionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/PathOptionSuite.scala
@@ -75,7 +75,7 @@ class PathOptionSuite extends DataSourceTest with SharedSQLContext {
            |USING ${classOf[TestOptionsSource].getCanonicalName}
            |OPTIONS (PATH '/tmp/path')
         """.stripMargin)
-      assert(getPathOption("src") == Some("/tmp/path"))
+      assert(getPathOption("src") == Some("file:/tmp/path"))
     }
 
     // should exist even path option is not specified when creating table
@@ -88,15 +88,16 @@ class PathOptionSuite extends DataSourceTest with SharedSQLContext {
   test("path option also exist for write path") {
     withTable("src") {
       withTempPath { p =>
-        val path = new Path(p.getAbsolutePath).toString
         sql(
           s"""
             |CREATE TABLE src
             |USING ${classOf[TestOptionsSource].getCanonicalName}
-            |OPTIONS (PATH '$path')
+            |OPTIONS (PATH '$p')
             |AS SELECT 1
           """.stripMargin)
-        assert(spark.table("src").schema.head.metadata.getString("path") == path)
+        assert(CatalogUtils.stringToURI(
+          spark.table("src").schema.head.metadata.getString("path")) ==
+          makeQualifiedPath(p.getAbsolutePath))
       }
     }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
index 12fc8993d7396bab9c67c4e205a72263fc7296b3..9201954b66d10cfcf5ff9cdb366e23ba86a4a3a4 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
@@ -306,11 +306,6 @@ private[sql] trait SQLTestUtils
     val fs = hadoopPath.getFileSystem(spark.sessionState.newHadoopConf())
     fs.makeQualified(hadoopPath).toUri
   }
-
-  def makeQualifiedPath(path: Path): URI = {
-    val fs = path.getFileSystem(spark.sessionState.newHadoopConf())
-    fs.makeQualified(path).toUri
-  }
 }
 
 private[sql] object SQLTestUtils {
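Note: with the Path overload of makeQualifiedPath removed, the String-based variant kept
in context above is the single entry point; callers holding a Hadoop Path stringify first,
as the VersionsSuite hunk below now does. A hypothetical call site, for illustration:

    // Stand-in values; route a Hadoop Path through the surviving String overload.
    val tPath = new Path("/tmp/warehouse", "t")
    val qualified: java.net.URI = makeQualifiedPath(tPath.toString)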
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala
index cf552b4a88b2c99a0c941ee26b89ada0a1187422..079358b29a19175fc1edebb208e0ea9da58343b6 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala
@@ -17,8 +17,6 @@
 
 package org.apache.spark.sql.hive
 
-import java.net.URI
-
 import org.apache.spark.sql.{QueryTest, Row, SaveMode}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.catalog.CatalogTableType
@@ -142,7 +140,7 @@ class DataSourceWithHiveMetastoreCatalogSuite
           assert(hiveTable.storage.serde === Some(serde))
 
           assert(hiveTable.tableType === CatalogTableType.EXTERNAL)
-          assert(hiveTable.storage.locationUri === Some(new URI(path.getAbsolutePath)))
+          assert(hiveTable.storage.locationUri === Some(makeQualifiedPath(dir.getAbsolutePath)))
 
           val columns = hiveTable.schema
           assert(columns.map(_.name) === Seq("d1", "d2"))
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
index dd624eca6b7b0c7c15ad99a09d96699d9524195d..6025f8adbce28b6ac7f4ebf786221a1a5eb4d9b1 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
@@ -658,19 +658,17 @@ class VersionsSuite extends QueryTest with SQLTestUtils with TestHiveSingleton w
 
         val tPath = new Path(spark.sessionState.conf.warehousePath, "t")
         Seq("1").toDF("a").write.saveAsTable("t")
-        val expectedPath = s"file:${tPath.toUri.getPath.stripSuffix("/")}"
         val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
 
-        assert(table.location == CatalogUtils.stringToURI(expectedPath))
+        assert(table.location == makeQualifiedPath(tPath.toString))
         assert(tPath.getFileSystem(spark.sessionState.newHadoopConf()).exists(tPath))
         checkAnswer(spark.table("t"), Row("1") :: Nil)
 
         val t1Path = new Path(spark.sessionState.conf.warehousePath, "t1")
         spark.sql("create table t1 using parquet as select 2 as a")
         val table1 = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t1"))
-        val expectedPath1 = s"file:${t1Path.toUri.getPath.stripSuffix("/")}"
 
-        assert(table1.location == CatalogUtils.stringToURI(expectedPath1))
+        assert(table1.location == makeQualifiedPath(t1Path.toString))
         assert(t1Path.getFileSystem(spark.sessionState.newHadoopConf()).exists(t1Path))
         checkAnswer(spark.table("t1"), Row(2) :: Nil)
       }
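Note: on a local file system the hand-rolled expectedPath removed above and the helper
agree, while the helper also remains correct for non-file warehouse schemes. A sketch of
the equivalence the rewritten assertions rely on (local-FS warehouse assumed):

    // Both forms yield file:/<warehouse>/t locally; equality held for the old
    // assertion and continues to hold for the new one.
    val byHand = CatalogUtils.stringToURI(s"file:${tPath.toUri.getPath.stripSuffix("/")}")
    val viaHelper = makeQualifiedPath(tPath.toString)
    assert(byHand == viaHelper)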
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index fce055048d72fd0625e32309e6e67e3eb46a1623..23aea24697785ae5765abe87f6ca2870aa7cf7b0 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -1681,7 +1681,7 @@ class HiveDDLSuite
                """.stripMargin)
 
             val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
-            assert(table.location == new URI(dir.getAbsolutePath))
+            assert(table.location == makeQualifiedPath(dir.getAbsolutePath))
 
             checkAnswer(spark.table("t"), Row(3, 4, 1, 2))
         }
@@ -1701,7 +1701,7 @@ class HiveDDLSuite
                """.stripMargin)
 
             val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t1"))
-            assert(table.location == new URI(dir.getAbsolutePath))
+            assert(table.location == makeQualifiedPath(dir.getAbsolutePath))
 
             val partDir = new File(dir, "a=3")
             assert(partDir.exists())
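Note: several hunks above lean on CatalogUtils.stringToURI / CatalogUtils.URIToString to
convert between the catalog's URI representation and strings. A sketch of their assumed
behavior, modeled as Hadoop Path round-trips (the real helpers live in
org.apache.spark.sql.catalyst.catalog.CatalogUtils):

    import java.net.URI
    import org.apache.hadoop.fs.Path

    // Assumed shape of the helpers: both normalize through Hadoop's Path, so
    // "/tmp/path" stays scheme-less while "file:/tmp/path" keeps its scheme.
    def stringToURI(str: String): URI = new Path(str).toUri
    def URIToString(uri: URI): String = new Path(uri).toString

    // e.g. stringToURI("/tmp/path").getScheme == null, whereas
    //      stringToURI("file:/tmp/path").getScheme == "file" -- the distinction
    //      behind the updated PathOptionSuite expectation.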