Skip to content
Snippets Groups Projects
Commit a8a765b3 authored by Reynold Xin's avatar Reynold Xin
Browse files

[SPARK-20151][SQL] Account for partition pruning in scan metadataTime metrics

## What changes were proposed in this pull request?
After SPARK-20136, we report metadata timing metrics in scan operator. However, that timing metric doesn't include one of the most important part of metadata, which is partition pruning. This patch adds that time measurement to the scan metrics.

## How was this patch tested?
N/A - I tried adding a test in SQLMetricsSuite but it was extremely convoluted to the point that I'm not sure if this is worth it.

Author: Reynold Xin <rxin@databricks.com>

Closes #17476 from rxin/SPARK-20151.
parent c734fc50
No related branches found
No related tags found
No related merge requests found
......@@ -172,12 +172,13 @@ case class FileSourceScanExec(
}
@transient private lazy val selectedPartitions: Seq[PartitionDirectory] = {
val optimizerMetadataTimeNs = relation.location.metadataOpsTimeNs.getOrElse(0L)
val startTime = System.nanoTime()
val ret = relation.location.listFiles(partitionFilters, dataFilters)
val timeTaken = (System.nanoTime() - startTime) / 1000 / 1000
val timeTakenMs = ((System.nanoTime() - startTime) + optimizerMetadataTimeNs) / 1000 / 1000
metrics("numFiles").add(ret.map(_.files.size.toLong).sum)
metrics("metadataTime").add(timeTaken)
metrics("metadataTime").add(timeTakenMs)
val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
SQLMetrics.postDriverMetricUpdates(sparkContext, executionId,
......
......@@ -69,6 +69,7 @@ class CatalogFileIndex(
*/
def filterPartitions(filters: Seq[Expression]): InMemoryFileIndex = {
if (table.partitionColumnNames.nonEmpty) {
val startTime = System.nanoTime()
val selectedPartitions = sparkSession.sessionState.catalog.listPartitionsByFilter(
table.identifier, filters)
val partitions = selectedPartitions.map { p =>
......@@ -79,8 +80,9 @@ class CatalogFileIndex(
path.makeQualified(fs.getUri, fs.getWorkingDirectory))
}
val partitionSpec = PartitionSpec(partitionSchema, partitions)
val timeNs = System.nanoTime() - startTime
new PrunedInMemoryFileIndex(
sparkSession, new Path(baseLocation.get), fileStatusCache, partitionSpec)
sparkSession, new Path(baseLocation.get), fileStatusCache, partitionSpec, Option(timeNs))
} else {
new InMemoryFileIndex(
sparkSession, rootPaths, table.storage.properties, partitionSchema = None)
......@@ -111,7 +113,8 @@ private class PrunedInMemoryFileIndex(
sparkSession: SparkSession,
tableBasePath: Path,
fileStatusCache: FileStatusCache,
override val partitionSpec: PartitionSpec)
override val partitionSpec: PartitionSpec,
override val metadataOpsTimeNs: Option[Long])
extends InMemoryFileIndex(
sparkSession,
partitionSpec.partitions.map(_.path),
......
......@@ -72,4 +72,14 @@ trait FileIndex {
/**
 * Schema of the partitioning columns, or the empty schema if the table is not partitioned.
 *
 * Implementations must return a stable value: callers (e.g. partition pruning) pair this
 * schema with the partition values produced by `listFiles`.
 */
def partitionSchema: StructType
/**
 * Optional time, in nanoseconds, spent performing metadata operations (such as file listing)
 * while this index was built.
 *
 * File listing is performed during query optimization (to obtain proper statistics), yet we
 * want that cost to show up in the physical execution metrics. Implementations that do such
 * listing may record the elapsed time and expose it through this method so scan operators can
 * fold it into their reported metadata timing. The default reports no recorded time.
 */
def metadataOpsTimeNs: Option[Long] = Option.empty[Long]
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment