diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileStatusCache.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileStatusCache.scala index 5d97558633146bbc590b15b3565d2208271b6b09..aea27bd4c4d7fcf6eaf9b6e3366fac17d4873f60 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileStatusCache.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileStatusCache.scala @@ -94,27 +94,48 @@ private class SharedInMemoryCache(maxSizeInBytes: Long) extends Logging { // Opaque object that uniquely identifies a shared cache user private type ClientId = Object + private val warnedAboutEviction = new AtomicBoolean(false) // we use a composite cache key in order to distinguish entries inserted by different clients - private val cache: Cache[(ClientId, Path), Array[FileStatus]] = CacheBuilder.newBuilder() - .weigher(new Weigher[(ClientId, Path), Array[FileStatus]] { + private val cache: Cache[(ClientId, Path), Array[FileStatus]] = { + // [[Weigher]].weigh returns Int so we could only cache objects < 2GB + // instead, the weight is divided by this factor (which is smaller + // than the size of one [[FileStatus]]). + // so it will support objects up to 64GB in size. + val weightScale = 32 + val weigher = new Weigher[(ClientId, Path), Array[FileStatus]] { override def weigh(key: (ClientId, Path), value: Array[FileStatus]): Int = { - (SizeEstimator.estimate(key) + SizeEstimator.estimate(value)).toInt - }}) - .removalListener(new RemovalListener[(ClientId, Path), Array[FileStatus]]() { - override def onRemoval(removed: RemovalNotification[(ClientId, Path), Array[FileStatus]]) - : Unit = { + val estimate = (SizeEstimator.estimate(key) + SizeEstimator.estimate(value)) / weightScale + if (estimate > Int.MaxValue) { + logWarning(s"Cached table partition metadata size is too big. Approximating to " + + s"${Int.MaxValue.toLong * weightScale}.") + Int.MaxValue + } else { + estimate.toInt + } + } + } + val removalListener = new RemovalListener[(ClientId, Path), Array[FileStatus]]() { + override def onRemoval( + removed: RemovalNotification[(ClientId, Path), + Array[FileStatus]]): Unit = { if (removed.getCause == RemovalCause.SIZE && - warnedAboutEviction.compareAndSet(false, true)) { + warnedAboutEviction.compareAndSet(false, true)) { logWarning( "Evicting cached table partition metadata from memory due to size constraints " + - "(spark.sql.hive.filesourcePartitionFileCacheSize = " + maxSizeInBytes + " bytes). " + - "This may impact query planning performance.") + "(spark.sql.hive.filesourcePartitionFileCacheSize = " + + maxSizeInBytes + " bytes). This may impact query planning performance.") } - }}) - .maximumWeight(maxSizeInBytes) - .build[(ClientId, Path), Array[FileStatus]]() + } + } + CacheBuilder.newBuilder() + .weigher(weigher) + .removalListener(removalListener) + .maximumWeight(maxSizeInBytes / weightScale) + .build[(ClientId, Path), Array[FileStatus]]() + } + /** * @return a FileStatusCache that does not share any entries with any other client, but does diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala index c638f5f7d306691b12f3a418f4aca9d4bf5bf96c..b4b762b1dd5981d6f81caf18070f3b4f3415a701 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala @@ -28,6 +28,7 @@ import org.apache.hadoop.fs.{FileStatus, Path, RawLocalFileSystem} import org.apache.spark.metrics.source.HiveCatalogMetrics import org.apache.spark.sql.catalyst.util._ import org.apache.spark.sql.test.SharedSQLContext +import org.apache.spark.util.{KnownSizeEstimation, SizeEstimator} class FileIndexSuite extends SharedSQLContext { @@ -203,6 +204,21 @@ class FileIndexSuite extends SharedSQLContext { assert(catalog.leafDirPaths.head == fs.makeQualified(dirPath)) } } + + test("SPARK-20280 - FileStatusCache with a partition with very many files") { + /* fake the size, otherwise we need to allocate 2GB of data to trigger this bug */ + class MyFileStatus extends FileStatus with KnownSizeEstimation { + override def estimatedSize: Long = 1000 * 1000 * 1000 + } + /* files * MyFileStatus.estimatedSize should overflow to negative integer + * so, make it between 2bn and 4bn + */ + val files = (1 to 3).map { i => + new MyFileStatus() + } + val fileStatusCache = FileStatusCache.getOrCreate(spark) + fileStatusCache.putLeafFiles(new Path("/tmp", "abc"), files.toArray) + } } class FakeParentPathFileSystem extends RawLocalFileSystem {