Commit 5b5a69be authored by Thomas Graves's avatar Thomas Graves Committed by Wenchen Fan

[SPARK-20923] turn tracking of TaskMetrics._updatedBlockStatuses off

## What changes were proposed in this pull request?
Turn tracking of TaskMetrics._updatedBlockStatuses off by default. As far as I can see it is not used by anything, and it uses a lot of memory when caching and processing many blocks. In my case it was taking 5GB of a 10GB heap; I even went up to a 50GB heap and the job still ran out of memory. With this change in place, the same job easily runs in less than 10GB of heap.

We leave the API in place, along with a config to turn tracking back on, just in case anyone is using it. TaskMetrics is exposed via SparkListenerTaskEnd, so users who rely on it can turn it back on.
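As a rough illustration of the compatibility path described above, a user who still wants the statuses could re-enable the config and read them from the listener event. This is a hedged sketch, not part of the patch; `BlockStatusListener` is a hypothetical name, and the config key is the one introduced by this change.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

// Opt back in to tracking, which this patch turns off by default.
val conf = new SparkConf()
  .set("spark.taskMetrics.trackUpdatedBlockStatuses", "true")

// TaskMetrics reaches users through SparkListenerTaskEnd, which is why
// the updatedBlockStatuses API is kept for compatibility.
class BlockStatusListener extends SparkListener {
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    taskEnd.taskMetrics.updatedBlockStatuses.foreach { case (blockId, status) =>
      println(s"$blockId: mem=${status.memSize} disk=${status.diskSize}")
    }
  }
}
```

The listener would be registered via `sparkContext.addSparkListener(new BlockStatusListener)` on a context built from this conf.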

## How was this patch tested?

Ran the unit tests that were modified, and manually tested a couple of jobs (with and without caching). Clicked through the UI and didn't see anything missing.
Ran my very large Hive query job with 200,000 small tasks and 1000 executors, caching 6+TB of data; it runs fine now, whereas without this change it would go into full GCs and eventually die.

Author: Thomas Graves <tgraves@thirteenroutine.corp.gq1.yahoo.com>
Author: Tom Graves <tgraves@yahoo-inc.com>

Closes #18162 from tgravescs/SPARK-20923.
parent e4469760
......@@ -112,6 +112,12 @@ class TaskMetrics private[spark] () extends Serializable {
   /**
    * Storage statuses of any blocks that have been updated as a result of this task.
    *
+   * Tracking the _updatedBlockStatuses can use a lot of memory.
+   * It is not used anywhere inside of Spark, so we would ideally remove it, but it is exposed
+   * to the user in SparkListenerTaskEnd, so the API is kept for compatibility.
+   * Tracking can be turned off to save memory via the config
+   * TASK_METRICS_TRACK_UPDATED_BLOCK_STATUSES.
    */
   def updatedBlockStatuses: Seq[(BlockId, BlockStatus)] = {
     // This is called on the driver. All accumulator updates have a fixed value. So it's safe to use
......
......@@ -322,4 +322,12 @@ package object config {
         "above this threshold. This is to avoid a giant request takes too much memory.")
       .bytesConf(ByteUnit.BYTE)
       .createWithDefaultString("200m")
+
+  private[spark] val TASK_METRICS_TRACK_UPDATED_BLOCK_STATUSES =
+    ConfigBuilder("spark.taskMetrics.trackUpdatedBlockStatuses")
+      .doc("Enable tracking of updatedBlockStatuses in the TaskMetrics. Off by default since " +
+        "tracking the block statuses can use a lot of memory and it is not used anywhere " +
+        "within Spark.")
+      .booleanConf
+      .createWithDefault(false)
 }
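The entry above follows Spark's typed-config pattern: a key, a doc string, a type, and a default resolved when no explicit setting exists. A minimal standalone sketch of that idea, using a hypothetical `BooleanConfigEntry` rather than Spark's actual `ConfigBuilder`:

```scala
// Hypothetical stand-in for Spark's ConfigBuilder/ConfigEntry machinery:
// a typed boolean entry whose default wins when the key is unset.
final case class BooleanConfigEntry(key: String, doc: String, default: Boolean) {
  def readFrom(settings: Map[String, String]): Boolean =
    settings.get(key).map(_.toBoolean).getOrElse(default)
}

val trackUpdatedBlockStatuses = BooleanConfigEntry(
  key = "spark.taskMetrics.trackUpdatedBlockStatuses",
  doc = "Enable tracking of updatedBlockStatuses in TaskMetrics.",
  default = false)

// Unset -> default (false); explicit "true" -> enabled.
assert(!trackUpdatedBlockStatuses.readFrom(Map.empty))
assert(trackUpdatedBlockStatuses.readFrom(
  Map("spark.taskMetrics.trackUpdatedBlockStatuses" -> "true")))
```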
......@@ -1473,8 +1473,10 @@ private[spark] class BlockManager(
   }
 
   private def addUpdatedBlockStatusToTaskMetrics(blockId: BlockId, status: BlockStatus): Unit = {
-    Option(TaskContext.get()).foreach { c =>
-      c.taskMetrics().incUpdatedBlockStatuses(blockId -> status)
+    if (conf.get(config.TASK_METRICS_TRACK_UPDATED_BLOCK_STATUSES)) {
+      Option(TaskContext.get()).foreach { c =>
+        c.taskMetrics().incUpdatedBlockStatuses(blockId -> status)
+      }
     }
   }
......
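The BlockManager hunk gates the per-block metric update behind the flag, so when tracking is off nothing is ever accumulated. A minimal self-contained sketch of that gating pattern, with hypothetical names (`Metrics`, `recordBlockUpdate`) standing in for Spark's internals:

```scala
// Hypothetical stand-in for TaskMetrics: per-event accumulation is
// skipped entirely when the feature flag is off, so no memory is
// retained for block statuses.
case class BlockStatus(memSize: Long, diskSize: Long)

class Metrics(trackUpdatedBlocks: Boolean) {
  private val updated =
    scala.collection.mutable.ArrayBuffer.empty[(String, BlockStatus)]

  def recordBlockUpdate(blockId: String, status: BlockStatus): Unit = {
    if (trackUpdatedBlocks) {   // the gate added by this patch
      updated += (blockId -> status)
    }
  }

  def updatedBlockStatuses: Seq[(String, BlockStatus)] = updated.toSeq
}

val off = new Metrics(trackUpdatedBlocks = false)
off.recordBlockUpdate("rdd_0_0", BlockStatus(2000L, 0L))
assert(off.updatedBlockStatuses.isEmpty)   // nothing retained when off

val on = new Metrics(trackUpdatedBlocks = true)
on.recordBlockUpdate("rdd_0_0", BlockStatus(2000L, 0L))
assert(on.updatedBlockStatuses.size == 1)
```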
......@@ -922,8 +922,38 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     }
   }
 
+  test("turn off updated block statuses") {
+    val conf = new SparkConf()
+    conf.set(TASK_METRICS_TRACK_UPDATED_BLOCK_STATUSES, false)
+    store = makeBlockManager(12000, testConf = Some(conf))
+    store.registerTask(0)
+    val list = List.fill(2)(new Array[Byte](2000))
+
+    def getUpdatedBlocks(task: => Unit): Seq[(BlockId, BlockStatus)] = {
+      val context = TaskContext.empty()
+      try {
+        TaskContext.setTaskContext(context)
+        task
+      } finally {
+        TaskContext.unset()
+      }
+      context.taskMetrics.updatedBlockStatuses
+    }
+
+    // list1 would normally produce 1 updated block, but tracking is off
+    val updatedBlocks1 = getUpdatedBlocks {
+      store.putIterator(
+        "list1", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
+    }
+    assert(updatedBlocks1.size === 0)
+  }
+
   test("updated block statuses") {
-    store = makeBlockManager(12000)
+    val conf = new SparkConf()
+    conf.set(TASK_METRICS_TRACK_UPDATED_BLOCK_STATUSES, true)
+    store = makeBlockManager(12000, testConf = Some(conf))
     store.registerTask(0)
     val list = List.fill(2)(new Array[Byte](2000))
     val bigList = List.fill(8)(new Array[Byte](2000))
......