Skip to content
Snippets Groups Projects
Commit fae830d1 authored by Shixiong Zhu
Browse files

[SPARK-13245][CORE] Call shuffleMetrics methods only in one thread for ShuffleBlockFetcherIterator

Call shuffleMetrics' `incRemoteBytesRead` and `incRemoteBlocksFetched` when polling a FetchResult from `results`, so that shuffleMetrics is always used from a single thread.

Also fix a race condition that could cause a memory leak.

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #11138 from zsxwing/SPARK-13245.
parent 7fe4fe63
No related branches found
No related tags found
No related merge requests found
...@@ -19,6 +19,7 @@ package org.apache.spark.storage ...@@ -19,6 +19,7 @@ package org.apache.spark.storage
import java.io.InputStream import java.io.InputStream
import java.util.concurrent.LinkedBlockingQueue import java.util.concurrent.LinkedBlockingQueue
import javax.annotation.concurrent.GuardedBy
import scala.collection.mutable.{ArrayBuffer, HashSet, Queue} import scala.collection.mutable.{ArrayBuffer, HashSet, Queue}
import scala.util.control.NonFatal import scala.util.control.NonFatal
...@@ -107,7 +108,8 @@ final class ShuffleBlockFetcherIterator( ...@@ -107,7 +108,8 @@ final class ShuffleBlockFetcherIterator(
* Whether the iterator is still active. If isZombie is true, the callback interface will no * Whether the iterator is still active. If isZombie is true, the callback interface will no
* longer place fetched blocks into [[results]]. * longer place fetched blocks into [[results]].
*/ */
@volatile private[this] var isZombie = false @GuardedBy("this")
private[this] var isZombie = false
initialize() initialize()
...@@ -126,14 +128,22 @@ final class ShuffleBlockFetcherIterator( ...@@ -126,14 +128,22 @@ final class ShuffleBlockFetcherIterator(
* Mark the iterator as zombie, and release all buffers that haven't been deserialized yet. * Mark the iterator as zombie, and release all buffers that haven't been deserialized yet.
*/ */
private[this] def cleanup() { private[this] def cleanup() {
isZombie = true synchronized {
isZombie = true
}
releaseCurrentResultBuffer() releaseCurrentResultBuffer()
// Release buffers in the results queue // Release buffers in the results queue
val iter = results.iterator() val iter = results.iterator()
while (iter.hasNext) { while (iter.hasNext) {
val result = iter.next() val result = iter.next()
result match { result match {
case SuccessFetchResult(_, _, _, buf) => buf.release() case SuccessFetchResult(_, address, _, buf) => {
if (address != blockManager.blockManagerId) {
shuffleMetrics.incRemoteBytesRead(buf.size)
shuffleMetrics.incRemoteBlocksFetched(1)
}
buf.release()
}
case _ => case _ =>
} }
} }
...@@ -154,13 +164,13 @@ final class ShuffleBlockFetcherIterator( ...@@ -154,13 +164,13 @@ final class ShuffleBlockFetcherIterator(
override def onBlockFetchSuccess(blockId: String, buf: ManagedBuffer): Unit = { override def onBlockFetchSuccess(blockId: String, buf: ManagedBuffer): Unit = {
// Only add the buffer to results queue if the iterator is not zombie, // Only add the buffer to results queue if the iterator is not zombie,
// i.e. cleanup() has not been called yet. // i.e. cleanup() has not been called yet.
if (!isZombie) { ShuffleBlockFetcherIterator.this.synchronized {
// Increment the ref count because we need to pass this to a different thread. if (!isZombie) {
// This needs to be released after use. // Increment the ref count because we need to pass this to a different thread.
buf.retain() // This needs to be released after use.
results.put(new SuccessFetchResult(BlockId(blockId), address, sizeMap(blockId), buf)) buf.retain()
shuffleMetrics.incRemoteBytesRead(buf.size) results.put(new SuccessFetchResult(BlockId(blockId), address, sizeMap(blockId), buf))
shuffleMetrics.incRemoteBlocksFetched(1) }
} }
logTrace("Got remote block " + blockId + " after " + Utils.getUsedTimeMs(startTime)) logTrace("Got remote block " + blockId + " after " + Utils.getUsedTimeMs(startTime))
} }
...@@ -289,7 +299,13 @@ final class ShuffleBlockFetcherIterator( ...@@ -289,7 +299,13 @@ final class ShuffleBlockFetcherIterator(
shuffleMetrics.incFetchWaitTime(stopFetchWait - startFetchWait) shuffleMetrics.incFetchWaitTime(stopFetchWait - startFetchWait)
result match { result match {
case SuccessFetchResult(_, _, size, _) => bytesInFlight -= size case SuccessFetchResult(_, address, size, buf) => {
if (address != blockManager.blockManagerId) {
shuffleMetrics.incRemoteBytesRead(buf.size)
shuffleMetrics.incRemoteBlocksFetched(1)
}
bytesInFlight -= size
}
case _ => case _ =>
} }
// Send fetch requests up to maxBytesInFlight // Send fetch requests up to maxBytesInFlight
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment