Skip to content
Snippets Groups Projects
Commit def3690f authored by Reynold Xin's avatar Reynold Xin
Browse files

[SQL] Minor readability improvement for partition handling code


This patch includes minor changes to improve readability for partition handling code. I'm in the middle of implementing some new feature and found some naming / implicit type inference not as intuitive.

This patch should have no semantic change and the changes should be covered by existing test cases.

Author: Reynold Xin <rxin@databricks.com>

Closes #16378 from rxin/minor-fix.

(cherry picked from commit 7c5b7b3a)
Signed-off-by: default avatarReynold Xin <rxin@databricks.com>
parent 07e2a17d
No related branches found
No related tags found
No related merge requests found
......@@ -136,7 +136,7 @@ case class RowDataSourceScanExec(
* @param outputSchema Output schema of the scan.
* @param partitionFilters Predicates to use for partition pruning.
* @param dataFilters Data source filters to use for filtering data within partitions.
* @param metastoreTableIdentifier
* @param metastoreTableIdentifier identifier for the table in the metastore.
*/
case class FileSourceScanExec(
@transient relation: HadoopFsRelation,
......@@ -147,10 +147,10 @@ case class FileSourceScanExec(
override val metastoreTableIdentifier: Option[TableIdentifier])
extends DataSourceScanExec {
val supportsBatch = relation.fileFormat.supportBatch(
val supportsBatch: Boolean = relation.fileFormat.supportBatch(
relation.sparkSession, StructType.fromAttributes(output))
val needsUnsafeRowConversion = if (relation.fileFormat.isInstanceOf[ParquetSource]) {
val needsUnsafeRowConversion: Boolean = if (relation.fileFormat.isInstanceOf[ParquetSource]) {
SparkSession.getActiveSession.get.sessionState.conf.parquetVectorizedReaderEnabled
} else {
false
......@@ -516,7 +516,6 @@ case class FileSourceScanExec(
}
// Assign files to partitions using "First Fit Decreasing" (FFD)
// TODO: consider adding a slop factor here?
splitFiles.foreach { file =>
if (currentSize + file.length > maxSplitBytes) {
closePartition()
......
......@@ -17,6 +17,7 @@
package org.apache.spark.sql.execution.datasources
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.SparkSession
......@@ -37,14 +38,15 @@ class CatalogFileIndex(
val table: CatalogTable,
override val sizeInBytes: Long) extends FileIndex {
protected val hadoopConf = sparkSession.sessionState.newHadoopConf
protected val hadoopConf: Configuration = sparkSession.sessionState.newHadoopConf()
private val fileStatusCache = FileStatusCache.newCache(sparkSession)
/** Globally shared (not exclusive to this table) cache for file statuses to speed up listing. */
private val fileStatusCache = FileStatusCache.getOrCreate(sparkSession)
assert(table.identifier.database.isDefined,
"The table identifier must be qualified in CatalogFileIndex")
private val baseLocation = table.storage.locationUri
private val baseLocation: Option[String] = table.storage.locationUri
override def partitionSchema: StructType = table.partitionSchema
......@@ -76,7 +78,8 @@ class CatalogFileIndex(
new PrunedInMemoryFileIndex(
sparkSession, new Path(baseLocation.get), fileStatusCache, partitionSpec)
} else {
new InMemoryFileIndex(sparkSession, rootPaths, table.storage.properties, None)
new InMemoryFileIndex(
sparkSession, rootPaths, table.storage.properties, partitionSchema = None)
}
}
......
......@@ -148,7 +148,8 @@ trait FileFormat {
* The base class file format that is based on text file.
*/
abstract class TextBasedFileFormat extends FileFormat {
private var codecFactory: CompressionCodecFactory = null
private var codecFactory: CompressionCodecFactory = _
override def isSplitable(
sparkSession: SparkSession,
options: Map[String, String],
......
......@@ -17,7 +17,6 @@
package org.apache.spark.sql.execution.datasources
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicBoolean
import scala.collection.JavaConverters._
......@@ -26,9 +25,38 @@ import com.google.common.cache._
import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.spark.internal.Logging
import org.apache.spark.metrics.source.HiveCatalogMetrics
import org.apache.spark.sql.SparkSession
import org.apache.spark.util.{SerializableConfiguration, SizeEstimator}
import org.apache.spark.util.SizeEstimator
/**
* Use [[FileStatusCache.getOrCreate()]] to construct a globally shared file status cache.
*/
object FileStatusCache {
private var sharedCache: SharedInMemoryCache = _
/**
* @return a new FileStatusCache based on session configuration. Cache memory quota is
* shared across all clients.
*/
def getOrCreate(session: SparkSession): FileStatusCache = synchronized {
if (session.sqlContext.conf.manageFilesourcePartitions &&
session.sqlContext.conf.filesourcePartitionFileCacheSize > 0) {
if (sharedCache == null) {
sharedCache = new SharedInMemoryCache(
session.sqlContext.conf.filesourcePartitionFileCacheSize)
}
sharedCache.createForNewClient()
} else {
NoopCache
}
}
def resetForTesting(): Unit = synchronized {
sharedCache = null
}
}
/**
* A cache of the leaf files of partition directories. We cache these files in order to speed
......@@ -55,32 +83,6 @@ abstract class FileStatusCache {
def invalidateAll(): Unit
}
object FileStatusCache {
private var sharedCache: SharedInMemoryCache = null
/**
* @return a new FileStatusCache based on session configuration. Cache memory quota is
* shared across all clients.
*/
def newCache(session: SparkSession): FileStatusCache = {
synchronized {
if (session.sqlContext.conf.manageFilesourcePartitions &&
session.sqlContext.conf.filesourcePartitionFileCacheSize > 0) {
if (sharedCache == null) {
sharedCache = new SharedInMemoryCache(
session.sqlContext.conf.filesourcePartitionFileCacheSize)
}
sharedCache.getForNewClient()
} else {
NoopCache
}
}
}
def resetForTesting(): Unit = synchronized {
sharedCache = null
}
}
/**
* An implementation that caches partition file statuses in memory.
......@@ -88,7 +90,6 @@ object FileStatusCache {
* @param maxSizeInBytes max allowable cache size before entries start getting evicted
*/
private class SharedInMemoryCache(maxSizeInBytes: Long) extends Logging {
import FileStatusCache._
// Opaque object that uniquely identifies a shared cache user
private type ClientId = Object
......@@ -102,8 +103,9 @@ private class SharedInMemoryCache(maxSizeInBytes: Long) extends Logging {
(SizeEstimator.estimate(key) + SizeEstimator.estimate(value)).toInt
}})
.removalListener(new RemovalListener[(ClientId, Path), Array[FileStatus]]() {
override def onRemoval(removed: RemovalNotification[(ClientId, Path), Array[FileStatus]]) = {
if (removed.getCause() == RemovalCause.SIZE &&
override def onRemoval(removed: RemovalNotification[(ClientId, Path), Array[FileStatus]])
: Unit = {
if (removed.getCause == RemovalCause.SIZE &&
warnedAboutEviction.compareAndSet(false, true)) {
logWarning(
"Evicting cached table partition metadata from memory due to size constraints " +
......@@ -112,13 +114,13 @@ private class SharedInMemoryCache(maxSizeInBytes: Long) extends Logging {
}
}})
.maximumWeight(maxSizeInBytes)
.build()
.build[(ClientId, Path), Array[FileStatus]]()
/**
* @return a FileStatusCache that does not share any entries with any other client, but does
* share memory resources for the purpose of cache eviction.
*/
def getForNewClient(): FileStatusCache = new FileStatusCache {
def createForNewClient(): FileStatusCache = new FileStatusCache {
val clientId = new Object()
override def getLeafFiles(path: Path): Option[Array[FileStatus]] = {
......@@ -126,7 +128,7 @@ private class SharedInMemoryCache(maxSizeInBytes: Long) extends Logging {
}
override def putLeafFiles(path: Path, leafFiles: Array[FileStatus]): Unit = {
cache.put((clientId, path), leafFiles.toArray)
cache.put((clientId, path), leafFiles)
}
override def invalidateAll(): Unit = {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment