diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index bbd920a4051de24f230bd1e81a2d15e541391cd1..76d329a3ddcdfaffaaff774ee1f1e9f047bcc075 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -116,7 +116,9 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
   }
 
   override def refreshTable(databaseName: String, tableName: String): Unit = {
-    cachedDataSourceTables.refresh(QualifiedTableName(databaseName, tableName).toLowerCase)
+    // refresh table does not eagerly reload the cache. It just invalidate the cache.
+    // Next time when we use the table, it will be populated in the cache.
+    invalidateTable(databaseName, tableName)
   }
 
   def invalidateTable(databaseName: String, tableName: String): Unit = {
@@ -229,13 +231,42 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
   private def convertToParquetRelation(metastoreRelation: MetastoreRelation): LogicalRelation = {
     val metastoreSchema = StructType.fromAttributes(metastoreRelation.output)
     val mergeSchema = hive.convertMetastoreParquetWithSchemaMerging
-    val parquetOptions = Map(
-      ParquetRelation2.METASTORE_SCHEMA -> metastoreSchema.json,
-      ParquetRelation2.MERGE_SCHEMA -> mergeSchema.toString)
 
     // NOTE: Instead of passing Metastore schema directly to `ParquetRelation2`, we have to
     // serialize the Metastore schema to JSON and pass it as a data source option because of the
     // evil case insensitivity issue, which is reconciled within `ParquetRelation2`.
+    val parquetOptions = Map(
+      ParquetRelation2.METASTORE_SCHEMA -> metastoreSchema.json,
+      ParquetRelation2.MERGE_SCHEMA -> mergeSchema.toString)
+    val tableIdentifier =
+      QualifiedTableName(metastoreRelation.databaseName, metastoreRelation.tableName)
+
+    def getCached(
+      tableIdentifier: QualifiedTableName,
+      pathsInMetastore: Seq[String],
+      schemaInMetastore: StructType,
+      partitionSpecInMetastore: Option[PartitionSpec]): Option[LogicalRelation] = {
+      cachedDataSourceTables.getIfPresent(tableIdentifier) match {
+        case null => None // Cache miss
+        case logical @ LogicalRelation(parquetRelation: ParquetRelation2) =>
+          // If we have the same paths, same schema, and same partition spec,
+          // we will use the cached Parquet Relation.
+          val useCached =
+            parquetRelation.paths == pathsInMetastore &&
+            logical.schema.sameType(metastoreSchema) &&
+            parquetRelation.maybePartitionSpec == partitionSpecInMetastore
+
+          if (useCached) Some(logical) else None
+        case other =>
+          logWarning(
+            s"${metastoreRelation.databaseName}.${metastoreRelation.tableName} shold be stored " +
+              s"as Parquet. However, we are getting a ${other} from the metastore cache. " +
+              s"This cached entry will be invalidated.")
+          cachedDataSourceTables.invalidate(tableIdentifier)
+          None
+      }
+    }
+
     if (metastoreRelation.hiveQlTable.isPartitioned) {
       val partitionSchema = StructType.fromAttributes(metastoreRelation.partitionKeys)
       val partitionColumnDataTypes = partitionSchema.map(_.dataType)
@@ -248,10 +279,28 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
       }
       val partitionSpec = PartitionSpec(partitionSchema, partitions)
       val paths = partitions.map(_.path)
-      LogicalRelation(ParquetRelation2(paths, parquetOptions, None, Some(partitionSpec))(hive))
+
+      val cached = getCached(tableIdentifier, paths, metastoreSchema, Some(partitionSpec))
+      val parquetRelation = cached.getOrElse {
+        val created =
+          LogicalRelation(ParquetRelation2(paths, parquetOptions, None, Some(partitionSpec))(hive))
+        cachedDataSourceTables.put(tableIdentifier, created)
+        created
+      }
+
+      parquetRelation
     } else {
       val paths = Seq(metastoreRelation.hiveQlTable.getDataLocation.toString)
-      LogicalRelation(ParquetRelation2(paths, parquetOptions)(hive))
+
+      val cached = getCached(tableIdentifier, paths, metastoreSchema, None)
+      val parquetRelation = cached.getOrElse {
+        val created =
+          LogicalRelation(ParquetRelation2(paths, parquetOptions)(hive))
+        cachedDataSourceTables.put(tableIdentifier, created)
+        created
+      }
+
+      parquetRelation
     }
   }
 
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
index 432d65a87451850b338009f13ec6ff61714160c8..2ad6e867262b1969c65abfd93193adc9efbcb986 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
@@ -26,8 +26,10 @@ import org.apache.spark.sql.{QueryTest, SQLConf, SaveMode}
 import org.apache.spark.sql.catalyst.expressions.Row
 import org.apache.spark.sql.execution.{ExecutedCommand, PhysicalRDD}
 import org.apache.spark.sql.hive.execution.HiveTableScan
+import org.apache.spark.sql.hive.test.TestHive
 import org.apache.spark.sql.hive.test.TestHive._
 import org.apache.spark.sql.hive.test.TestHive.implicits._
+import org.apache.spark.sql.json.JSONRelation
 import org.apache.spark.sql.sources.{InsertIntoDataSource, LogicalRelation}
 import org.apache.spark.sql.parquet.{ParquetRelation2, ParquetTableScan}
 import org.apache.spark.sql.SaveMode
@@ -390,6 +392,116 @@ class ParquetDataSourceOnMetastoreSuite extends ParquetMetastoreSuiteBase {
 
     sql("DROP TABLE ms_convert")
   }
+
+  test("Caching converted data source Parquet Relations") {
+    def checkCached(tableIdentifer: catalog.QualifiedTableName): Unit = {
+      // Converted test_parquet should be cached.
+      catalog.cachedDataSourceTables.getIfPresent(tableIdentifer) match {
+        case null => fail("Converted test_parquet should be cached in the cache.")
+        case logical @ LogicalRelation(parquetRelation: ParquetRelation2) => // OK
+        case other =>
+          fail(
+            "The cached test_parquet should be a Parquet Relation. " +
+              s"However, $other is returned form the cache.")
+      }
+    }
+
+    sql("DROP TABLE IF EXISTS test_insert_parquet")
+    sql("DROP TABLE IF EXISTS test_parquet_partitioned_cache_test")
+
+    sql(
+      """
+        |create table test_insert_parquet
+        |(
+        |  intField INT,
+        |  stringField STRING
+        |)
+        |ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
+        |STORED AS
+        |  INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
+        |  OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
+      """.stripMargin)
+
+    var tableIdentifer = catalog.QualifiedTableName("default", "test_insert_parquet")
+
+    // First, make sure the converted test_parquet is not cached.
+    assert(catalog.cachedDataSourceTables.getIfPresent(tableIdentifer) === null)
+    // Table lookup will make the table cached.
+    table("test_insert_parquet")
+    checkCached(tableIdentifer)
+    // For insert into non-partitioned table, we will do the conversion,
+    // so the converted test_insert_parquet should be cached.
+    invalidateTable("test_insert_parquet")
+    assert(catalog.cachedDataSourceTables.getIfPresent(tableIdentifer) === null)
+    sql(
+      """
+        |INSERT INTO TABLE test_insert_parquet
+        |select a, b from jt
+      """.stripMargin)
+    checkCached(tableIdentifer)
+    // Make sure we can read the data.
+    checkAnswer(
+      sql("select * from test_insert_parquet"),
+      sql("select a, b from jt").collect())
+    // Invalidate the cache.
+    invalidateTable("test_insert_parquet")
+    assert(catalog.cachedDataSourceTables.getIfPresent(tableIdentifer) === null)
+
+    // Create a partitioned table.
+    sql(
+      """
+        |create table test_parquet_partitioned_cache_test
+        |(
+        |  intField INT,
+        |  stringField STRING
+        |)
+        |PARTITIONED BY (date string)
+        |ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
+        |STORED AS
+        |  INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
+        |  OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
+      """.stripMargin)
+
+    tableIdentifer = catalog.QualifiedTableName("default", "test_parquet_partitioned_cache_test")
+    assert(catalog.cachedDataSourceTables.getIfPresent(tableIdentifer) === null)
+    sql(
+      """
+        |INSERT INTO TABLE test_parquet_partitioned_cache_test
+        |PARTITION (date='2015-04-01')
+        |select a, b from jt
+      """.stripMargin)
+    // Right now, insert into a partitioned Parquet is not supported in data source Parquet.
+    // So, we expect it is not cached.
+    assert(catalog.cachedDataSourceTables.getIfPresent(tableIdentifer) === null)
+    conf.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, "false")
+    sql(
+      """
+        |INSERT INTO TABLE test_parquet_partitioned_cache_test
+        |PARTITION (date='2015-04-02')
+        |select a, b from jt
+      """.stripMargin)
+    assert(catalog.cachedDataSourceTables.getIfPresent(tableIdentifer) === null)
+    conf.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, "true")
+
+    // Make sure we can cache the partitioned table.
+    table("test_parquet_partitioned_cache_test")
+    checkCached(tableIdentifer)
+    // Make sure we can read the data.
+    checkAnswer(
+      sql("select STRINGField, date, intField from test_parquet_partitioned_cache_test"),
+      sql(
+        """
+          |select b, '2015-04-01', a FROM jt
+          |UNION ALL
+          |select b, '2015-04-02', a FROM jt
+        """.stripMargin).collect())
+
+    invalidateTable("test_parquet_partitioned_cache_test")
+    assert(catalog.cachedDataSourceTables.getIfPresent(tableIdentifer) === null)
+
+    sql("DROP TABLE test_insert_parquet")
+    sql("DROP TABLE test_parquet_partitioned_cache_test")
+  }
 }
 
 class ParquetDataSourceOffMetastoreSuite extends ParquetMetastoreSuiteBase {