Commit 9eb74c7d authored by Matei Zaharia, committed by Michael Armbrust

[SPARK-3091] [SQL] Add support for caching metadata on Parquet files

For larger Parquet files, reading the file footers (which is done in parallel on up to 5 threads) and the HDFS block locations (which is done serially) can take multiple seconds. We can add an option to cache this data within FilteringParquetRowInputFormat. Unfortunately, ParquetInputFormat only caches footers within each instance of ParquetInputFormat, not across them.
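In spirit, the change keeps a process-wide Guava cache keyed by FileStatus and only reads the footers that are missing from it. Below is a minimal, self-contained sketch of that pattern; the object name, the `footersFor` method, and the `readFooters` callback are illustrative and not part of this commit:

```scala
import java.util.{ArrayList, List => JList}

import scala.collection.JavaConversions._
import scala.collection.mutable

import com.google.common.cache.CacheBuilder
import org.apache.hadoop.fs.FileStatus
import parquet.hadoop.Footer

// Illustrative only: a process-wide cache of Parquet footers keyed by FileStatus.
object FooterCacheSketch {
  private val footerCache = CacheBuilder.newBuilder()
    .maximumSize(20000)
    .build[FileStatus, Footer]()

  /** Return footers for `statuses`, reading only the ones not already cached. */
  def footersFor(
      statuses: Seq[FileStatus],
      readFooters: Seq[FileStatus] => Seq[Footer]): JList[Footer] = {
    // Batch lookup of everything already cached
    val found = footerCache.getAllPresent(statuses)
    val missing = statuses.filterNot(s => found.containsKey(s))

    // Read only the missing footers, then add them to the cache
    val fetched = new mutable.HashMap[FileStatus, Footer]
    if (missing.nonEmpty) {
      for ((status, footer) <- missing.zip(readFooters(missing))) {
        fetched(status) = footer
      }
      footerCache.putAll(fetched)
    }

    // Return footers in the same order as the input statuses
    val result = new ArrayList[Footer](statuses.size)
    for (status <- statuses) {
      result.add(fetched.getOrElse(status, found.get(status)))
    }
    result
  }
}
```

The commit applies the same idea to HDFS block locations, but via Guava's compute-if-absent form, `Cache.get(key, Callable)`, so each missing entry is looked up on demand.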

Note: this PR leaves the option turned off by default for 1.1, but I believe it's safe to turn it on by default afterward. The keys in the caches are FileStatus objects that include a modification time, so this will work fine if files are modified. The location cache could become stale if files are moved within HDFS, but that's rare, so it simply expires entries after 15 minutes.
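For reference, opting in once this change is available looks roughly like the following sketch; the app name and the Parquet path are placeholders:

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

val sc = new SparkContext(new SparkConf().setAppName("parquet-metadata-cache-demo"))
val sqlContext = new SQLContext(sc)

// Opt in to caching Parquet footers and HDFS block locations (off by default in 1.1)
sqlContext.setConf("spark.sql.parquet.cacheMetadata", "true")

// Repeated scans of a large Parquet table can now reuse the cached footers and
// block locations instead of re-reading them from HDFS.
val events = sqlContext.parquetFile("hdfs:///tmp/events.parquet")
println(events.count())
```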

Author: Matei Zaharia <matei@databricks.com>

Closes #2005 from mateiz/parquet-cache and squashes the following commits:

dae8efe [Matei Zaharia] Bug fix
c71e9ed [Matei Zaharia] Handle empty statuses directly
22072b0 [Matei Zaharia] Use Guava caches and add a config option for caching metadata
8fb56ce [Matei Zaharia] Cache file block locations too
453bd21 [Matei Zaharia] Bug fix
4094df6 [Matei Zaharia] First attempt at caching Parquet footers
parent 6bca8898
@@ -32,6 +32,7 @@ private[spark] object SQLConf {
   val CODEGEN_ENABLED = "spark.sql.codegen"
   val DIALECT = "spark.sql.dialect"
   val PARQUET_BINARY_AS_STRING = "spark.sql.parquet.binaryAsString"
+  val PARQUET_CACHE_METADATA = "spark.sql.parquet.cacheMetadata"
 
   // This is only used for the thriftserver
   val THRIFTSERVER_POOL = "spark.sql.thriftserver.scheduler.pool"
...
@@ -17,22 +17,23 @@
 
 package org.apache.spark.sql.parquet
 
-import scala.collection.JavaConversions._
-import scala.collection.mutable
-import scala.util.Try
-
 import java.io.IOException
 import java.lang.{Long => JLong}
 import java.text.SimpleDateFormat
-import java.util.{Date, List => JList}
+import java.util.concurrent.{Callable, TimeUnit}
+import java.util.{ArrayList, Collections, Date, List => JList}
+
+import scala.collection.JavaConversions._
+import scala.collection.mutable
+import scala.util.Try
 
+import com.google.common.cache.CacheBuilder
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hadoop.fs.{BlockLocation, FileStatus, Path}
 import org.apache.hadoop.mapreduce._
 import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => NewFileInputFormat}
 import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat => NewFileOutputFormat}
 import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
-
 import parquet.hadoop._
 import parquet.hadoop.api.{InitContext, ReadSupport}
 import parquet.hadoop.metadata.GlobalMetaData
@@ -41,7 +42,7 @@ import parquet.io.ParquetDecodingException
 import parquet.schema.MessageType
 
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.SQLContext
+import org.apache.spark.sql.SQLConf
 import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, Row}
 import org.apache.spark.sql.execution.{LeafNode, SparkPlan, UnaryNode}
 import org.apache.spark.{Logging, SerializableWritable, TaskContext}
@@ -96,6 +97,11 @@ case class ParquetTableScan(
       ParquetFilters.serializeFilterExpressions(columnPruningPred, conf)
     }
 
+    // Tell FilteringParquetRowInputFormat whether it's okay to cache Parquet and FS metadata
+    conf.set(
+      SQLConf.PARQUET_CACHE_METADATA,
+      sqlContext.getConf(SQLConf.PARQUET_CACHE_METADATA, "false"))
+
     sc.newAPIHadoopRDD(
       conf,
       classOf[FilteringParquetRowInputFormat],
@@ -323,10 +329,40 @@ private[parquet] class FilteringParquetRowInputFormat
   }
 
   override def getFooters(jobContext: JobContext): JList[Footer] = {
+    import FilteringParquetRowInputFormat.footerCache
+
     if (footers eq null) {
+      val conf = ContextUtil.getConfiguration(jobContext)
+      val cacheMetadata = conf.getBoolean(SQLConf.PARQUET_CACHE_METADATA, false)
       val statuses = listStatus(jobContext)
       fileStatuses = statuses.map(file => file.getPath -> file).toMap
-      footers = getFooters(ContextUtil.getConfiguration(jobContext), statuses)
+      if (statuses.isEmpty) {
+        footers = Collections.emptyList[Footer]
+      } else if (!cacheMetadata) {
+        // Read the footers from HDFS
+        footers = getFooters(conf, statuses)
+      } else {
+        // Read only the footers that are not in the footerCache
+        val foundFooters = footerCache.getAllPresent(statuses)
+        val toFetch = new ArrayList[FileStatus]
+        for (s <- statuses) {
+          if (!foundFooters.containsKey(s)) {
+            toFetch.add(s)
+          }
+        }
+        val newFooters = new mutable.HashMap[FileStatus, Footer]
+        if (toFetch.size > 0) {
+          val fetched = getFooters(conf, toFetch)
+          for ((status, i) <- toFetch.zipWithIndex) {
+            newFooters(status) = fetched.get(i)
+          }
+          footerCache.putAll(newFooters)
+        }
+        footers = new ArrayList[Footer](statuses.size)
+        for (status <- statuses) {
+          footers.add(newFooters.getOrElse(status, foundFooters.get(status)))
+        }
+      }
     }
 
     footers
@@ -339,6 +375,10 @@ private[parquet] class FilteringParquetRowInputFormat
       configuration: Configuration,
       footers: JList[Footer]): JList[ParquetInputSplit] = {
 
+    import FilteringParquetRowInputFormat.blockLocationCache
+
+    val cacheMetadata = configuration.getBoolean(SQLConf.PARQUET_CACHE_METADATA, false)
+
     val maxSplitSize: JLong = configuration.getLong("mapred.max.split.size", Long.MaxValue)
     val minSplitSize: JLong =
       Math.max(getFormatMinSplitSize(), configuration.getLong("mapred.min.split.size", 0L))
@@ -366,16 +406,23 @@ private[parquet] class FilteringParquetRowInputFormat
     for (footer <- footers) {
       val fs = footer.getFile.getFileSystem(configuration)
       val file = footer.getFile
-      val fileStatus = fileStatuses.getOrElse(file, fs.getFileStatus(file))
+      val status = fileStatuses.getOrElse(file, fs.getFileStatus(file))
       val parquetMetaData = footer.getParquetMetadata
       val blocks = parquetMetaData.getBlocks
-      val fileBlockLocations = fs.getFileBlockLocations(fileStatus, 0, fileStatus.getLen)
+      var blockLocations: Array[BlockLocation] = null
+      if (!cacheMetadata) {
+        blockLocations = fs.getFileBlockLocations(status, 0, status.getLen)
+      } else {
+        blockLocations = blockLocationCache.get(status, new Callable[Array[BlockLocation]] {
+          def call(): Array[BlockLocation] = fs.getFileBlockLocations(status, 0, status.getLen)
+        })
+      }
       splits.addAll(
         generateSplits.invoke(
           null,
           blocks,
-          fileBlockLocations,
-          fileStatus,
+          blockLocations,
+          status,
           parquetMetaData.getFileMetaData,
           readContext.getRequestedSchema.toString,
           readContext.getReadSupportMetadata,
@@ -387,6 +434,17 @@
   }
 }
 
+private[parquet] object FilteringParquetRowInputFormat {
+  private val footerCache = CacheBuilder.newBuilder()
+    .maximumSize(20000)
+    .build[FileStatus, Footer]()
+
+  private val blockLocationCache = CacheBuilder.newBuilder()
+    .maximumSize(20000)
+    .expireAfterWrite(15, TimeUnit.MINUTES)  // Expire locations since HDFS files might move
+    .build[FileStatus, Array[BlockLocation]]()
+}
+
 private[parquet] object FileSystemHelper {
   def listFiles(pathStr: String, conf: Configuration): Seq[Path] = {
     val origPath = new Path(pathStr)
...