diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index fba4066af6bdb42a6a72db005459bf93706971a6..02cc3985b01005c9d34a02dcb026481d769aba71 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -179,7 +179,7 @@ class Dataset[T] private[sql](
       case _ => false
     }
 
-    queryExecution.logical match {
+    queryExecution.analyzed match {
       // For various commands (like DDL) and queries with side effects, we force query execution
       // to happen right away to let these side effects take place eagerly.
       case p if hasSideEffects(p) =>
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
index 4780eb473d79bd61205b6d1132d389f554887a41..bade41b1eddf9895c922fe7382eef4a16c262b72 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
@@ -259,4 +260,32 @@ class InsertSuite extends DataSourceTest with SharedSQLContext {
 
     spark.catalog.dropTempView("oneToTen")
   }
+
+  test("SPARK-15824 - Execute an INSERT wrapped in a WITH statement immediately") {
+    withTable("target", "target2") {
+      // SPARK-15824: an INSERT wrapped in a CTE must be executed eagerly, so the
+      // inserted rows are visible without any action on the returned Dataset.
+      sql("CREATE TABLE target(a INT, b STRING) USING JSON")
+      sql("WITH tbl AS (SELECT * FROM jt) INSERT OVERWRITE TABLE target SELECT a, b FROM tbl")
+      checkAnswer(
+        sql("SELECT a, b FROM target"),
+        sql("SELECT a, b FROM jt")
+      )
+
+      // The Hive-style multi-insert form (FROM ... INSERT ... INSERT ...) must be
+      // executed eagerly as well; the two disjoint predicates cover every row of jt.
+      sql("CREATE TABLE target2(a INT, b STRING) USING JSON")
+      sql(
+        """
+          |WITH tbl AS (SELECT * FROM jt)
+          |FROM tbl
+          |INSERT INTO target2 SELECT a, b WHERE a <= 5
+          |INSERT INTO target2 SELECT a, b WHERE a > 5
+        """.stripMargin)
+      checkAnswer(
+        sql("SELECT a, b FROM target2"),
+        sql("SELECT a, b FROM jt")
+      )
+    }
+  }
 }
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index d24cde232119cb69faf2d349bfa978e469074e09..224ff3823b59427039807cd4443598e186ddc922 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -544,7 +544,7 @@ private[hive] case class InsertIntoHiveTable(
     child: LogicalPlan,
     overwrite: Boolean,
     ifNotExists: Boolean)
-  extends LogicalPlan {
+  extends LogicalPlan with Command {
 
   override def children: Seq[LogicalPlan] = child :: Nil
   override def output: Seq[Attribute] = Seq.empty