Commit 9812f7d5 authored by petermaxlee, committed by Shixiong Zhu

[SPARK-17165][SQL] FileStreamSource should not track the list of seen files indefinitely

## What changes were proposed in this pull request?
Before this change, FileStreamSource uses an in-memory hash set to track the list of files processed by the engine. The list can grow indefinitely, leading to OOM or overflow of the hash set.

This patch introduces a new user-specified option called "maxFileAge" (one week by default, matching the code below). If a file is older than this age, FileStreamSource purges it from the in-memory map used to track the list of files that have been processed.
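For illustration, a minimal sketch of how a user would set the new option. The option name "maxFileAge" and its time-string format come from this patch; the session setup, input path, and console sink are placeholder assumptions, not part of the change:

```scala
import org.apache.spark.sql.SparkSession

// Minimal sketch, assuming a local SparkSession and a placeholder input directory.
val spark = SparkSession.builder().appName("maxFileAge-example").getOrCreate()

val lines = spark.readStream
  .format("text")
  .option("maxFileAge", "7d") // time string such as "5ms", "1h", "7d"; files older than this
                              // (relative to the newest file seen) are dropped from tracking
  .load("/data/incoming")     // placeholder path

// Write to the console sink just to have a running query for the example.
val query = lines.writeStream.format("console").start()
```

Files whose modification time falls more than maxFileAge behind the newest file's timestamp are purged from the source's in-memory map, which keeps the tracking state bounded.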

## How was this patch tested?
Added unit tests for the underlying utility, and also added an end-to-end test to validate the purge in FileStreamSourceSuite. Also verified the new test cases would fail when the timeout was set to a very large number.

Author: petermaxlee <petermaxlee@gmail.com>

Closes #14728 from petermaxlee/SPARK-17165.
parent 261c55dd
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.execution.streaming
import scala.util.Try
import org.apache.spark.internal.Logging
import org.apache.spark.sql.execution.datasources.CaseInsensitiveMap
import org.apache.spark.util.Utils
/**
* User specified options for file streams.
*/
class FileStreamOptions(parameters: Map[String, String]) extends Logging {
val maxFilesPerTrigger: Option[Int] = parameters.get("maxFilesPerTrigger").map { str =>
Try(str.toInt).toOption.filter(_ > 0).getOrElse {
throw new IllegalArgumentException(
s"Invalid value '$str' for option 'maxFilesPerTrigger', must be a positive integer")
}
}
/**
* Maximum age of a file that can be found in this directory, before it is ignored and its entry
* purged from the internal tracking state.
*
* The max age is specified with respect to the timestamp of the latest file, not the
* timestamp of the current system. This means that if the last file has timestamp 1000, the
* current system time is 2000, and the max age is 200, the system will purge files older than
* 800 (rather than 1800) from the internal state.
*
* Defaults to one week.
*/
val maxFileAgeMs: Long =
Utils.timeStringAsMs(parameters.getOrElse("maxFileAge", "7d"))
/** Options as specified by the user, in a case-insensitive map, without "path" set. */
val optionMapWithoutPath: Map[String, String] =
new CaseInsensitiveMap(parameters).filterKeys(_ != "path")
}
@@ -17,21 +17,20 @@
package org.apache.spark.sql.execution.streaming
import scala.util.Try
import scala.collection.JavaConverters._
import org.apache.hadoop.fs.Path
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.internal.Logging
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
import org.apache.spark.sql.execution.datasources.{CaseInsensitiveMap, DataSource, ListingFileCatalog, LogicalRelation}
import org.apache.spark.sql.execution.datasources.{DataSource, ListingFileCatalog, LogicalRelation}
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.collection.OpenHashSet
/**
* A very simple source that reads text files from the given directory as they appear.
* A very simple source that reads files from the given directory as they appear.
*
* TODO Clean up the metadata files periodically
* TODO: Clean up the metadata log files periodically.
*/
class FileStreamSource(
sparkSession: SparkSession,
@@ -41,19 +40,34 @@ class FileStreamSource(
metadataPath: String,
options: Map[String, String]) extends Source with Logging {
private val fs = new Path(path).getFileSystem(sparkSession.sessionState.newHadoopConf())
private val qualifiedBasePath = fs.makeQualified(new Path(path)) // can contains glob patterns
private val metadataLog = new HDFSMetadataLog[Seq[String]](sparkSession, metadataPath)
import FileStreamSource._
private val sourceOptions = new FileStreamOptions(options)
private val qualifiedBasePath: Path = {
val fs = new Path(path).getFileSystem(sparkSession.sessionState.newHadoopConf())
fs.makeQualified(new Path(path)) // can contain glob patterns
}
private val metadataLog = new HDFSMetadataLog[Seq[FileEntry]](sparkSession, metadataPath)
private var maxBatchId = metadataLog.getLatest().map(_._1).getOrElse(-1L)
/** Maximum number of new files to be considered in each batch */
private val maxFilesPerBatch = getMaxFilesPerBatch()
private val maxFilesPerBatch = sourceOptions.maxFilesPerTrigger
/** A mapping from a file that we have processed to the timestamp when it was last modified. */
// Visible for testing and debugging in production.
val seenFiles = new SeenFilesMap(sourceOptions.maxFileAgeMs)
private val seenFiles = new OpenHashSet[String]
metadataLog.get(None, Some(maxBatchId)).foreach { case (batchId, files) =>
files.foreach(seenFiles.add)
metadataLog.get(None, Some(maxBatchId)).foreach { case (batchId, entry) =>
entry.foreach(seenFiles.add)
// TODO: move purge call out of the loop once we truncate logs.
seenFiles.purge()
}
logInfo(s"maxFilesPerBatch = $maxFilesPerBatch, maxFileAge = ${sourceOptions.maxFileAgeMs}")
/**
* Returns the maximum offset that can be retrieved from the source.
*
@@ -61,16 +75,27 @@
* there is no race here, so the cost of `synchronized` should be rare.
*/
private def fetchMaxOffset(): LongOffset = synchronized {
val newFiles = fetchAllFiles().filter(!seenFiles.contains(_))
// All the new files found - ignore aged files and files that we have seen.
val newFiles = fetchAllFiles().filter(seenFiles.isNewFile)
// Obey user's setting to limit the number of files in this batch trigger.
val batchFiles =
if (maxFilesPerBatch.nonEmpty) newFiles.take(maxFilesPerBatch.get) else newFiles
batchFiles.foreach { file =>
seenFiles.add(file)
logDebug(s"New file: $file")
}
logTrace(s"Number of new files = ${newFiles.size})")
logTrace(s"Number of files selected for batch = ${batchFiles.size}")
logTrace(s"Number of seen files = ${seenFiles.size}")
val numPurged = seenFiles.purge()
logTrace(
s"""
|Number of new files = ${newFiles.size}
|Number of files selected for batch = ${batchFiles.size}
|Number of seen files = ${seenFiles.size}
|Number of files purged from tracking map = $numPurged
""".stripMargin)
if (batchFiles.nonEmpty) {
maxBatchId += 1
metadataLog.add(maxBatchId, batchFiles)
@@ -104,22 +129,26 @@ class FileStreamSource(
val files = metadataLog.get(Some(startId + 1), Some(endId)).flatMap(_._2)
logInfo(s"Processing ${files.length} files from ${startId + 1}:$endId")
logTrace(s"Files are:\n\t" + files.mkString("\n\t"))
val newOptions = new CaseInsensitiveMap(options).filterKeys(_ != "path")
val newDataSource =
DataSource(
sparkSession,
paths = files,
paths = files.map(_.path),
userSpecifiedSchema = Some(schema),
className = fileFormatClassName,
options = newOptions)
options = sourceOptions.optionMapWithoutPath)
Dataset.ofRows(sparkSession, LogicalRelation(newDataSource.resolveRelation()))
}
private def fetchAllFiles(): Seq[String] = {
/**
* Returns a list of files found, sorted by their timestamp.
*/
private def fetchAllFiles(): Seq[FileEntry] = {
val startTime = System.nanoTime
val globbedPaths = SparkHadoopUtil.get.globPathIfNecessary(qualifiedBasePath)
val catalog = new ListingFileCatalog(sparkSession, globbedPaths, options, Some(new StructType))
val files = catalog.allFiles().sortBy(_.getModificationTime).map(_.getPath.toUri.toString)
val files = catalog.allFiles().sortBy(_.getModificationTime).map { status =>
FileEntry(status.getPath.toUri.toString, status.getModificationTime)
}
val endTime = System.nanoTime
val listingTimeMs = (endTime.toDouble - startTime) / 1000000
if (listingTimeMs > 2000) {
@@ -132,20 +161,76 @@
files
}
private def getMaxFilesPerBatch(): Option[Int] = {
new CaseInsensitiveMap(options)
.get("maxFilesPerTrigger")
.map { str =>
Try(str.toInt).toOption.filter(_ > 0).getOrElse {
throw new IllegalArgumentException(
s"Invalid value '$str' for option 'maxFilesPerTrigger', must be a positive integer")
}
}
}
override def getOffset: Option[Offset] = Some(fetchMaxOffset()).filterNot(_.offset == -1)
override def toString: String = s"FileStreamSource[$qualifiedBasePath]"
override def stop() {}
}
object FileStreamSource {
/** Timestamp for file modification time, in ms since January 1, 1970 UTC. */
type Timestamp = Long
case class FileEntry(path: String, timestamp: Timestamp) extends Serializable
/**
* A custom hash map used to track the list of files seen. This map is not thread-safe.
*
* To prevent the hash map from growing indefinitely, a purge function is available to
* remove files that are more than "maxAgeMs" older than the latest file.
*/
class SeenFilesMap(maxAgeMs: Long) {
require(maxAgeMs >= 0)
/** Mapping from file to its timestamp. */
private val map = new java.util.HashMap[String, Timestamp]
/** Timestamp of the latest file. */
private var latestTimestamp: Timestamp = 0L
/** Timestamp for the last purge operation. */
private var lastPurgeTimestamp: Timestamp = 0L
/** Add a new file to the map. */
def add(file: FileEntry): Unit = {
map.put(file.path, file.timestamp)
if (file.timestamp > latestTimestamp) {
latestTimestamp = file.timestamp
}
}
/**
* Returns true if we should consider this file a new file. The file is only considered "new"
* if it is new enough that we would still be tracking it, and we have not seen it before.
*/
def isNewFile(file: FileEntry): Boolean = {
// Note that we are testing against lastPurgeTimestamp here so we'd never miss a file that
// is older than (latestTimestamp - maxAgeMs) but has not been purged yet.
file.timestamp >= lastPurgeTimestamp && !map.containsKey(file.path)
}
/** Removes aged entries and returns the number of files removed. */
def purge(): Int = {
lastPurgeTimestamp = latestTimestamp - maxAgeMs
val iter = map.entrySet().iterator()
var count = 0
while (iter.hasNext) {
val entry = iter.next()
if (entry.getValue < lastPurgeTimestamp) {
count += 1
iter.remove()
}
}
count
}
def size: Int = map.size()
def allEntries: Seq[FileEntry] = {
map.entrySet().asScala.map(entry => FileEntry(entry.getKey, entry.getValue)).toSeq
}
}
}
@@ -180,7 +180,7 @@ class HDFSMetadataLog[T: ClassTag](sparkSession: SparkSession, path: String)
private def isFileAlreadyExistsException(e: IOException): Boolean = {
e.isInstanceOf[FileAlreadyExistsException] ||
// Old Hadoop versions don't throw FileAlreadyExistsException. Although it's fixed in
// HADOOP-9361, we still need to support old Hadoop versions.
// HADOOP-9361 in Hadoop 2.5, we still need to support old Hadoop versions.
(e.getMessage != null && e.getMessage.startsWith("File already exists: "))
}
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.execution.streaming
import org.apache.spark.SparkFunSuite
class FileStreamSourceSuite extends SparkFunSuite {
import FileStreamSource._
test("SeenFilesMap") {
val map = new SeenFilesMap(maxAgeMs = 10)
map.add(FileEntry("a", 5))
assert(map.size == 1)
map.purge()
assert(map.size == 1)
// Add a new entry; purge should be a no-op, since the gap is exactly 10 ms.
map.add(FileEntry("b", 15))
assert(map.size == 2)
map.purge()
assert(map.size == 2)
// Add a new entry that is more than 10 ms newer than the first entry. We should be able to purge now.
map.add(FileEntry("c", 16))
assert(map.size == 3)
map.purge()
assert(map.size == 2)
// Overwriting an existing entry shouldn't change the size
map.add(FileEntry("c", 25))
assert(map.size == 2)
// Not a new file because we have seen c before
assert(!map.isNewFile(FileEntry("c", 20)))
// Not a new file because timestamp is too old
assert(!map.isNewFile(FileEntry("d", 5)))
// Finally a new file: never seen and not too old
assert(map.isNewFile(FileEntry("e", 20)))
}
test("SeenFilesMap should only consider a file old if it is earlier than last purge time") {
val map = new SeenFilesMap(maxAgeMs = 10)
map.add(FileEntry("a", 20))
assert(map.size == 1)
// Timestamps 9 and 10 should still be considered new files because the purge time is still 0
assert(map.isNewFile(FileEntry("b", 9)))
assert(map.isNewFile(FileEntry("b", 10)))
// Once purged, the purge time should be 10, and b is then an old file if its timestamp is less than 10.
map.purge()
assert(!map.isNewFile(FileEntry("b", 9)))
assert(map.isNewFile(FileEntry("b", 10)))
}
}
@@ -104,12 +104,13 @@ class FileStreamSourceTest extends StreamTest with SharedSQLContext {
def createFileStream(
format: String,
path: String,
schema: Option[StructType] = None): DataFrame = {
schema: Option[StructType] = None,
options: Map[String, String] = Map.empty): DataFrame = {
val reader =
if (schema.isDefined) {
spark.readStream.format(format).schema(schema.get)
spark.readStream.format(format).schema(schema.get).options(options)
} else {
spark.readStream.format(format)
spark.readStream.format(format).options(options)
}
reader.load(path)
}
@@ -331,6 +332,39 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
}
}
test("SPARK-17165 should not track the list of seen files indefinitely") {
// This test works by:
// 1. Create a file
// 2. Get it processed
// 3. Sleep for a very short amount of time (longer than maxFileAge)
// 4. Add another file (at this point the original file should have been purged)
// 5. Check the size of the seenFiles internal data structure
// Note that if we change maxFileAge to a very large number, the last step should fail.
withTempDirs { case (src, tmp) =>
val textStream: DataFrame =
createFileStream("text", src.getCanonicalPath, options = Map("maxFileAge" -> "5ms"))
testStream(textStream)(
AddTextFileData("a\nb", src, tmp),
CheckAnswer("a", "b"),
// Sleep longer than 5 ms (maxFileAge)
AssertOnQuery { _ => Thread.sleep(10); true },
AddTextFileData("c\nd", src, tmp),
CheckAnswer("a", "b", "c", "d"),
AssertOnQuery("seen files should contain only one entry") { streamExecution =>
val source = streamExecution.logicalPlan.collect { case e: StreamingExecutionRelation =>
e.source.asInstanceOf[FileStreamSource]
}.head
source.seenFiles.size == 1
}
)
}
}
// =============== JSON file stream tests ================
test("read from json files") {
......