diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index 146e036bb4843eef7d2ad129d9bf7c5889295957..a85ac162957cc8c3738f7f22bd80a451cc1d1ff6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -307,7 +307,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
         table, provider, temp, partitionColumnNames, bucketSpec, mode, options, query)
     } else {
       val struct = Option(ctx.colTypeList()).map(createStructType)
-      CreateTableUsing(table, struct, provider, temp, options, ifNotExists, managedIfNoPath = false)
+      CreateTableUsing(table, struct, provider, temp, options, ifNotExists, managedIfNoPath = true)
     }
   }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
index e07ab99ef3e828e2e6eabdd0f4594859838ea4cb..16d61157379b4a79d2ec1bda57014571ac5f83c6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
@@ -97,7 +97,7 @@ case class CreateDataSourceTableCommand(
       userSpecifiedSchema = userSpecifiedSchema,
       className = provider,
       bucketSpec = None,
-      options = optionsWithPath).resolveRelation()
+      options = optionsWithPath).resolveRelation(checkPathExist = false)
 
     CreateDataSourceTableUtils.createDataSourceTable(
       sparkSession = sparkSession,
@@ -382,7 +382,8 @@ object CreateDataSourceTableUtils extends Logging {
     // TODO: Support persisting partitioned data source relations in Hive compatible format
     val qualifiedTableName = tableIdent.quotedString
     val skipHiveMetadata = options.getOrElse("skipHiveMetadata", "false").toBoolean
-    val (hiveCompatibleTable, logMessage) = (maybeSerDe, dataSource.resolveRelation()) match {
+    val resolvedRelation = dataSource.resolveRelation(checkPathExist = false)
+    val (hiveCompatibleTable, logMessage) = (maybeSerDe, resolvedRelation) match {
       case _ if skipHiveMetadata =>
         val message =
           s"Persisting partitioned data source relation $qualifiedTableName into " +
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index 618ea3d669bd0cc3e772f7004fd9541c397696a2..0342ec569dc1e7fa0f9ed1b317eb6a89204671de 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -239,8 +239,15 @@ case class DataSource(
     }
   }
 
-  /** Create a resolved [[BaseRelation]] that can be used to read data from this [[DataSource]] */
-  def resolveRelation(): BaseRelation = {
+  /**
+   * Create a resolved [[BaseRelation]] that can be used to read data from or write data into this
+   * [[DataSource]].
+   *
+   * @param checkPathExist A flag to indicate whether to check that the path exists.
+   *                       This flag is set to false when we create an empty table whose
+   *                       path does not exist yet.
+   */
+  def resolveRelation(checkPathExist: Boolean = true): BaseRelation = {
     val caseInsensitiveOptions = new CaseInsensitiveMap(options)
     val relation = (providingClass.newInstance(), userSpecifiedSchema) match {
       // TODO: Throw when too much is given.
@@ -288,11 +295,11 @@ case class DataSource(
           val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
           val globPath = SparkHadoopUtil.get.globPathIfNecessary(qualified)
 
-          if (globPath.isEmpty) {
+          if (checkPathExist && globPath.isEmpty) {
             throw new AnalysisException(s"Path does not exist: $qualified")
           }
           // Sufficient to check head of the globPath seq for non-glob scenario
-          if (!fs.exists(globPath.head)) {
+          if (checkPathExist && !fs.exists(globPath.head)) {
             throw new AnalysisException(s"Path does not exist: ${globPath.head}")
           }
           globPath
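
A minimal sketch of the new flag in use, assuming Spark 2.0's internal
DataSource API as it appears above; the SparkSession, schema, and path below
are illustrative only, not taken from this patch:

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.execution.datasources.DataSource
    import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

    val spark = SparkSession.builder().master("local").getOrCreate()
    val schema = StructType(Seq(
      StructField("i", IntegerType),
      StructField("j", StringType)))

    // The backing directory does not exist yet. With the default
    // checkPathExist = true, resolveRelation() would throw
    // AnalysisException("Path does not exist: ..."); passing false skips
    // both existence checks so the empty table can still be resolved.
    val relation = DataSource(
      spark,
      className = "parquet",
      userSpecifiedSchema = Some(schema),
      options = Map("path" -> "/tmp/tbl_not_yet_created")
    ).resolveRelation(checkPathExist = false)
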
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index 4845da7c853c8601b245f941d69486243bad24ea..1d597fe16d1d6a59cb02098a2d240cf32e264c42 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -1553,4 +1553,12 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
       assert(cause.getMessage.contains("Column ordering must be ASC, was 'DESC'"))
     }
   }
+
+  test("insert into datasource table") {
+    withTable("tbl") {
+      sql("CREATE TABLE tbl(i INT, j STRING) USING parquet")
+      Seq(1 -> "a").toDF("i", "j").write.mode("overwrite").insertInto("tbl")
+      checkAnswer(sql("SELECT * FROM tbl"), Row(1, "a"))
+    }
+  }
 }
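
End to end, the behavior the new test exercises (a sketch with the patch
applied, assuming a SparkSession named `spark` with Hive support):

    // managedIfNoPath = true: no explicit path was given, so the table is
    // managed and gets the default table path under the warehouse location;
    // resolveRelation(checkPathExist = false) tolerates that this directory
    // does not exist yet, and the first write creates it.
    spark.sql("CREATE TABLE tbl(i INT, j STRING) USING parquet")

    import spark.implicits._
    Seq(1 -> "a").toDF("i", "j").write.insertInto("tbl")  // creates the path
    spark.sql("SELECT * FROM tbl").show()                 // prints 1, a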