Commit f643fe47 authored by Tyson Condie, committed by Michael Armbrust

[SPARK-18498][SQL] Revise HDFSMetadataLog API for better testing


Revise the HDFSMetadataLog API so that metadata object serialization and the final batch file write are separated. This allows serialization to be checked without depending on batch file name formats. marmbrus zsxwing

Existing tests already ensure this API faithfully supports the core functionality, i.e., the creation of batch files.
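For illustration, a minimal sketch of how the split API could be exercised in a test. It assumes the same harness as the existing HDFSMetadataLogSuite (SparkFunSuite with SharedSQLContext, which supply spark and withTempDir) and that the sketch lives in Spark's own test sources; the suite and test names are hypothetical.

package org.apache.spark.sql.execution.streaming

import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.test.SharedSQLContext

// Hypothetical suite; mirrors the setup used by HDFSMetadataLogSuite.
class HDFSMetadataLogSerializationSuite extends SparkFunSuite with SharedSQLContext {

  test("writeTempBatch serializes metadata without committing a batch file") {
    withTempDir { temp =>
      val log = new HDFSMetadataLog[String](spark, temp.getAbsolutePath)

      // Serialization only: no batch id is involved and nothing is renamed
      // into the numeric batch-file naming scheme.
      val tempPath = log.writeTempBatch("metadata for batch 0")
      assert(tempPath.isDefined)

      // The Path-based get only accepts real batch files, so handing it the
      // temp file is rejected rather than silently misread.
      intercept[IllegalArgumentException] {
        log.get(tempPath.get)
      }
    }
  }
}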

Author: Tyson Condie <tcondie@gmail.com>

Closes #15924 from tcondie/SPARK-18498.

Signed-off-by: Michael Armbrust <michael@databricks.com>
parent 95f79850
@@ -138,14 +138,7 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path:
}
}
/**
* Write a batch to a temp file then rename it to the batch file.
*
* There may be multiple [[HDFSMetadataLog]] using the same metadata path. Although it is not a
* valid behavior, we still need to prevent it from destroying the files.
*/
private def writeBatch(batchId: Long, metadata: T, writer: (T, OutputStream) => Unit): Unit = {
// Use nextId to create a temp file
def writeTempBatch(metadata: T, writer: (T, OutputStream) => Unit = serialize): Option[Path] = {
var nextId = 0
while (true) {
val tempPath = new Path(metadataPath, s".${UUID.randomUUID.toString}.tmp")
@@ -153,33 +146,10 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path:
val output = fileManager.create(tempPath)
try {
writer(metadata, output)
return Some(tempPath)
} finally {
IOUtils.closeQuietly(output)
}
try {
// Try to commit the batch
// It will fail if there is an existing file (someone has committed the batch)
logDebug(s"Attempting to write log #${batchIdToPath(batchId)}")
fileManager.rename(tempPath, batchIdToPath(batchId))
// SPARK-17475: HDFSMetadataLog should not leak CRC files
// If the underlying filesystem didn't rename the CRC file, delete it.
val crcPath = new Path(tempPath.getParent(), s".${tempPath.getName()}.crc")
if (fileManager.exists(crcPath)) fileManager.delete(crcPath)
return
} catch {
case e: IOException if isFileAlreadyExistsException(e) =>
// If "rename" fails, it means some other "HDFSMetadataLog" has committed the batch.
// So throw an exception to tell the user this is not a valid behavior.
throw new ConcurrentModificationException(
s"Multiple HDFSMetadataLog are using $path", e)
case e: FileNotFoundException =>
// Sometimes, "create" will succeed when multiple writers are calling it at the same
// time. However, only one writer can call "rename" successfully, others will get
// FileNotFoundException because the first writer has removed it.
throw new ConcurrentModificationException(
s"Multiple HDFSMetadataLog are using $path", e)
}
} catch {
case e: IOException if isFileAlreadyExistsException(e) =>
// Failed to create "tempPath". There are two cases:
@@ -195,10 +165,45 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path:
// metadata path. In addition, the old Streaming also has this issue; people can create
// malicious checkpoint files to crash a Streaming application too.
nextId += 1
} finally {
fileManager.delete(tempPath)
}
}
None
}
/**
* Write a batch to a temp file then rename it to the batch file.
*
* There may be multiple [[HDFSMetadataLog]] using the same metadata path. Although it is not a
* valid behavior, we still need to prevent it from destroying the files.
*/
private def writeBatch(batchId: Long, metadata: T, writer: (T, OutputStream) => Unit): Unit = {
val tempPath = writeTempBatch(metadata, writer).getOrElse(
throw new IllegalStateException(s"Unable to create temp batch file $batchId"))
try {
// Try to commit the batch
// It will fail if there is an existing file (someone has committed the batch)
logDebug(s"Attempting to write log #${batchIdToPath(batchId)}")
fileManager.rename(tempPath, batchIdToPath(batchId))
// SPARK-17475: HDFSMetadataLog should not leak CRC files
// If the underlying filesystem didn't rename the CRC file, delete it.
val crcPath = new Path(tempPath.getParent(), s".${tempPath.getName()}.crc")
if (fileManager.exists(crcPath)) fileManager.delete(crcPath)
} catch {
case e: IOException if isFileAlreadyExistsException(e) =>
// If "rename" fails, it means some other "HDFSMetadataLog" has committed the batch.
// So throw an exception to tell the user this is not a valid behavior.
throw new ConcurrentModificationException(
s"Multiple HDFSMetadataLog are using $path", e)
case e: FileNotFoundException =>
// Sometimes, "create" will succeed when multiple writers are calling it at the same
// time. However, only one writer can call "rename" successfully, others will get
// FileNotFoundException because the first writer has removed it.
throw new ConcurrentModificationException(
s"Multiple HDFSMetadataLog are using $path", e)
} finally {
fileManager.delete(tempPath)
}
}
private def isFileAlreadyExistsException(e: IOException): Boolean = {
@@ -208,6 +213,22 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path:
(e.getMessage != null && e.getMessage.startsWith("File already exists: "))
}
/**
* @return the deserialized metadata in a batch file, or None if the file does not exist.
* @throws IllegalArgumentException when the path does not point to a batch file.
*/
def get(batchFile: Path): Option[T] = {
if (fileManager.exists(batchFile)) {
if (isBatchFile(batchFile)) {
get(pathToBatchId(batchFile))
} else {
throw new IllegalArgumentException(s"File ${batchFile} is not a batch file!")
}
} else {
None
}
}
override def get(batchId: Long): Option[T] = {
val batchMetadataFile = batchIdToPath(batchId)
if (fileManager.exists(batchMetadataFile)) {
@@ -250,6 +271,17 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path:
None
}
/**
* Get an array of [FileStatus] referencing batch files.
* The array is sorted from the most recent batch file to the oldest.
*/
def getOrderedBatchFiles(): Array[FileStatus] = {
fileManager.list(metadataPath, batchFilesFilter)
.sortBy(f => pathToBatchId(f.getPath))
.reverse
}
/**
* Removes all log entries earlier than thresholdBatchId (exclusive).
*/
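For completeness, a second hedged sketch in the same hypothetical suite as above, exercising the two new read helpers added by this patch, getOrderedBatchFiles() and the Path-based get. It relies on the same assumed harness (spark, withTempDir, ScalaTest assertions).

  test("ordered batch files and Path-based lookup") {
    withTempDir { temp =>
      val log = new HDFSMetadataLog[String](spark, temp.getAbsolutePath)
      assert(log.add(0, "batch-0"))
      assert(log.add(1, "batch-1"))

      // Most recent batch first, per the getOrderedBatchFiles() doc comment;
      // batch files are named by their batch id.
      val ordered = log.getOrderedBatchFiles()
      assert(ordered.map(_.getPath.getName).toSeq === Seq("1", "0"))

      // The Path overload of get resolves a batch file back to its metadata.
      assert(log.get(ordered.head.getPath) === Some("batch-1"))
    }
  }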