Commit edeb51a3 authored by Burak Yavuz, committed by Shixiong Zhu

[SPARK-17876] Write StructuredStreaming WAL to a stream instead of materializing all at once

## What changes were proposed in this pull request?

The CompactibleFileStreamLog materializes the whole metadata log in memory as a single String. This can cause OutOfMemoryErrors when a large number of files are being committed, especially during a compaction batch.
You may come across stack traces that look like:
```
java.lang.OutOfMemoryError: Requested array size exceeds VM limit
at java.lang.StringCoding.encode(StringCoding.java:350)
at java.lang.String.getBytes(String.java:941)
at org.apache.spark.sql.execution.streaming.FileStreamSinkLog.serialize(FileStreamSinkLog.scala:127)

```
The safer way is to write to an output stream so that we don't have to materialize a huge string.
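
As a rough sketch (not the actual Spark code; the object and method names below are hypothetical), the difference between the two approaches looks like this:

```scala
import java.io.{ByteArrayOutputStream, OutputStream}
import java.nio.charset.StandardCharsets.UTF_8

object SerializationSketch {
  // Materializing approach: every entry is joined into one String before a single
  // byte is written, so peak memory grows with the size of the whole log.
  def serializeAllAtOnce(version: String, entries: Seq[String]): Array[Byte] =
    (version +: entries).mkString("\n").getBytes(UTF_8)

  // Streaming approach: each entry is encoded and written on its own, so only one
  // entry needs to be in memory at a time, no matter how many files were committed.
  def serializeToStream(version: String, entries: Seq[String], out: OutputStream): Unit = {
    out.write(version.getBytes(UTF_8))
    entries.foreach { entry =>
      out.write('\n')
      out.write(entry.getBytes(UTF_8))
    }
  }

  def main(args: Array[String]): Unit = {
    val out = new ByteArrayOutputStream()
    serializeToStream("v1", Seq("""{"path":"/a/b/x"}""", """{"path":"/a/b/y"}"""), out)
    println(out.toString(UTF_8.name()))
  }
}
```

With the streaming variant, peak memory stays proportional to a single entry rather than to the whole compacted log.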

## How was this patch tested?

Existing unit tests

Author: Burak Yavuz <brkyvz@gmail.com>

Closes #15437 from brkyvz/ser-to-stream.
parent 21cb59f1
CompactibleFileStreamLog.scala
@@ -17,9 +17,10 @@
 package org.apache.spark.sql.execution.streaming
 
-import java.io.IOException
+import java.io.{InputStream, IOException, OutputStream}
 import java.nio.charset.StandardCharsets.UTF_8
 
+import scala.io.{Source => IOSource}
 import scala.reflect.ClassTag
 
 import org.apache.hadoop.fs.{Path, PathFilter}
@@ -93,20 +94,25 @@ abstract class CompactibleFileStreamLog[T: ClassTag](
     }
   }
 
-  override def serialize(logData: Array[T]): Array[Byte] = {
-    (metadataLogVersion +: logData.map(serializeData)).mkString("\n").getBytes(UTF_8)
+  override def serialize(logData: Array[T], out: OutputStream): Unit = {
+    // called inside a try-finally where the underlying stream is closed in the caller
+    out.write(metadataLogVersion.getBytes(UTF_8))
+    logData.foreach { data =>
+      out.write('\n')
+      out.write(serializeData(data).getBytes(UTF_8))
+    }
   }
 
-  override def deserialize(bytes: Array[Byte]): Array[T] = {
-    val lines = new String(bytes, UTF_8).split("\n")
-    if (lines.length == 0) {
+  override def deserialize(in: InputStream): Array[T] = {
+    val lines = IOSource.fromInputStream(in, UTF_8.name()).getLines()
+    if (!lines.hasNext) {
       throw new IllegalStateException("Incomplete log file")
     }
-    val version = lines(0)
+    val version = lines.next()
     if (version != metadataLogVersion) {
       throw new IllegalStateException(s"Unknown log version: ${version}")
     }
-    lines.slice(1, lines.length).map(deserializeData)
+    lines.map(deserializeData).toArray
   }
 
   override def add(batchId: Long, logs: Array[T]): Boolean = {
...
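
The read path changes in the same spirit: instead of splitting one fully materialized String, entries are pulled lazily from a line iterator over the stream, and `next()` consumes the version header so the remaining iterator maps directly to log entries. A minimal standalone sketch of that pattern (using a hardcoded "v1" version and plain String entries, not the real FileStreamSinkLog format):

```scala
import java.io.ByteArrayInputStream
import java.nio.charset.StandardCharsets.UTF_8

import scala.io.{Source => IOSource}

object DeserializeSketch {
  def main(args: Array[String]): Unit = {
    val bytes = "v1\nentry-1\nentry-2".getBytes(UTF_8)
    val lines = IOSource.fromInputStream(new ByteArrayInputStream(bytes), UTF_8.name()).getLines()
    require(lines.hasNext, "Incomplete log file")
    require(lines.next() == "v1", "Unknown log version")
    // The remaining lines are pulled from the stream one at a time; no full-file String is built.
    val entries = lines.toArray
    assert(entries.sameElements(Array("entry-1", "entry-2")))
  }
}
```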
HDFSMetadataLog.scala
@@ -17,8 +17,7 @@
 package org.apache.spark.sql.execution.streaming
 
-import java.io.{FileNotFoundException, IOException}
-import java.nio.ByteBuffer
+import java.io.{FileNotFoundException, InputStream, IOException, OutputStream}
 import java.util.{ConcurrentModificationException, EnumSet, UUID}
 
 import scala.reflect.ClassTag
@@ -29,7 +28,6 @@ import org.apache.hadoop.fs._
 import org.apache.hadoop.fs.permission.FsPermission
 
 import org.apache.spark.internal.Logging
-import org.apache.spark.network.util.JavaUtils
 import org.apache.spark.serializer.JavaSerializer
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.util.UninterruptibleThread
@@ -88,12 +86,16 @@ class HDFSMetadataLog[T: ClassTag](sparkSession: SparkSession, path: String)
     }
   }
 
-  protected def serialize(metadata: T): Array[Byte] = {
-    JavaUtils.bufferToArray(serializer.serialize(metadata))
+  protected def serialize(metadata: T, out: OutputStream): Unit = {
+    // called inside a try-finally where the underlying stream is closed in the caller
+    val outStream = serializer.serializeStream(out)
+    outStream.writeObject(metadata)
   }
 
-  protected def deserialize(bytes: Array[Byte]): T = {
-    serializer.deserialize[T](ByteBuffer.wrap(bytes))
+  protected def deserialize(in: InputStream): T = {
+    // called inside a try-finally where the underlying stream is closed in the caller
+    val inStream = serializer.deserializeStream(in)
+    inStream.readObject[T]()
   }
 
   /**
@@ -114,7 +116,7 @@ class HDFSMetadataLog[T: ClassTag](sparkSession: SparkSession, path: String)
       // Only write metadata when the batch has not yet been written
       Thread.currentThread match {
         case ut: UninterruptibleThread =>
-          ut.runUninterruptibly { writeBatch(batchId, serialize(metadata)) }
+          ut.runUninterruptibly { writeBatch(batchId, metadata, serialize) }
         case _ =>
           throw new IllegalStateException(
            "HDFSMetadataLog.add() must be executed on a o.a.spark.util.UninterruptibleThread")
@@ -129,7 +131,7 @@ class HDFSMetadataLog[T: ClassTag](sparkSession: SparkSession, path: String)
   * There may be multiple [[HDFSMetadataLog]] using the same metadata path. Although it is not a
   * valid behavior, we still need to prevent it from destroying the files.
   */
-  private def writeBatch(batchId: Long, bytes: Array[Byte]): Unit = {
+  private def writeBatch(batchId: Long, metadata: T, writer: (T, OutputStream) => Unit): Unit = {
    // Use nextId to create a temp file
    var nextId = 0
    while (true) {
@@ -137,9 +139,9 @@ class HDFSMetadataLog[T: ClassTag](sparkSession: SparkSession, path: String)
      try {
        val output = fileManager.create(tempPath)
        try {
-          output.write(bytes)
+          writer(metadata, output)
        } finally {
-          output.close()
+          IOUtils.closeQuietly(output)
        }
        try {
          // Try to commit the batch
@@ -193,10 +195,9 @@ class HDFSMetadataLog[T: ClassTag](sparkSession: SparkSession, path: String)
    if (fileManager.exists(batchMetadataFile)) {
      val input = fileManager.open(batchMetadataFile)
      try {
-        val bytes = IOUtils.toByteArray(input)
-        Some(deserialize(bytes))
+        Some(deserialize(input))
      } finally {
-        input.close()
+        IOUtils.closeQuietly(input)
      }
    } else {
      logDebug(s"Unable to find batch $batchMetadataFile")
...
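
On the HDFSMetadataLog side, writeBatch no longer receives pre-serialized bytes; it takes the metadata plus a writer function of type (T, OutputStream) => Unit and lets the serializer write straight into the file stream. A small sketch of that callback shape (with a hypothetical createOutput stand-in for fileManager.create, outside of Spark):

```scala
import java.io.{ByteArrayOutputStream, OutputStream}
import java.nio.charset.StandardCharsets.UTF_8

object WriterCallbackSketch {
  // The caller supplies the serializer as a function, so the bytes flow directly to the
  // stream that backs the output file instead of being buffered as one Array[Byte].
  def writeBatch[T](metadata: T, writer: (T, OutputStream) => Unit,
      createOutput: () => OutputStream): Unit = {
    val output = createOutput() // hypothetical stand-in for fileManager.create(tempPath)
    try {
      writer(metadata, output)
    } finally {
      output.close()
    }
  }

  def main(args: Array[String]): Unit = {
    val sink = new ByteArrayOutputStream()
    writeBatch[String]("some metadata", (m, out) => out.write(m.getBytes(UTF_8)), () => sink)
    assert(sink.toString(UTF_8.name()) == "some metadata")
  }
}
```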
FileStreamSinkLogSuite.scala
@@ -17,6 +17,7 @@
 package org.apache.spark.sql.execution.streaming
 
+import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
 import java.nio.charset.StandardCharsets.UTF_8
 
 import org.apache.spark.SparkFunSuite
@@ -133,9 +134,12 @@ class FileStreamSinkLogSuite extends SparkFunSuite with SharedSQLContext {
          |{"path":"/a/b/y","size":200,"isDir":false,"modificationTime":2000,"blockReplication":2,"blockSize":20000,"action":"delete"}
          |{"path":"/a/b/z","size":300,"isDir":false,"modificationTime":3000,"blockReplication":3,"blockSize":30000,"action":"add"}""".stripMargin
      // scalastyle:on
-      assert(expected === new String(sinkLog.serialize(logs), UTF_8))
-
-      assert(VERSION === new String(sinkLog.serialize(Array()), UTF_8))
+      val baos = new ByteArrayOutputStream()
+      sinkLog.serialize(logs, baos)
+      assert(expected === baos.toString(UTF_8.name()))
+      baos.reset()
+      sinkLog.serialize(Array(), baos)
+      assert(VERSION === baos.toString(UTF_8.name()))
    }
  }
@@ -174,9 +178,9 @@ class FileStreamSinkLogSuite extends SparkFunSuite with SharedSQLContext {
          blockSize = 30000L,
          action = FileStreamSinkLog.ADD_ACTION))
 
-      assert(expected === sinkLog.deserialize(logs.getBytes(UTF_8)))
+      assert(expected === sinkLog.deserialize(new ByteArrayInputStream(logs.getBytes(UTF_8))))
 
-      assert(Nil === sinkLog.deserialize(VERSION.getBytes(UTF_8)))
+      assert(Nil === sinkLog.deserialize(new ByteArrayInputStream(VERSION.getBytes(UTF_8))))
    }
  }
...