Commit 4cb49412 authored by Shivaram Venkataraman, committed by Kay Ousterhout

[SPARK-18836][CORE] Serialize one copy of task metrics in DAGScheduler

## What changes were proposed in this pull request?

Right now we serialize the empty task metrics once per task. Since the same (empty) `TaskMetrics` object is shared by every task in a stage, we can serialize it once per stage and reuse the same serialized bytes across all tasks of that stage.
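Roughly, the idea is to hoist the serialization out of the per-task loop. The sketch below illustrates the pattern outside of Spark; `FakeMetrics`, `SerializeOncePerStage`, and plain Java serialization are hypothetical stand-ins for `TaskMetrics` and Spark's closure serializer, not the actual DAGScheduler code.

```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream}

// Stand-in for TaskMetrics; plain Java serialization stands in for the closure serializer.
case class FakeMetrics(counters: Map[String, Long]) extends Serializable

object SerializeOncePerStage {
  private def serialize(obj: AnyRef): Array[Byte] = {
    val bos = new ByteArrayOutputStream()
    val oos = new ObjectOutputStream(bos)
    oos.writeObject(obj)
    oos.close()
    bos.toByteArray
  }

  def main(args: Array[String]): Unit = {
    val stageMetrics = FakeMetrics(Map.empty)
    val numTasks = 1000

    // Before: the same (empty) metrics object is serialized once per task.
    val perTaskBytes: Seq[Array[Byte]] = (1 to numTasks).map(_ => serialize(stageMetrics))

    // After: serialize once per stage and hand the same byte array to every task.
    val sharedBytes: Array[Byte] = serialize(stageMetrics)
    val tasks: Seq[Array[Byte]] = (1 to numTasks).map(_ => sharedBytes)

    println(s"before: ${perTaskBytes.size} serializations, after: 1 (${sharedBytes.length} bytes reused)")
  }
}
```

In the actual change the serialized bytes are produced once per stage in the DAGScheduler and passed to each `ShuffleMapTask`/`ResultTask` constructor, as shown in the diff below.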

## How was this patch tested?

- [x] Run tests on EC2 to measure performance improvement

Author: Shivaram Venkataraman <shivaram@cs.berkeley.edu>

Closes #16261 from shivaram/task-metrics-one-copy.
parent 70d495dc
DAGScheduler.scala:
@@ -1009,13 +1009,14 @@ class DAGScheduler(
     }
     val tasks: Seq[Task[_]] = try {
+      val serializedTaskMetrics = closureSerializer.serialize(stage.latestInfo.taskMetrics).array()
       stage match {
         case stage: ShuffleMapStage =>
           partitionsToCompute.map { id =>
             val locs = taskIdToLocations(id)
             val part = stage.rdd.partitions(id)
             new ShuffleMapTask(stage.id, stage.latestInfo.attemptId,
-              taskBinary, part, locs, stage.latestInfo.taskMetrics, properties, Option(jobId),
+              taskBinary, part, locs, properties, serializedTaskMetrics, Option(jobId),
               Option(sc.applicationId), sc.applicationAttemptId)
           }
@@ -1025,7 +1026,7 @@
           val part = stage.rdd.partitions(p)
           val locs = taskIdToLocations(id)
           new ResultTask(stage.id, stage.latestInfo.attemptId,
-            taskBinary, part, locs, id, properties, stage.latestInfo.taskMetrics,
+            taskBinary, part, locs, id, properties, serializedTaskMetrics,
             Option(jobId), Option(sc.applicationId), sc.applicationAttemptId)
         }
       }
ResultTask.scala:
@@ -42,7 +42,8 @@ import org.apache.spark.rdd.RDD
  * @param outputId index of the task in this job (a job can launch tasks on only a subset of the
  *                 input RDD's partitions).
  * @param localProperties copy of thread-local properties set by the user on the driver side.
- * @param metrics a `TaskMetrics` that is created at driver side and sent to executor side.
+ * @param serializedTaskMetrics a `TaskMetrics` that is created and serialized on the driver side
+ *                              and sent to executor side.
  *
  * The parameters below are optional:
  * @param jobId id of the job this task belongs to
@@ -57,12 +58,12 @@ private[spark] class ResultTask[T, U](
     locs: Seq[TaskLocation],
     val outputId: Int,
     localProperties: Properties,
-    metrics: TaskMetrics,
+    serializedTaskMetrics: Array[Byte],
     jobId: Option[Int] = None,
     appId: Option[String] = None,
     appAttemptId: Option[String] = None)
-  extends Task[U](stageId, stageAttemptId, partition.index, metrics, localProperties, jobId,
-    appId, appAttemptId)
+  extends Task[U](stageId, stageAttemptId, partition.index, localProperties, serializedTaskMetrics,
+    jobId, appId, appAttemptId)
   with Serializable {
   @transient private[this] val preferredLocs: Seq[TaskLocation] = {
ShuffleMapTask.scala:
@@ -42,8 +42,9 @@ import org.apache.spark.shuffle.ShuffleWriter
  *                   the type should be (RDD[_], ShuffleDependency[_, _, _]).
  * @param partition partition of the RDD this task is associated with
  * @param locs preferred task execution locations for locality scheduling
- * @param metrics a `TaskMetrics` that is created at driver side and sent to executor side.
  * @param localProperties copy of thread-local properties set by the user on the driver side.
+ * @param serializedTaskMetrics a `TaskMetrics` that is created and serialized on the driver side
+ *                              and sent to executor side.
  *
  * The parameters below are optional:
  * @param jobId id of the job this task belongs to
@@ -56,18 +57,18 @@ private[spark] class ShuffleMapTask(
     taskBinary: Broadcast[Array[Byte]],
     partition: Partition,
     @transient private var locs: Seq[TaskLocation],
-    metrics: TaskMetrics,
     localProperties: Properties,
+    serializedTaskMetrics: Array[Byte],
     jobId: Option[Int] = None,
     appId: Option[String] = None,
     appAttemptId: Option[String] = None)
-  extends Task[MapStatus](stageId, stageAttemptId, partition.index, metrics, localProperties, jobId,
-    appId, appAttemptId)
+  extends Task[MapStatus](stageId, stageAttemptId, partition.index, localProperties,
+    serializedTaskMetrics, jobId, appId, appAttemptId)
   with Logging {
   /** A constructor used only in test suites. This does not require passing in an RDD. */
   def this(partitionId: Int) {
-    this(0, 0, null, new Partition { override def index: Int = 0 }, null, null, new Properties)
+    this(0, 0, null, new Partition { override def index: Int = 0 }, null, new Properties, null)
   }
   @transient private val preferredLocs: Seq[TaskLocation] = {
Task.scala:
@@ -48,6 +48,8 @@ import org.apache.spark.util._
  * @param partitionId index of the number in the RDD
  * @param metrics a `TaskMetrics` that is created at driver side and sent to executor side.
  * @param localProperties copy of thread-local properties set by the user on the driver side.
+ * @param serializedTaskMetrics a `TaskMetrics` that is created and serialized on the driver side
+ *                              and sent to executor side.
  *
  * The parameters below are optional:
  * @param jobId id of the job this task belongs to
@@ -58,13 +60,17 @@ private[spark] abstract class Task[T](
     val stageId: Int,
     val stageAttemptId: Int,
     val partitionId: Int,
-    // The default value is only used in tests.
-    val metrics: TaskMetrics = TaskMetrics.registered,
     @transient var localProperties: Properties = new Properties,
+    // The default value is only used in tests.
+    serializedTaskMetrics: Array[Byte] =
+      SparkEnv.get.closureSerializer.newInstance().serialize(TaskMetrics.registered).array(),
     val jobId: Option[Int] = None,
     val appId: Option[String] = None,
     val appAttemptId: Option[String] = None) extends Serializable {
+  @transient lazy val metrics: TaskMetrics =
+    SparkEnv.get.closureSerializer.newInstance().deserialize(ByteBuffer.wrap(serializedTaskMetrics))
   /**
    * Called by [[org.apache.spark.executor.Executor]] to run this task.
    *
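A note on the `@transient lazy val metrics` added above: the task now carries only the serialized bytes, and the `TaskMetrics` object is rebuilt lazily the first time `metrics` is accessed (typically on the executor), while `@transient` keeps the rebuilt object from being shipped again if the task itself is serialized. Below is a minimal, self-contained illustration of that pattern; `Carrier`, `TransientLazyDemo`, `roundTrip`, and plain Java serialization are illustrative stand-ins, not Spark code.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// The raw bytes are the only thing that travels with the serialized object; the decoded
// value is rebuilt lazily on first access after deserialization.
class Carrier(payloadBytes: Array[Byte]) extends Serializable {
  @transient lazy val payload: String = {
    println("decoding payload") // runs at most once per instance, on first access
    new String(payloadBytes, "UTF-8")
  }
}

object TransientLazyDemo {
  // Serialize and deserialize an object, simulating shipping a task to an executor.
  private def roundTrip[T <: AnyRef](obj: T): T = {
    val bos = new ByteArrayOutputStream()
    val oos = new ObjectOutputStream(bos)
    oos.writeObject(obj)
    oos.close()
    val ois = new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray))
    ois.readObject().asInstanceOf[T]
  }

  def main(args: Array[String]): Unit = {
    val received = roundTrip(new Carrier("hello".getBytes("UTF-8")))
    println(received.payload) // prints "decoding payload" and then "hello"
  }
}
```

Running `TransientLazyDemo` prints the decode message exactly once, which mirrors how each deserialized task materializes its `TaskMetrics` only when first needed.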
ExecutorSuite.scala:
@@ -51,9 +51,11 @@ class ExecutorSuite extends SparkFunSuite {
     when(mockEnv.metricsSystem).thenReturn(mockMetricsSystem)
     when(mockEnv.memoryManager).thenReturn(mockMemoryManager)
     when(mockEnv.closureSerializer).thenReturn(serializer)
+    val fakeTaskMetrics = serializer.newInstance().serialize(TaskMetrics.registered).array()
     val serializedTask =
       Task.serializeWithDependencies(
-        new FakeTask(0, 0),
+        new FakeTask(0, 0, Nil, fakeTaskMetrics),
         HashMap[String, Long](),
         HashMap[String, Long](),
         serializer.newInstance())
FakeTask.scala:
@@ -17,12 +17,20 @@
 package org.apache.spark.scheduler
+import java.util.Properties
+import org.apache.spark.SparkEnv
 import org.apache.spark.TaskContext
+import org.apache.spark.executor.TaskMetrics
 class FakeTask(
     stageId: Int,
     partitionId: Int,
-    prefLocs: Seq[TaskLocation] = Nil) extends Task[Int](stageId, 0, partitionId) {
+    prefLocs: Seq[TaskLocation] = Nil,
+    serializedTaskMetrics: Array[Byte] =
+      SparkEnv.get.closureSerializer.newInstance().serialize(TaskMetrics.registered).array())
+  extends Task[Int](stageId, 0, partitionId, new Properties, serializedTaskMetrics) {
   override def runTask(context: TaskContext): Int = 0
   override def preferredLocations: Seq[TaskLocation] = prefLocs
 }
TaskContextSuite.scala:
@@ -62,7 +62,8 @@ class TaskContextSuite extends SparkFunSuite with BeforeAndAfter with LocalSpark
     val func = (c: TaskContext, i: Iterator[String]) => i.next()
     val taskBinary = sc.broadcast(JavaUtils.bufferToArray(closureSerializer.serialize((rdd, func))))
     val task = new ResultTask[String, String](
-      0, 0, taskBinary, rdd.partitions(0), Seq.empty, 0, new Properties, new TaskMetrics)
+      0, 0, taskBinary, rdd.partitions(0), Seq.empty, 0, new Properties,
+      closureSerializer.serialize(TaskMetrics.registered).array())
     intercept[RuntimeException] {
       task.run(0, 0, null)
     }
@@ -83,7 +84,8 @@ class TaskContextSuite extends SparkFunSuite with BeforeAndAfter with LocalSpark
     val func = (c: TaskContext, i: Iterator[String]) => i.next()
     val taskBinary = sc.broadcast(JavaUtils.bufferToArray(closureSerializer.serialize((rdd, func))))
     val task = new ResultTask[String, String](
-      0, 0, taskBinary, rdd.partitions(0), Seq.empty, 0, new Properties, new TaskMetrics)
+      0, 0, taskBinary, rdd.partitions(0), Seq.empty, 0, new Properties,
+      closureSerializer.serialize(TaskMetrics.registered).array())
     intercept[RuntimeException] {
       task.run(0, 0, null)
     }