Skip to content
Snippets Groups Projects
Commit 972171b9 authored by Reynold Xin's avatar Reynold Xin
Browse files

Merge pull request #197 from aarondav/patrick-fix

Fix 'timeWriting' stat for shuffle files

Due to concurrent git branches, changes from shuffle file consolidation patch
caused the shuffle write timing patch to no longer actually measure the time,
since it requires time be measured after the stream has been closed.
parents 718cc803 ccea38b7
No related branches found
No related tags found
No related merge requests found
......@@ -167,6 +167,7 @@ private[spark] class ShuffleMapTask(
var totalTime = 0L
val compressedSizes: Array[Byte] = shuffle.writers.map { writer: BlockObjectWriter =>
writer.commit()
writer.close()
val size = writer.fileSegment().length
totalBytes += size
totalTime += writer.timeWriting()
......@@ -184,14 +185,16 @@ private[spark] class ShuffleMapTask(
} catch { case e: Exception =>
// If there is an exception from running the task, revert the partial writes
// and throw the exception upstream to Spark.
if (shuffle != null) {
shuffle.writers.foreach(_.revertPartialWrites())
if (shuffle != null && shuffle.writers != null) {
for (writer <- shuffle.writers) {
writer.revertPartialWrites()
writer.close()
}
}
throw e
} finally {
// Release the writers back to the shuffle block manager.
if (shuffle != null && shuffle.writers != null) {
shuffle.writers.foreach(_.close())
shuffle.releaseWriters(success)
}
// Execute the callbacks on task completion.
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment