Commit b5e3bd87 authored by Shixiong Zhu, committed by Michael Armbrust

[SPARK-13791][SQL] Add MetadataLog and HDFSMetadataLog

## What changes were proposed in this pull request?

- Add a MetadataLog interface for reliable metadata storage (see the usage sketch below).
- Add HDFSMetadataLog as a MetadataLog implementation based on HDFS.
- Update FileStreamSource to use HDFSMetadataLog instead of managing metadata by itself.
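
A minimal usage sketch of the new API (the metadata path here is hypothetical, and a `SQLContext` named `sqlContext` is assumed to be in scope):

```scala
// Persist one metadata object per batch under the given path.
val log = new HDFSMetadataLog[Seq[String]](sqlContext, "/tmp/streaming-metadata")

// add() returns false if the batch was already committed, so retries are safe.
assert(log.add(0L, Seq("/data/a", "/data/b")))
assert(!log.add(0L, Seq("/data/ignored")))

// Query a single batch, a range of batches, or the latest batch.
val batch0: Option[Seq[String]] = log.get(0L)
val upTo0: Array[(Long, Seq[String])] = log.get(None, 0L)
val latest: Option[(Long, Seq[String])] = log.getLatest()
```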

## How was this patch tested?

unit tests

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #11625 from zsxwing/metadata-log.
parent 8e0b0306
@@ -17,13 +17,9 @@
package org.apache.spark.sql.execution.streaming
import java.io._
import java.nio.charset.StandardCharsets
import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.{ArrayBuffer, HashMap}
import scala.io.Codec
import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.Logging
import org.apache.spark.sql.{DataFrame, SQLContext}
@@ -44,33 +40,12 @@ class FileStreamSource(
dataFrameBuilder: Array[String] => DataFrame) extends Source with Logging {
private val fs = FileSystem.get(sqlContext.sparkContext.hadoopConfiguration)
private var maxBatchId = -1
private val seenFiles = new OpenHashSet[String]
private val metadataLog = new HDFSMetadataLog[Seq[String]](sqlContext, metadataPath)
private var maxBatchId = metadataLog.getLatest().map(_._1).getOrElse(-1L)
/** Map of batch id to files. This map is also stored in `metadataPath`. */
private val batchToMetadata = new HashMap[Long, Seq[String]]
{
// Restore file paths from the metadata files
val existingBatchFiles = fetchAllBatchFiles()
if (existingBatchFiles.nonEmpty) {
val existingBatchIds = existingBatchFiles.map(_.getPath.getName.toInt)
maxBatchId = existingBatchIds.max
// Recover "batchToMetadata" and "seenFiles" from existing metadata files.
existingBatchIds.sorted.foreach { batchId =>
val files = readBatch(batchId)
if (files.isEmpty) {
// Assert that the corrupted file must be the latest metadata file.
if (batchId != maxBatchId) {
throw new IllegalStateException("Invalid metadata files")
}
maxBatchId = maxBatchId - 1
} else {
batchToMetadata(batchId) = files
files.foreach(seenFiles.add)
}
}
}
private val seenFiles = new OpenHashSet[String]
metadataLog.get(None, maxBatchId).foreach { case (batchId, files) =>
files.foreach(seenFiles.add)
}
/** Returns the schema of the data from this source */
@@ -112,7 +87,7 @@ class FileStreamSource(
if (newFiles.nonEmpty) {
maxBatchId += 1
writeBatch(maxBatchId, newFiles)
metadataLog.add(maxBatchId, newFiles)
}
new LongOffset(maxBatchId)
@@ -140,9 +115,7 @@
val endId = end.offset
if (startId + 1 <= endId) {
val files = (startId + 1 to endId).filter(_ >= 0).flatMap { batchId =>
batchToMetadata.getOrElse(batchId, Nil)
}.toArray
val files = metadataLog.get(Some(startId + 1), endId).map(_._2).flatten
logDebug(s"Return files from batches ${startId + 1}:$endId")
logDebug(s"Streaming ${files.mkString(", ")}")
Some(new Batch(end, dataFrameBuilder(files)))
@@ -152,89 +125,9 @@
}
}
private def fetchAllBatchFiles(): Seq[FileStatus] = {
try fs.listStatus(new Path(metadataPath)) catch {
case _: java.io.FileNotFoundException =>
fs.mkdirs(new Path(metadataPath))
Seq.empty
}
}
private def fetchAllFiles(): Seq[String] = {
fs.listStatus(new Path(path))
.filterNot(_.getPath.getName.startsWith("_"))
.map(_.getPath.toUri.toString)
}
/**
* Write the metadata of a batch to disk. The file format is as follows:
*
* {{{
* <FileStreamSource.VERSION>
* START
* -/a/b/c
* -/d/e/f
* ...
* END
* }}}
*
* Note: <FileStreamSource.VERSION> means the value of `FileStreamSource.VERSION`. Every file
* path starts with "-" so that we can easily tell whether a line is a file path.
*/
private def writeBatch(id: Int, files: Seq[String]): Unit = {
assert(files.nonEmpty, "create a new batch without any file")
val output = fs.create(new Path(metadataPath + "/" + id), true)
val writer = new PrintWriter(new OutputStreamWriter(output, StandardCharsets.UTF_8))
try {
// scalastyle:off println
writer.println(FileStreamSource.VERSION)
writer.println(FileStreamSource.START_TAG)
files.foreach(file => writer.println(FileStreamSource.PATH_PREFIX + file))
writer.println(FileStreamSource.END_TAG)
// scalastyle:on println
} finally {
writer.close()
}
batchToMetadata(id) = files
}
/** Read the file names of the specified batch id from the metadata file */
private def readBatch(id: Int): Seq[String] = {
val input = fs.open(new Path(metadataPath + "/" + id))
try {
FileStreamSource.readBatch(input)
} finally {
input.close()
}
}
}
object FileStreamSource {
private val START_TAG = "START"
private val END_TAG = "END"
private val PATH_PREFIX = "-"
val VERSION = "FILESTREAM_V1"
/**
* Parse a metadata file and return the content. If the metadata file is corrupted, it will return
* an empty `Seq`.
*/
def readBatch(input: InputStream): Seq[String] = {
val lines = scala.io.Source.fromInputStream(input)(Codec.UTF8).getLines().toArray
if (lines.length < 4) {
// version + start tag + end tag + at least one file path
return Nil
}
if (lines.head != VERSION) {
return Nil
}
if (lines(1) != START_TAG) {
return Nil
}
if (lines.last != END_TAG) {
return Nil
}
lines.slice(2, lines.length - 1).map(_.stripPrefix(PATH_PREFIX)) // Drop character "-"
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.execution.streaming
import java.io.{FileNotFoundException, IOException}
import java.nio.ByteBuffer
import java.util.{ConcurrentModificationException, EnumSet}
import scala.reflect.ClassTag
import org.apache.commons.io.IOUtils
import org.apache.hadoop.fs._
import org.apache.hadoop.fs.permission.FsPermission
import org.apache.spark.network.util.JavaUtils
import org.apache.spark.serializer.JavaSerializer
import org.apache.spark.sql.SQLContext
/**
* A [[MetadataLog]] implementation based on HDFS. [[HDFSMetadataLog]] uses the specified `path`
* as the metadata storage.
*
* When writing a new batch, [[HDFSMetadataLog]] first writes to a temp file and then renames
* it to the final batch file. If the rename step fails, there must be multiple concurrent
* writers, and only one of them will succeed while the others fail.
*
* Note: [[HDFSMetadataLog]] doesn't support S3-like file systems, as they don't guarantee that
* listing files in a directory always shows the latest files.
*/
class HDFSMetadataLog[T: ClassTag](sqlContext: SQLContext, path: String) extends MetadataLog[T] {
private val metadataPath = new Path(path)
private val fc =
if (metadataPath.toUri.getScheme == null) {
FileContext.getFileContext(sqlContext.sparkContext.hadoopConfiguration)
} else {
FileContext.getFileContext(metadataPath.toUri, sqlContext.sparkContext.hadoopConfiguration)
}
if (!fc.util().exists(metadataPath)) {
fc.mkdir(metadataPath, FsPermission.getDirDefault, true)
}
/**
* A `PathFilter` to filter only batch files
*/
private val batchFilesFilter = new PathFilter {
override def accept(path: Path): Boolean = try {
path.getName.toLong
true
} catch {
case _: NumberFormatException => false
}
}
private val serializer = new JavaSerializer(sqlContext.sparkContext.conf).newInstance()
private def batchFile(batchId: Long): Path = {
new Path(metadataPath, batchId.toString)
}
override def add(batchId: Long, metadata: T): Boolean = {
get(batchId).map(_ => false).getOrElse {
// Only write metadata when the batch has not yet been written.
val buffer = serializer.serialize(metadata)
try {
writeBatch(batchId, JavaUtils.bufferToArray(buffer))
true
} catch {
case e: IOException if "java.lang.InterruptedException" == e.getMessage =>
// create may convert InterruptedException to IOException. Let's convert it back to
// InterruptedException so that this failure won't crash StreamExecution
throw new InterruptedException("Creating file is interrupted")
}
}
}
/**
* Write a batch to a temp file then rename it to the batch file.
*
* There may be multiple [[HDFSMetadataLog]] instances using the same metadata path. Although
* this is not valid behavior, we still need to prevent them from destroying the files.
*/
private def writeBatch(batchId: Long, bytes: Array[Byte]): Unit = {
// Use nextId to create a temp file
var nextId = 0
while (true) {
val tempPath = new Path(metadataPath, s".${batchId}_$nextId.tmp")
fc.deleteOnExit(tempPath)
try {
val output = fc.create(tempPath, EnumSet.of(CreateFlag.CREATE))
try {
output.write(bytes)
} finally {
output.close()
}
try {
// Try to commit the batch
// It will fail if there is an existing file (someone has committed the batch)
fc.rename(tempPath, batchFile(batchId), Options.Rename.NONE)
return
} catch {
case e: IOException if isFileAlreadyExistsException(e) =>
// If "rename" fails, it means some other "HDFSMetadataLog" has committed the batch.
// So throw an exception to tell the user this is not a valid behavior.
throw new ConcurrentModificationException(
s"Multiple HDFSMetadataLog are using $path", e)
case e: FileNotFoundException =>
// Sometimes, "create" will succeed when multiple writers are calling it at the same
// time. However, only one writer can call "rename" successfully, others will get
// FileNotFoundException because the first writer has removed it.
throw new ConcurrentModificationException(
s"Multiple HDFSMetadataLog are using $path", e)
}
} catch {
case e: IOException if isFileAlreadyExistsException(e) =>
// Failed to create "tempPath". There are two cases:
// 1. Someone is creating "tempPath" too.
// 2. This is a restart. "tempPath" has already been created but not moved to the final
// batch file (not committed).
//
// For both cases, the batch has not yet been committed. So we can retry it.
//
// Note: there is a potential risk here: while HDFSMetadataLog A is running, someone can
// use the same metadata path to create another "HDFSMetadataLog" and make A fail. However,
// this is not a big problem because it requires the attacker to have write permission on
// the metadata path. In addition, the old Streaming API has the same issue: someone can
// create malicious checkpoint files to crash a Streaming application too.
nextId += 1
}
}
}
private def isFileAlreadyExistsException(e: IOException): Boolean = {
e.isInstanceOf[FileAlreadyExistsException] ||
// Old Hadoop versions don't throw FileAlreadyExistsException. Although it's fixed in
// HADOOP-9361, we still need to support old Hadoop versions.
(e.getMessage != null && e.getMessage.startsWith("File already exists: "))
}
override def get(batchId: Long): Option[T] = {
val batchMetadataFile = batchFile(batchId)
if (fc.util().exists(batchMetadataFile)) {
val input = fc.open(batchMetadataFile)
val bytes = IOUtils.toByteArray(input)
Some(serializer.deserialize[T](ByteBuffer.wrap(bytes)))
} else {
None
}
}
override def get(startId: Option[Long], endId: Long): Array[(Long, T)] = {
val batchIds = fc.util().listStatus(metadataPath, batchFilesFilter)
.map(_.getPath.getName.toLong)
.filter { batchId =>
batchId <= endId && (startId.isEmpty || batchId >= startId.get)
}
batchIds.sorted.map(batchId => (batchId, get(batchId))).filter(_._2.isDefined).map {
case (batchId, metadataOption) =>
(batchId, metadataOption.get)
}
}
override def getLatest(): Option[(Long, T)] = {
val batchIds = fc.util().listStatus(metadataPath, batchFilesFilter)
.map(_.getPath.getName.toLong)
.sorted
.reverse
for (batchId <- batchIds) {
val batch = get(batchId)
if (batch.isDefined) {
return Some((batchId, batch.get))
}
}
None
}
}
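
For reference, the write-to-temp-then-rename commit protocol above can be sketched in isolation; a minimal sketch, assuming a local Hadoop `Configuration` and hypothetical paths (not part of this patch):

import java.util.EnumSet
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{CreateFlag, FileContext, Options, Path}

// Write the payload to a hidden temp file first (hypothetical paths).
val fc = FileContext.getFileContext(new Configuration())
val tempPath = new Path("/tmp/metadata/.0.tmp")
val out = fc.create(tempPath, EnumSet.of(CreateFlag.CREATE))
try out.write("batch0".getBytes("UTF-8")) finally out.close()

// Then rename it to the final batch file. Options.Rename.NONE makes the rename
// fail if the destination already exists, so at most one concurrent writer can
// commit a given batch id.
fc.rename(tempPath, new Path("/tmp/metadata/0"), Options.Rename.NONE)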
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.execution.streaming
/**
* A general MetadataLog that supports the following features:
*
* - Allow the user to store a metadata object for each batch.
* - Allow the user to query the latest batch id.
* - Allow the user to query the metadata object of a specified batch id.
* - Allow the user to query metadata objects in a range of batch ids.
*/
trait MetadataLog[T] {
/**
* Store the metadata for the specified batchId and return `true` if successful. If the batchId's
* metadata has already been stored, this method will return `false`.
*/
def add(batchId: Long, metadata: T): Boolean
/**
* Return the metadata for the specified batchId if it's stored. Otherwise, return None.
*/
def get(batchId: Long): Option[T]
/**
* Return metadata for batches between startId (inclusive) and endId (inclusive). If `startId` is
* `None`, return all batches up to endId (inclusive).
*/
def get(startId: Option[Long], endId: Long): Array[(Long, T)]
/**
* Return the latest batch id and its metadata, if they exist.
*/
def getLatest(): Option[(Long, T)]
}
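
The contract can be illustrated with a minimal in-memory implementation; a sketch for illustration only (not part of this patch):

import scala.collection.mutable

// Hypothetical in-memory MetadataLog (e.g. for tests): the first write for a
// batch id wins, later writes return false, and queries mirror the HDFS log.
class InMemoryMetadataLog[T] extends MetadataLog[T] {
  private val batches = new mutable.HashMap[Long, T]

  override def add(batchId: Long, metadata: T): Boolean = synchronized {
    if (batches.contains(batchId)) false
    else { batches(batchId) = metadata; true }
  }

  override def get(batchId: Long): Option[T] = synchronized { batches.get(batchId) }

  override def get(startId: Option[Long], endId: Long): Array[(Long, T)] = synchronized {
    batches.toArray
      .filter { case (id, _) => id <= endId && startId.forall(id >= _) }
      .sortBy(_._1)
  }

  override def getLatest(): Option[(Long, T)] = synchronized {
    if (batches.isEmpty) None else Some(batches.maxBy(_._1))
  }
}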
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.execution.streaming
import java.util.ConcurrentModificationException
import org.scalatest.concurrent.AsyncAssertions._
import org.scalatest.time.SpanSugar._
import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.test.SharedSQLContext
class HDFSMetadataLogSuite extends SparkFunSuite with SharedSQLContext {
test("basic") {
withTempDir { temp =>
val metadataLog = new HDFSMetadataLog[String](sqlContext, temp.getAbsolutePath)
assert(metadataLog.add(0, "batch0"))
assert(metadataLog.getLatest() === Some(0 -> "batch0"))
assert(metadataLog.get(0) === Some("batch0"))
assert(metadataLog.getLatest() === Some(0 -> "batch0"))
assert(metadataLog.get(None, 0) === Array(0 -> "batch0"))
assert(metadataLog.add(1, "batch1"))
assert(metadataLog.get(0) === Some("batch0"))
assert(metadataLog.get(1) === Some("batch1"))
assert(metadataLog.getLatest() === Some(1 -> "batch1"))
assert(metadataLog.get(None, 1) === Array(0 -> "batch0", 1 -> "batch1"))
// Adding the same batch does nothing
metadataLog.add(1, "batch1-duplicated")
assert(metadataLog.get(0) === Some("batch0"))
assert(metadataLog.get(1) === Some("batch1"))
assert(metadataLog.getLatest() === Some(1 -> "batch1"))
assert(metadataLog.get(None, 1) === Array(0 -> "batch0", 1 -> "batch1"))
}
}
test("restart") {
withTempDir { temp =>
val metadataLog = new HDFSMetadataLog[String](sqlContext, temp.getAbsolutePath)
assert(metadataLog.add(0, "batch0"))
assert(metadataLog.add(1, "batch1"))
assert(metadataLog.get(0) === Some("batch0"))
assert(metadataLog.get(1) === Some("batch1"))
assert(metadataLog.getLatest() === Some(1 -> "batch1"))
assert(metadataLog.get(None, 1) === Array(0 -> "batch0", 1 -> "batch1"))
val metadataLog2 = new HDFSMetadataLog[String](sqlContext, temp.getAbsolutePath)
assert(metadataLog2.get(0) === Some("batch0"))
assert(metadataLog2.get(1) === Some("batch1"))
assert(metadataLog2.getLatest() === Some(1 -> "batch1"))
assert(metadataLog2.get(None, 1) === Array(0 -> "batch0", 1 -> "batch1"))
}
}
test("metadata directory collision") {
withTempDir { temp =>
val waiter = new Waiter
val maxBatchId = 100
for (id <- 0 until 10) {
new Thread() {
override def run(): Unit = waiter {
val metadataLog = new HDFSMetadataLog[String](sqlContext, temp.getAbsolutePath)
try {
var nextBatchId = metadataLog.getLatest().map(_._1).getOrElse(-1L)
nextBatchId += 1
while (nextBatchId <= maxBatchId) {
metadataLog.add(nextBatchId, nextBatchId.toString)
nextBatchId += 1
}
} catch {
case e: ConcurrentModificationException =>
// This is expected since there are multiple writers
} finally {
waiter.dismiss()
}
}
}.start()
}
waiter.await(timeout(10.seconds), dismissals(10))
val metadataLog = new HDFSMetadataLog[String](sqlContext, temp.getAbsolutePath)
assert(metadataLog.getLatest() === Some(maxBatchId -> maxBatchId.toString))
assert(metadataLog.get(None, maxBatchId) === (0 to maxBatchId).map(i => (i, i.toString)))
}
}
}
@@ -17,13 +17,11 @@
package org.apache.spark.sql.streaming
import java.io.{ByteArrayInputStream, File, FileNotFoundException, InputStream}
import java.nio.charset.StandardCharsets
import java.io.File
import org.apache.spark.sql.{AnalysisException, StreamTest}
import org.apache.spark.sql.catalyst.util._
import org.apache.spark.sql.execution.streaming._
import org.apache.spark.sql.execution.streaming.FileStreamSource._
import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.sql.types.{StringType, StructType}
import org.apache.spark.util.Utils
@@ -359,60 +357,6 @@ class FileStreamSourceSuite extends FileStreamSourceTest with SharedSQLContext {
Utils.deleteRecursively(tmp)
}
test("fault tolerance with corrupted metadata file") {
val src = Utils.createTempDir("streaming.src")
assert(new File(src, "_metadata").mkdirs())
stringToFile(
new File(src, "_metadata/0"),
s"${FileStreamSource.VERSION}\nSTART\n-/a/b/c\n-/e/f/g\nEND\n")
stringToFile(new File(src, "_metadata/1"), s"${FileStreamSource.VERSION}\nSTART\n-")
val textSource = createFileStreamSource("text", src.getCanonicalPath)
// the metadata file of batch 1 is corrupted, so currentOffset should be 0
assert(textSource.currentOffset === LongOffset(0))
Utils.deleteRecursively(src)
}
test("fault tolerance with normal metadata file") {
val src = Utils.createTempDir("streaming.src")
assert(new File(src, "_metadata").mkdirs())
stringToFile(
new File(src, "_metadata/0"),
s"${FileStreamSource.VERSION}\nSTART\n-/a/b/c\n-/e/f/g\nEND\n")
stringToFile(
new File(src, "_metadata/1"),
s"${FileStreamSource.VERSION}\nSTART\n-/x/y/z\nEND\n")
val textSource = createFileStreamSource("text", src.getCanonicalPath)
assert(textSource.currentOffset === LongOffset(1))
Utils.deleteRecursively(src)
}
test("readBatch") {
def stringToStream(str: String): InputStream =
new ByteArrayInputStream(str.getBytes(StandardCharsets.UTF_8))
// Invalid metadata
assert(readBatch(stringToStream("")) === Nil)
assert(readBatch(stringToStream(FileStreamSource.VERSION)) === Nil)
assert(readBatch(stringToStream(s"${FileStreamSource.VERSION}\n")) === Nil)
assert(readBatch(stringToStream(s"${FileStreamSource.VERSION}\nSTART")) === Nil)
assert(readBatch(stringToStream(s"${FileStreamSource.VERSION}\nSTART\n-")) === Nil)
assert(readBatch(stringToStream(s"${FileStreamSource.VERSION}\nSTART\n-/a/b/c")) === Nil)
assert(readBatch(stringToStream(s"${FileStreamSource.VERSION}\nSTART\n-/a/b/c\n")) === Nil)
assert(readBatch(stringToStream(s"${FileStreamSource.VERSION}\nSTART\n-/a/b/c\nEN")) === Nil)
// Valid metadata
assert(readBatch(stringToStream(
s"${FileStreamSource.VERSION}\nSTART\n-/a/b/c\nEND")) === Seq("/a/b/c"))
assert(readBatch(stringToStream(
s"${FileStreamSource.VERSION}\nSTART\n-/a/b/c\nEND\n")) === Seq("/a/b/c"))
assert(readBatch(stringToStream(
s"${FileStreamSource.VERSION}\nSTART\n-/a/b/c\n-/e/f/g\nEND\n"))
=== Seq("/a/b/c", "/e/f/g"))
}
}
class FileStreamSourceStressTestSuite extends FileStreamSourceTest with SharedSQLContext {
......