diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 2cc8d65d3cb79e5a1a03ddae5a3f3e6548cf8ef3..8af5a4848fd44620f1b65a1f0bdd402a8fd93f4f 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -440,6 +440,17 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
           val attributedRewrites = relation.output.zip(parquetRelation.output)
           (relation, parquetRelation, attributedRewrites)
 
+        // Write path
+        case InsertIntoHiveTable(relation: MetastoreRelation, _, _, _)
+          // Inserting into partitioned table is not supported in Parquet data source (yet).
+          if !relation.hiveQlTable.isPartitioned &&
+            hive.convertMetastoreParquet &&
+            hive.conf.parquetUseDataSourceApi &&
+            relation.tableDesc.getSerdeClassName.toLowerCase.contains("parquet") =>
+          val parquetRelation = convertToParquetRelation(relation)
+          val attributedRewrites = relation.output.zip(parquetRelation.output)
+          (relation, parquetRelation, attributedRewrites)
+
         // Read path
         case p @ PhysicalOperation(_, _, relation: MetastoreRelation)
             if hive.convertMetastoreParquet &&
@@ -464,6 +475,16 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
 
           withAlias
         }
+        case InsertIntoTable(r: MetastoreRelation, partition, child, overwrite)
+          if relationMap.contains(r) => {
+          val parquetRelation = relationMap(r)
+          InsertIntoTable(parquetRelation, partition, child, overwrite)
+        }
+        case InsertIntoHiveTable(r: MetastoreRelation, partition, child, overwrite)
+          if relationMap.contains(r) => {
+          val parquetRelation = relationMap(r)
+          InsertIntoTable(parquetRelation, partition, child, overwrite)
+        }
         case other => other.transformExpressions {
           case a: Attribute if a.resolved => attributedRewrites.getOrElse(a, a)
         }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala
index 653f4b47367c447ced64b6926f6d14c49a7c62a9..80fd5cda20e2054c3bf77a0cafc37b61c909e2a5 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala
@@ -24,11 +24,11 @@ import org.scalatest.BeforeAndAfterAll
 
 import org.apache.spark.sql.{SQLConf, QueryTest}
 import org.apache.spark.sql.catalyst.expressions.Row
-import org.apache.spark.sql.execution.PhysicalRDD
-import org.apache.spark.sql.hive.execution.HiveTableScan
+import org.apache.spark.sql.execution.{ExecutedCommand, PhysicalRDD}
+import org.apache.spark.sql.hive.execution.{InsertIntoHiveTable, HiveTableScan}
 import org.apache.spark.sql.hive.test.TestHive._
 import org.apache.spark.sql.hive.test.TestHive.implicits._
-import org.apache.spark.sql.sources.LogicalRelation
+import org.apache.spark.sql.sources.{InsertIntoDataSource, LogicalRelation}
 
 // The data where the partitioning key exists only in the directory structure.
 case class ParquetData(intField: Int, stringField: String)
@@ -93,6 +93,11 @@ class ParquetMetastoreSuiteBase extends ParquetPartitioningTest {
       sql(s"ALTER TABLE partitioned_parquet_with_key ADD PARTITION (p=$p)")
     }
 
+    val rdd1 = sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i, "b":"str${i}"}"""))
+    jsonRDD(rdd1).registerTempTable("jt")
+    val rdd2 = sparkContext.parallelize((1 to 10).map(i => s"""{"a":[$i, null]}"""))
+    jsonRDD(rdd2).registerTempTable("jt_array")
+
     setConf("spark.sql.hive.convertMetastoreParquet", "true")
   }
 
@@ -100,6 +105,8 @@ class ParquetMetastoreSuiteBase extends ParquetPartitioningTest {
     sql("DROP TABLE partitioned_parquet")
     sql("DROP TABLE partitioned_parquet_with_key")
     sql("DROP TABLE normal_parquet")
+    sql("DROP TABLE IF EXISTS jt")
+    sql("DROP TABLE IF EXISTS jt_array")
     setConf("spark.sql.hive.convertMetastoreParquet", "false")
   }
 
@@ -122,9 +129,6 @@ class ParquetDataSourceOnMetastoreSuite extends ParquetMetastoreSuiteBase {
   override def beforeAll(): Unit = {
     super.beforeAll()
 
-    val rdd = sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i, "b":"str${i}"}"""))
-    jsonRDD(rdd).registerTempTable("jt")
-
     sql(
       """
         |create table test_parquet
@@ -143,7 +147,6 @@ class ParquetDataSourceOnMetastoreSuite extends ParquetMetastoreSuiteBase {
 
   override def afterAll(): Unit = {
     super.afterAll()
-    sql("DROP TABLE IF EXISTS jt")
     sql("DROP TABLE IF EXISTS test_parquet")
 
     setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, originalConf.toString)
@@ -238,6 +241,70 @@ class ParquetDataSourceOnMetastoreSuite extends ParquetMetastoreSuiteBase {
 
     sql("DROP TABLE IF EXISTS test_parquet_ctas")
   }
+
+  test("MetastoreRelation in InsertIntoTable will be converted") {
+    sql(
+      """
+        |create table test_insert_parquet
+        |(
+        |  intField INT
+        |)
+        |ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
+        |STORED AS
+        |  INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
+        |  OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
+      """.stripMargin)
+
+    val df = sql("INSERT INTO TABLE test_insert_parquet SELECT a FROM jt")
+    df.queryExecution.executedPlan match {
+      case ExecutedCommand(
+        InsertIntoDataSource(
+          LogicalRelation(r: ParquetRelation2), query, overwrite)) => // OK
+      case o => fail("test_insert_parquet should be converted to a " +
+        s"${classOf[ParquetRelation2].getCanonicalName} and " +
+        s"${classOf[InsertIntoDataSource].getCanonicalName} is expcted as the SparkPlan." +
+        s"However, found a ${o.toString} ")
+    }
+
+    checkAnswer(
+      sql("SELECT intField FROM test_insert_parquet WHERE test_insert_parquet.intField > 5"),
+      sql("SELECT a FROM jt WHERE jt.a > 5").collect()
+    )
+
+    sql("DROP TABLE IF EXISTS test_insert_parquet")
+  }
+
+  test("MetastoreRelation in InsertIntoHiveTable will be converted") {
+    sql(
+      """
+        |create table test_insert_parquet
+        |(
+        |  int_array array<int>
+        |)
+        |ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
+        |STORED AS
+        |  INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
+        |  OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
+      """.stripMargin)
+
+    val df = sql("INSERT INTO TABLE test_insert_parquet SELECT a FROM jt_array")
+    df.queryExecution.executedPlan match {
+      case ExecutedCommand(
+        InsertIntoDataSource(
+          LogicalRelation(r: ParquetRelation2), query, overwrite)) => // OK
+      case o => fail("test_insert_parquet should be converted to a " +
+        s"${classOf[ParquetRelation2].getCanonicalName} and " +
+        s"${classOf[InsertIntoDataSource].getCanonicalName} is expcted as the SparkPlan." +
+        s"However, found a ${o.toString} ")
+    }
+
+    checkAnswer(
+      sql("SELECT int_array FROM test_insert_parquet"),
+      sql("SELECT a FROM jt_array").collect()
+    )
+
+    sql("DROP TABLE IF EXISTS test_insert_parquet")
+  }
 }
 
 class ParquetDataSourceOffMetastoreSuite extends ParquetMetastoreSuiteBase {
@@ -252,6 +319,63 @@ class ParquetDataSourceOffMetastoreSuite extends ParquetMetastoreSuiteBase {
     super.afterAll()
     setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, originalConf.toString)
   }
+
+  test("MetastoreRelation in InsertIntoTable will not be converted") {
+    sql(
+      """
+        |create table test_insert_parquet
+        |(
+        |  intField INT
+        |)
+        |ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
+        |STORED AS
+        |  INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
+        |  OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
+      """.stripMargin)
+
+    val df = sql("INSERT INTO TABLE test_insert_parquet SELECT a FROM jt")
+    df.queryExecution.executedPlan match {
+      case insert: InsertIntoHiveTable => // OK
+      case o => fail(s"The SparkPlan should be ${classOf[InsertIntoHiveTable].getCanonicalName}. " +
+        s"However, found ${o.toString}.")
+    }
+
+    checkAnswer(
+      sql("SELECT intField FROM test_insert_parquet WHERE test_insert_parquet.intField > 5"),
+      sql("SELECT a FROM jt WHERE jt.a > 5").collect()
+    )
+
+    sql("DROP TABLE IF EXISTS test_insert_parquet")
+  }
+
+  // TODO: enable it after the fix of SPARK-5950.
+  ignore("MetastoreRelation in InsertIntoHiveTable will not be converted") {
+    sql(
+      """
+        |create table test_insert_parquet
+        |(
+        |  int_array array<int>
+        |)
+        |ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
+        |STORED AS
+        |  INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
+        |  OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
+      """.stripMargin)
+
+    val df = sql("INSERT INTO TABLE test_insert_parquet SELECT a FROM jt_array")
+    df.queryExecution.executedPlan match {
+      case insert: InsertIntoHiveTable => // OK
+      case o => fail(s"The SparkPlan should be ${classOf[InsertIntoHiveTable].getCanonicalName}. " +
+        s"However, found ${o.toString}.")
+    }
+
+    checkAnswer(
+      sql("SELECT int_array FROM test_insert_parquet"),
+      sql("SELECT a FROM jt_array").collect()
+    )
+
+    sql("DROP TABLE IF EXISTS test_insert_parquet")
+  }
 }
 
 /**