diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index 52666119351b1399abdd83337912e7fdc2adacb0..5d663949df6b58c095eaf06250b71e0715126375 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -424,7 +424,8 @@ case class DataSource(
             _ => (), // No existing table needs to be refreshed.
             options,
             data.logicalPlan,
-            mode)
+            mode,
+            catalogTable)
         sparkSession.sessionState.executePlan(plan).toRdd
         // Replace the schema with that of the DataFrame we just wrote out to avoid re-inferring it.
         copy(userSpecifiedSchema = Some(data.schema.asNullable)).resolveRelation()
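
This first hunk has `DataSource` forward its own `catalogTable` field into the
planned write command, so the physical write can tell which metastore table, if
any, it serves. A minimal sketch of what the new field conveys, assuming
Spark 2.1-era APIs; the `describeTarget` helper is hypothetical, not part of
the patch:

    import org.apache.spark.sql.catalyst.catalog.CatalogTable

    // catalogTable is None for purely path-based writes such as
    // df.write.parquet("/some/path") and Some(table) for metastore-backed ones.
    def describeTarget(catalogTable: Option[CatalogTable]): String =
      catalogTable match {
        case Some(t) => s"catalog table ${t.identifier.unquotedString}"
        case None    => "path-based relation (no catalog entry)"
      }
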
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index a548e88cb683a25cad1331a55c0494e781519e43..2d43a6ad098ed2ccb4b80ac0a71ae686648bb27f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -162,7 +162,7 @@ case class DataSourceAnalysis(conf: CatalystConf) extends Rule[LogicalPlan] {
 
 
     case i @ logical.InsertIntoTable(
-           l @ LogicalRelation(t: HadoopFsRelation, _, _), part, query, overwrite, false)
+           l @ LogicalRelation(t: HadoopFsRelation, _, table), part, query, overwrite, false)
         if query.resolved && t.schema.asNullable == query.schema.asNullable =>
 
       // Sanity checks
@@ -222,7 +222,8 @@ case class DataSourceAnalysis(conf: CatalystConf) extends Rule[LogicalPlan] {
         refreshPartitionsCallback,
         t.options,
         query,
-        mode)
+        mode,
+        table)
 
       insertCmd
   }
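
The analysis-rule change binds `LogicalRelation`'s third constructor field, the
`Option[CatalogTable]` attached when the relation was resolved from the
metastore, where the pattern previously discarded it with `_`. A self-contained
sketch of the same binding, assuming Spark 2.1-era classes; the `tableName`
helper is illustrative only:

    import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
    import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}

    // Extract the catalog table name carried by a resolved file-based relation.
    def tableName(plan: LogicalPlan): Option[String] = plan match {
      case LogicalRelation(_: HadoopFsRelation, _, table) => table.map(_.identifier.table)
      case _ => None
    }
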
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
index 9c75e2ae747618c3d22112f96aeee47e00010819..a0a8cb5024c334751870d146468fdaf8bc7f5981 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
@@ -23,7 +23,7 @@ import org.apache.hadoop.fs.Path
 
 import org.apache.spark.internal.io.FileCommitProtocol
 import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.catalog.BucketSpec
+import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogTable}
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
@@ -41,7 +41,8 @@ case class InsertIntoHadoopFsRelationCommand(
     refreshFunction: (Seq[TablePartitionSpec]) => Unit,
     options: Map[String, String],
     @transient query: LogicalPlan,
-    mode: SaveMode)
+    mode: SaveMode,
+    catalogTable: Option[CatalogTable])
   extends RunnableCommand {
 
   override protected def innerChildren: Seq[LogicalPlan] = query :: Nil
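
The new field is an `Option` because not every `InsertIntoHadoopFsRelationCommand`
originates from a catalog table. Two writes that both end up in this command,
assuming a DataFrame `df` and a metastore table named `test_insert_parquet`
(names and paths are illustrative):

    // Path-based write: there is no metastore entry, so the planner builds
    // the command with catalogTable = None.
    df.write.mode("overwrite").parquet("/tmp/no_catalog_entry")

    // Insert into a metastore-backed data source table: DataSourceAnalysis
    // can now pass Some(table) through to the command.
    df.write.insertInto("test_insert_parquet")
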
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
index 9fc62a389db4d95709a258153e8d2f2f949b3f84..3644ff952eb0d63ae6f41349e64401ee887cce42 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
@@ -307,7 +307,8 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
 
       val df = sql("INSERT INTO TABLE test_insert_parquet SELECT a FROM jt")
       df.queryExecution.sparkPlan match {
-        case ExecutedCommandExec(_: InsertIntoHadoopFsRelationCommand) => // OK
+        case ExecutedCommandExec(cmd: InsertIntoHadoopFsRelationCommand) =>
+          assert(cmd.catalogTable.map(_.identifier.table) === Some("test_insert_parquet"))
         case o => fail("test_insert_parquet should be converted to a " +
           s"${classOf[HadoopFsRelation ].getCanonicalName} and " +
           s"${classOf[InsertIntoDataSourceCommand].getCanonicalName} should have been SparkPlan. " +
@@ -337,7 +338,8 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
 
       val df = sql("INSERT INTO TABLE test_insert_parquet SELECT a FROM jt_array")
       df.queryExecution.sparkPlan match {
-        case ExecutedCommandExec(_: InsertIntoHadoopFsRelationCommand) => // OK
+        case ExecutedCommandExec(cmd: InsertIntoHadoopFsRelationCommand) =>
+          assert(cmd.catalogTable.map(_.identifier.table) === Some("test_insert_parquet"))
         case o => fail("test_insert_parquet should be converted to a " +
           s"${classOf[HadoopFsRelation ].getCanonicalName} and " +
           s"${classOf[InsertIntoDataSourceCommand].getCanonicalName} should have been SparkPlan." +