Commit 8023242e authored by Josh Rosen, committed by Reynold Xin

[SPARK-10761] Refactor DiskBlockObjectWriter to not require BlockId

The DiskBlockObjectWriter constructor took a BlockId parameter but never used it. As part of some general cleanup in these interfaces, this patch refactors its constructor to eliminate this parameter.

Author: Josh Rosen <joshrosen@databricks.com>

Closes #8871 from JoshRosen/disk-block-object-writer-blockid-cleanup.
parent b3862d3c
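For context, a minimal sketch of constructing the writer after this change, modelled on the updated DiskBlockObjectWriterSuite below (the file path is illustrative, and the snippet assumes it lives in the org.apache.spark.storage package, since DiskBlockObjectWriter is private[spark]):

    import java.io.File
    import org.apache.spark.SparkConf
    import org.apache.spark.executor.ShuffleWriteMetrics
    import org.apache.spark.serializer.JavaSerializer

    val file = new File("/tmp/somefile")  // illustrative path
    val writeMetrics = new ShuffleWriteMetrics()
    // No BlockId argument any more: the writer is tied only to its target file.
    val writer = new DiskBlockObjectWriter(
      file, new JavaSerializer(new SparkConf()).newInstance(), 1024, os => os, true, writeMetrics)
    writer.write(Long.box(20), Long.box(30))
    writer.commitAndClose()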
@@ -151,7 +151,7 @@ final class BypassMergeSortShuffleWriter<K, V> implements SortShuffleFileWriter<
         } finally {
           Closeables.close(in, copyThrewException);
         }
-        if (!blockManager.diskBlockManager().getFile(partitionWriters[i].blockId()).delete()) {
+        if (!partitionWriters[i].fileSegment().file().delete()) {
           logger.error("Unable to delete file for partition {}", i);
         }
       }
@@ -168,12 +168,11 @@ final class BypassMergeSortShuffleWriter<K, V> implements SortShuffleFileWriter<
   public void stop() throws IOException {
     if (partitionWriters != null) {
       try {
-        final DiskBlockManager diskBlockManager = blockManager.diskBlockManager();
         for (DiskBlockObjectWriter writer : partitionWriters) {
           // This method explicitly does _not_ throw exceptions:
-          writer.revertPartialWritesAndClose();
-          if (!diskBlockManager.getFile(writer.blockId()).delete()) {
-            logger.error("Error while deleting file for block {}", writer.blockId());
+          File file = writer.revertPartialWritesAndClose();
+          if (!file.delete()) {
+            logger.error("Error while deleting file {}", file.getAbsolutePath());
           }
         }
       } finally {
@@ -119,9 +119,8 @@ private[spark] class IndexShuffleBlockResolver(conf: SparkConf) extends ShuffleB
 }

 private[spark] object IndexShuffleBlockResolver {
-  // No-op reduce ID used in interactions with disk store and DiskBlockObjectWriter.
+  // No-op reduce ID used in interactions with disk store.
   // The disk store currently expects puts to relate to a (map, reduce) pair, but in the sort
   // shuffle outputs for several reduces are glommed into a single file.
-  // TODO: Avoid this entirely by having the DiskBlockObjectWriter not require a BlockId.
   val NOOP_REDUCE_ID = 0
 }
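For illustration, a minimal sketch of how this no-op reduce ID is used when naming the single per-map output file (hypothetical helper, assuming the code lives inside the org.apache.spark package so it can see the private[spark] members):

    import org.apache.spark.shuffle.IndexShuffleBlockResolver
    import org.apache.spark.storage.ShuffleDataBlockId

    // All reduce partitions for one map task share one data file, so the
    // block ID is formed with the placeholder reduce ID.
    def dataBlockFor(shuffleId: Int, mapId: Int): ShuffleDataBlockId =
      ShuffleDataBlockId(shuffleId, mapId, IndexShuffleBlockResolver.NOOP_REDUCE_ID)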
@@ -669,7 +669,7 @@ private[spark] class BlockManager(
       writeMetrics: ShuffleWriteMetrics): DiskBlockObjectWriter = {
     val compressStream: OutputStream => OutputStream = wrapForCompression(blockId, _)
     val syncWrites = conf.getBoolean("spark.shuffle.sync", false)
-    new DiskBlockObjectWriter(blockId, file, serializerInstance, bufferSize, compressStream,
+    new DiskBlockObjectWriter(file, serializerInstance, bufferSize, compressStream,
       syncWrites, writeMetrics)
   }
@@ -34,7 +34,6 @@ import org.apache.spark.util.Utils
  * reopened again.
  */
 private[spark] class DiskBlockObjectWriter(
-    val blockId: BlockId,
     file: File,
     serializerInstance: SerializerInstance,
     bufferSize: Int,
@@ -144,8 +143,10 @@ private[spark] class DiskBlockObjectWriter(
    * Reverts writes that haven't been flushed yet. Callers should invoke this function
    * when there are runtime exceptions. This method will not throw, though it may be
    * unsuccessful in truncating written data.
+   *
+   * @return the file that this DiskBlockObjectWriter wrote to.
    */
-  def revertPartialWritesAndClose() {
+  def revertPartialWritesAndClose(): File = {
     // Discard current writes. We do this by flushing the outstanding writes and then
     // truncating the file to its initial position.
     try {
@@ -160,12 +161,14 @@ private[spark] class DiskBlockObjectWriter(
       val truncateStream = new FileOutputStream(file, true)
       try {
         truncateStream.getChannel.truncate(initialPosition)
+        file
       } finally {
         truncateStream.close()
       }
     } catch {
       case e: Exception =>
         logError("Uncaught exception while reverting partial writes to file " + file, e)
+        file
     }
   }
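For illustration, a minimal sketch of the caller pattern this return value enables, mirroring the BypassMergeSortShuffleWriter.stop() change above (hypothetical names: `writer` stands for any DiskBlockObjectWriter and `logError` for the caller's own logging):

    // The writer now reports which file it wrote, so callers no longer need
    // a BlockId plus a DiskBlockManager lookup just to delete partial output.
    val file: java.io.File = writer.revertPartialWritesAndClose()
    if (!file.delete()) {
      logError(s"Error while deleting file ${file.getAbsolutePath}")
    }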
@@ -129,7 +129,6 @@ public class UnsafeShuffleWriterSuite {
         Object[] args = invocationOnMock.getArguments();

         return new DiskBlockObjectWriter(
-          (BlockId) args[0],
           (File) args[1],
           (SerializerInstance) args[2],
           (Integer) args[3],
@@ -127,7 +127,6 @@ public class UnsafeExternalSorterSuite {
         Object[] args = invocationOnMock.getArguments();

         return new DiskBlockObjectWriter(
-          (BlockId) args[0],
           (File) args[1],
           (SerializerInstance) args[2],
           (Integer) args[3],
@@ -72,7 +72,6 @@ class BypassMergeSortShuffleWriterSuite extends SparkFunSuite with BeforeAndAfte
       override def answer(invocation: InvocationOnMock): DiskBlockObjectWriter = {
         val args = invocation.getArguments
         new DiskBlockObjectWriter(
-          args(0).asInstanceOf[BlockId],
           args(1).asInstanceOf[File],
           args(2).asInstanceOf[SerializerInstance],
           args(3).asInstanceOf[Int],
@@ -20,7 +20,6 @@ import java.io.File

 import org.scalatest.BeforeAndAfterEach

-import org.apache.spark.SparkConf
 import org.apache.spark.{SparkConf, SparkFunSuite}
 import org.apache.spark.executor.ShuffleWriteMetrics
 import org.apache.spark.serializer.JavaSerializer
@@ -41,8 +40,8 @@ class DiskBlockObjectWriterSuite extends SparkFunSuite with BeforeAndAfterEach {
   test("verify write metrics") {
     val file = new File(tempDir, "somefile")
     val writeMetrics = new ShuffleWriteMetrics()
-    val writer = new DiskBlockObjectWriter(new TestBlockId("0"), file,
-      new JavaSerializer(new SparkConf()).newInstance(), 1024, os => os, true, writeMetrics)
+    val writer = new DiskBlockObjectWriter(
+      file, new JavaSerializer(new SparkConf()).newInstance(), 1024, os => os, true, writeMetrics)

     writer.write(Long.box(20), Long.box(30))
     // Record metrics update on every write
@@ -63,8 +62,8 @@ class DiskBlockObjectWriterSuite extends SparkFunSuite with BeforeAndAfterEach {
   test("verify write metrics on revert") {
     val file = new File(tempDir, "somefile")
     val writeMetrics = new ShuffleWriteMetrics()
-    val writer = new DiskBlockObjectWriter(new TestBlockId("0"), file,
-      new JavaSerializer(new SparkConf()).newInstance(), 1024, os => os, true, writeMetrics)
+    val writer = new DiskBlockObjectWriter(
+      file, new JavaSerializer(new SparkConf()).newInstance(), 1024, os => os, true, writeMetrics)

     writer.write(Long.box(20), Long.box(30))
     // Record metrics update on every write
@@ -86,8 +85,8 @@ class DiskBlockObjectWriterSuite extends SparkFunSuite with BeforeAndAfterEach {
   test("Reopening a closed block writer") {
     val file = new File(tempDir, "somefile")
     val writeMetrics = new ShuffleWriteMetrics()
-    val writer = new DiskBlockObjectWriter(new TestBlockId("0"), file,
-      new JavaSerializer(new SparkConf()).newInstance(), 1024, os => os, true, writeMetrics)
+    val writer = new DiskBlockObjectWriter(
+      file, new JavaSerializer(new SparkConf()).newInstance(), 1024, os => os, true, writeMetrics)

     writer.open()
     writer.close()
@@ -99,8 +98,8 @@ class DiskBlockObjectWriterSuite extends SparkFunSuite with BeforeAndAfterEach {
   test("calling revertPartialWritesAndClose() on a closed block writer should have no effect") {
     val file = new File(tempDir, "somefile")
     val writeMetrics = new ShuffleWriteMetrics()
-    val writer = new DiskBlockObjectWriter(new TestBlockId("0"), file,
-      new JavaSerializer(new SparkConf()).newInstance(), 1024, os => os, true, writeMetrics)
+    val writer = new DiskBlockObjectWriter(
+      file, new JavaSerializer(new SparkConf()).newInstance(), 1024, os => os, true, writeMetrics)
     for (i <- 1 to 1000) {
       writer.write(i, i)
     }
@@ -115,8 +114,8 @@ class DiskBlockObjectWriterSuite extends SparkFunSuite with BeforeAndAfterEach {
   test("commitAndClose() should be idempotent") {
     val file = new File(tempDir, "somefile")
     val writeMetrics = new ShuffleWriteMetrics()
-    val writer = new DiskBlockObjectWriter(new TestBlockId("0"), file,
-      new JavaSerializer(new SparkConf()).newInstance(), 1024, os => os, true, writeMetrics)
+    val writer = new DiskBlockObjectWriter(
+      file, new JavaSerializer(new SparkConf()).newInstance(), 1024, os => os, true, writeMetrics)
     for (i <- 1 to 1000) {
       writer.write(i, i)
     }
@@ -133,8 +132,8 @@ class DiskBlockObjectWriterSuite extends SparkFunSuite with BeforeAndAfterEach {
   test("revertPartialWritesAndClose() should be idempotent") {
     val file = new File(tempDir, "somefile")
     val writeMetrics = new ShuffleWriteMetrics()
-    val writer = new DiskBlockObjectWriter(new TestBlockId("0"), file,
-      new JavaSerializer(new SparkConf()).newInstance(), 1024, os => os, true, writeMetrics)
+    val writer = new DiskBlockObjectWriter(
+      file, new JavaSerializer(new SparkConf()).newInstance(), 1024, os => os, true, writeMetrics)
     for (i <- 1 to 1000) {
       writer.write(i, i)
     }
@@ -151,8 +150,8 @@ class DiskBlockObjectWriterSuite extends SparkFunSuite with BeforeAndAfterEach {
   test("fileSegment() can only be called after commitAndClose() has been called") {
     val file = new File(tempDir, "somefile")
     val writeMetrics = new ShuffleWriteMetrics()
-    val writer = new DiskBlockObjectWriter(new TestBlockId("0"), file,
-      new JavaSerializer(new SparkConf()).newInstance(), 1024, os => os, true, writeMetrics)
+    val writer = new DiskBlockObjectWriter(
+      file, new JavaSerializer(new SparkConf()).newInstance(), 1024, os => os, true, writeMetrics)
     for (i <- 1 to 1000) {
       writer.write(i, i)
     }
@@ -165,8 +164,8 @@ class DiskBlockObjectWriterSuite extends SparkFunSuite with BeforeAndAfterEach {
   test("commitAndClose() without ever opening or writing") {
     val file = new File(tempDir, "somefile")
     val writeMetrics = new ShuffleWriteMetrics()
-    val writer = new DiskBlockObjectWriter(new TestBlockId("0"), file,
-      new JavaSerializer(new SparkConf()).newInstance(), 1024, os => os, true, writeMetrics)
+    val writer = new DiskBlockObjectWriter(
+      file, new JavaSerializer(new SparkConf()).newInstance(), 1024, os => os, true, writeMetrics)
     writer.commitAndClose()
     assert(writer.fileSegment().length === 0)
   }