Commit 4b82bd73 authored by Yin Huai, committed by Michael Armbrust

[SPARK-6575][SQL] Converted Parquet Metastore tables no longer cache metadata

https://issues.apache.org/jira/browse/SPARK-6575

Author: Yin Huai <yhuai@databricks.com>

Closes #5339 from yhuai/parquetRelationCache and squashes the following commits:

83d9846 [Yin Huai] Remove unnecessary change.
c0dc7a4 [Yin Huai] Cache converted parquet relations.
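The heart of the fix, visible in the diff below, is that the converted ParquetRelation2 is now stored in cachedDataSourceTables under its QualifiedTableName and looked up there first, so repeated queries over a converted Parquet Metastore table reuse one relation instead of rebuilding it (and re-reading its metadata) on every access. A minimal sketch of that get-or-create pattern against a Guava cache, using hypothetical stand-in types (QualifiedTableName and ConvertedRelation below are simplified, not the real catalog classes):

import com.google.common.cache.{Cache, CacheBuilder}

// Hypothetical stand-ins for the real catalog types; for illustration only.
case class QualifiedTableName(database: String, name: String)
case class ConvertedRelation(paths: Seq[String])

object RelationCacheSketch {
  // Analogous in spirit to cachedDataSourceTables in HiveMetastoreCatalog.
  private val cache: Cache[QualifiedTableName, ConvertedRelation] =
    CacheBuilder.newBuilder().maximumSize(1000).build[QualifiedTableName, ConvertedRelation]()

  // Reuse the cached relation when present; otherwise build it, cache it, and return it.
  def getOrConvert(key: QualifiedTableName)(convert: => ConvertedRelation): ConvertedRelation =
    Option(cache.getIfPresent(key)).getOrElse {
      val created = convert
      cache.put(key, created)
      created
    }

  // Invalidation only drops the entry; the next lookup rebuilds and re-caches it lazily.
  def invalidate(key: QualifiedTableName): Unit = cache.invalidate(key)
}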
parent 45134ec9
@@ -116,7 +116,9 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
}
override def refreshTable(databaseName: String, tableName: String): Unit = {
- cachedDataSourceTables.refresh(QualifiedTableName(databaseName, tableName).toLowerCase)
// refreshTable does not eagerly reload the cache. It just invalidates the cached entry.
// The next time the table is used, the cache will be repopulated.
invalidateTable(databaseName, tableName)
}
def invalidateTable(databaseName: String, tableName: String): Unit = {
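The new comment in refreshTable captures the behavioral change: rather than asking the Guava cache behind cachedDataSourceTables to reload the entry eagerly, the entry is only invalidated and is repopulated on the next table lookup. A small illustrative sketch of that difference on a plain Guava LoadingCache (the string key/value and the loads counter are made up for the example):

import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache}

object RefreshVsInvalidateSketch {
  @volatile private var loads = 0

  // A toy loader standing in for the catalog's table-conversion work.
  private val cache: LoadingCache[String, String] =
    CacheBuilder.newBuilder().build(new CacheLoader[String, String] {
      override def load(key: String): String = { loads += 1; s"relation-for-$key" }
    })

  def demo(): Unit = {
    cache.get("default.test_parquet")        // first use loads the value (loads == 1)
    cache.refresh("default.test_parquet")    // eager: the loader runs again right away (loads == 2)
    cache.invalidate("default.test_parquet") // lazy: nothing is loaded yet
    cache.get("default.test_parquet")        // repopulated only on the next use (loads == 3)
  }
}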
@@ -229,13 +231,42 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
private def convertToParquetRelation(metastoreRelation: MetastoreRelation): LogicalRelation = {
val metastoreSchema = StructType.fromAttributes(metastoreRelation.output)
val mergeSchema = hive.convertMetastoreParquetWithSchemaMerging
- val parquetOptions = Map(
- ParquetRelation2.METASTORE_SCHEMA -> metastoreSchema.json,
- ParquetRelation2.MERGE_SCHEMA -> mergeSchema.toString)
// NOTE: Instead of passing Metastore schema directly to `ParquetRelation2`, we have to
// serialize the Metastore schema to JSON and pass it as a data source option because of the
// evil case insensitivity issue, which is reconciled within `ParquetRelation2`.
val parquetOptions = Map(
ParquetRelation2.METASTORE_SCHEMA -> metastoreSchema.json,
ParquetRelation2.MERGE_SCHEMA -> mergeSchema.toString)
val tableIdentifier =
QualifiedTableName(metastoreRelation.databaseName, metastoreRelation.tableName)
def getCached(
tableIdentifier: QualifiedTableName,
pathsInMetastore: Seq[String],
schemaInMetastore: StructType,
partitionSpecInMetastore: Option[PartitionSpec]): Option[LogicalRelation] = {
cachedDataSourceTables.getIfPresent(tableIdentifier) match {
case null => None // Cache miss
case logical @ LogicalRelation(parquetRelation: ParquetRelation2) =>
// If we have the same paths, same schema, and same partition spec,
// we will use the cached Parquet Relation.
val useCached =
parquetRelation.paths == pathsInMetastore &&
logical.schema.sameType(metastoreSchema) &&
parquetRelation.maybePartitionSpec == partitionSpecInMetastore
if (useCached) Some(logical) else None
case other =>
logWarning(
s"${metastoreRelation.databaseName}.${metastoreRelation.tableName} shold be stored " +
s"as Parquet. However, we are getting a ${other} from the metastore cache. " +
s"This cached entry will be invalidated.")
cachedDataSourceTables.invalidate(tableIdentifier)
None
}
}
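As a brief aside on the reuse condition above: the cached relation is only reused while the metastore's path list, schema, and partition spec all still match; any drift, for example a newly added partition directory, is treated as a miss so that a fresh relation is built and re-cached. A self-contained toy illustration of that rule (CachedRelation and the string values are hypothetical stand-ins, not Spark types):

object CacheHitSketch {
  // Hypothetical snapshot of the state a cache entry must match against the metastore.
  case class CachedRelation(paths: Seq[String], schemaJson: String, partitionedBy: Option[String])

  def canReuse(
      cached: CachedRelation,
      pathsInMetastore: Seq[String],
      schemaJsonInMetastore: String,
      partitionedByInMetastore: Option[String]): Boolean = {
    cached.paths == pathsInMetastore &&
      cached.schemaJson == schemaJsonInMetastore &&
      cached.partitionedBy == partitionedByInMetastore
  }

  def demo(): Unit = {
    val cached = CachedRelation(Seq("/warehouse/t/date=2015-04-01"), "{...schema...}", Some("date"))
    // Unchanged metastore state: the cached relation can be reused.
    assert(canReuse(cached, Seq("/warehouse/t/date=2015-04-01"), "{...schema...}", Some("date")))
    // A new partition changed the path list: miss, so the relation is rebuilt and re-cached.
    assert(!canReuse(
      cached,
      Seq("/warehouse/t/date=2015-04-01", "/warehouse/t/date=2015-04-02"),
      "{...schema...}",
      Some("date")))
  }
}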
if (metastoreRelation.hiveQlTable.isPartitioned) {
val partitionSchema = StructType.fromAttributes(metastoreRelation.partitionKeys)
val partitionColumnDataTypes = partitionSchema.map(_.dataType)
@@ -248,10 +279,28 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
}
val partitionSpec = PartitionSpec(partitionSchema, partitions)
val paths = partitions.map(_.path)
- LogicalRelation(ParquetRelation2(paths, parquetOptions, None, Some(partitionSpec))(hive))
val cached = getCached(tableIdentifier, paths, metastoreSchema, Some(partitionSpec))
val parquetRelation = cached.getOrElse {
val created =
LogicalRelation(ParquetRelation2(paths, parquetOptions, None, Some(partitionSpec))(hive))
cachedDataSourceTables.put(tableIdentifier, created)
created
}
parquetRelation
} else {
val paths = Seq(metastoreRelation.hiveQlTable.getDataLocation.toString)
- LogicalRelation(ParquetRelation2(paths, parquetOptions)(hive))
val cached = getCached(tableIdentifier, paths, metastoreSchema, None)
val parquetRelation = cached.getOrElse {
val created =
LogicalRelation(ParquetRelation2(paths, parquetOptions)(hive))
cachedDataSourceTables.put(tableIdentifier, created)
created
}
parquetRelation
}
}
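The NOTE earlier in convertToParquetRelation explains why the Metastore schema is passed to ParquetRelation2 as a JSON string option rather than as a StructType. A hedged sketch of that round trip using the public Spark SQL types API (the example schema is made up, and only the field names are checked rather than full equality):

import org.apache.spark.sql.types._

object SchemaJsonSketch {
  def demo(): Unit = {
    // A made-up schema standing in for StructType.fromAttributes(metastoreRelation.output).
    val metastoreSchema = StructType(Seq(
      StructField("intField", IntegerType),
      StructField("stringField", StringType)))

    // Serialized to a plain String so it can travel as a data source option
    // (the ParquetRelation2.METASTORE_SCHEMA entry in parquetOptions above).
    val asJson: String = metastoreSchema.json

    // The receiving side can parse the option back into a StructType.
    val roundTripped = DataType.fromJson(asJson).asInstanceOf[StructType]
    assert(roundTripped.fieldNames.sameElements(metastoreSchema.fieldNames))
  }
}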
@@ -26,8 +26,10 @@ import org.apache.spark.sql.{QueryTest, SQLConf, SaveMode}
import org.apache.spark.sql.catalyst.expressions.Row
import org.apache.spark.sql.execution.{ExecutedCommand, PhysicalRDD}
import org.apache.spark.sql.hive.execution.HiveTableScan
import org.apache.spark.sql.hive.test.TestHive
import org.apache.spark.sql.hive.test.TestHive._
import org.apache.spark.sql.hive.test.TestHive.implicits._
import org.apache.spark.sql.json.JSONRelation
import org.apache.spark.sql.sources.{InsertIntoDataSource, LogicalRelation}
import org.apache.spark.sql.parquet.{ParquetRelation2, ParquetTableScan}
import org.apache.spark.sql.SaveMode
@@ -390,6 +392,116 @@ class ParquetDataSourceOnMetastoreSuite extends ParquetMetastoreSuiteBase {
sql("DROP TABLE ms_convert")
}
test("Caching converted data source Parquet Relations") {
def checkCached(tableIdentifer: catalog.QualifiedTableName): Unit = {
// Converted test_parquet should be cached.
catalog.cachedDataSourceTables.getIfPresent(tableIdentifer) match {
case null => fail("Converted test_parquet should be cached in the cache.")
case logical @ LogicalRelation(parquetRelation: ParquetRelation2) => // OK
case other =>
fail(
"The cached test_parquet should be a Parquet Relation. " +
s"However, $other is returned form the cache.")
}
}
sql("DROP TABLE IF EXISTS test_insert_parquet")
sql("DROP TABLE IF EXISTS test_parquet_partitioned_cache_test")
sql(
"""
|create table test_insert_parquet
|(
| intField INT,
| stringField STRING
|)
|ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
|STORED AS
| INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
| OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
""".stripMargin)
var tableIdentifer = catalog.QualifiedTableName("default", "test_insert_parquet")
// First, make sure the converted test_parquet is not cached.
assert(catalog.cachedDataSourceTables.getIfPresent(tableIdentifer) === null)
// Looking up the table will populate the cache.
table("test_insert_parquet")
checkCached(tableIdentifer)
// For an insert into a non-partitioned table, we do the conversion,
// so the converted test_insert_parquet should be cached.
invalidateTable("test_insert_parquet")
assert(catalog.cachedDataSourceTables.getIfPresent(tableIdentifer) === null)
sql(
"""
|INSERT INTO TABLE test_insert_parquet
|select a, b from jt
""".stripMargin)
checkCached(tableIdentifer)
// Make sure we can read the data.
checkAnswer(
sql("select * from test_insert_parquet"),
sql("select a, b from jt").collect())
// Invalidate the cache.
invalidateTable("test_insert_parquet")
assert(catalog.cachedDataSourceTables.getIfPresent(tableIdentifer) === null)
// Create a partitioned table.
sql(
"""
|create table test_parquet_partitioned_cache_test
|(
| intField INT,
| stringField STRING
|)
|PARTITIONED BY (date string)
|ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
|STORED AS
| INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
| OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
""".stripMargin)
tableIdentifer = catalog.QualifiedTableName("default", "test_parquet_partitioned_cache_test")
assert(catalog.cachedDataSourceTables.getIfPresent(tableIdentifer) === null)
sql(
"""
|INSERT INTO TABLE test_parquet_partitioned_cache_test
|PARTITION (date='2015-04-01')
|select a, b from jt
""".stripMargin)
// Right now, insert into a partitioned Parquet table is not supported by the data source Parquet implementation.
// So, we expect it is not cached.
assert(catalog.cachedDataSourceTables.getIfPresent(tableIdentifer) === null)
conf.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, "false")
sql(
"""
|INSERT INTO TABLE test_parquet_partitioned_cache_test
|PARTITION (date='2015-04-02')
|select a, b from jt
""".stripMargin)
assert(catalog.cachedDataSourceTables.getIfPresent(tableIdentifer) === null)
conf.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, "true")
// Make sure we can cache the partitioned table.
table("test_parquet_partitioned_cache_test")
checkCached(tableIdentifer)
// Make sure we can read the data.
checkAnswer(
sql("select STRINGField, date, intField from test_parquet_partitioned_cache_test"),
sql(
"""
|select b, '2015-04-01', a FROM jt
|UNION ALL
|select b, '2015-04-02', a FROM jt
""".stripMargin).collect())
invalidateTable("test_parquet_partitioned_cache_test")
assert(catalog.cachedDataSourceTables.getIfPresent(tableIdentifer) === null)
sql("DROP TABLE test_insert_parquet")
sql("DROP TABLE test_parquet_partitioned_cache_test")
}
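One side note on the test above: it flips SQLConf.PARQUET_USE_DATA_SOURCE_API with bare setConf calls and sets it back afterwards. A hedged sketch of a slightly safer variant that restores the original value even if the body throws; getConf, setConf, and the key constant are the APIs already used in this suite, while the helper itself is hypothetical:

// Hypothetical helper: run `body` with the data source Parquet path enabled or disabled,
// then restore the previous value even when the body fails.
def withParquetDataSourceApi(enabled: Boolean)(body: => Unit): Unit = {
  val original = conf.getConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, "true") // "true" is an assumed fallback
  conf.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, enabled.toString)
  try body finally conf.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, original)
}

// Example usage inside the test body:
// withParquetDataSourceApi(enabled = false) {
//   sql("INSERT INTO TABLE test_parquet_partitioned_cache_test PARTITION (date='2015-04-02') select a, b from jt")
// }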
}
class ParquetDataSourceOffMetastoreSuite extends ParquetMetastoreSuiteBase {