Commit 4518642a authored by Guoqiang Li, committed by Shixiong Zhu

[SPARK-17930][CORE] The SerializerInstance instance used when deserializing a TaskResult is not reused

## What changes were proposed in this pull request?
The following code is called when a `DirectTaskResult` instance is deserialized:

```scala

  def value(): T = {
    if (valueObjectDeserialized) {
      valueObject
    } else {
      // Each deserialization creates a new instance of SerializerInstance, which is very time-consuming
      val resultSer = SparkEnv.get.serializer.newInstance()
      valueObject = resultSer.deserialize(valueBytes)
      valueObjectDeserialized = true
      valueObject
    }
  }

```

When a stage has many tasks, reusing the `SerializerInstance` improves scheduling performance by roughly a factor of three.
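
The patch achieves this by caching one instance per result-getter thread. A minimal standalone sketch of that pattern (`ReusableResultSerializer` and its method names are hypothetical, not part of the patch):

```scala
import org.apache.spark.SparkEnv
import org.apache.spark.serializer.SerializerInstance

// Hypothetical helper showing the reuse pattern: SerializerInstance is not
// guaranteed to be thread-safe, so each thread gets its own long-lived copy
// via a ThreadLocal instead of calling newInstance() for every task result.
object ReusableResultSerializer {
  private val local = new ThreadLocal[SerializerInstance] {
    override def initialValue(): SerializerInstance =
      SparkEnv.get.serializer.newInstance()
  }

  // Returns the calling thread's cached instance, creating it on first use.
  def instance(): SerializerInstance = local.get()
}
```

With this in place, a deserialization loop pays the `newInstance()` cost once per thread instead of once per task result.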

The test data is TPC-DS at 2 TB scale (Parquet), and the SQL statement (query 2) is as follows:

```sql

select  i_item_id,
        avg(ss_quantity) agg1,
        avg(ss_list_price) agg2,
        avg(ss_coupon_amt) agg3,
        avg(ss_sales_price) agg4
 from store_sales, customer_demographics, date_dim, item, promotion
 where ss_sold_date_sk = d_date_sk and
       ss_item_sk = i_item_sk and
       ss_cdemo_sk = cd_demo_sk and
       ss_promo_sk = p_promo_sk and
       cd_gender = 'M' and
       cd_marital_status = 'M' and
       cd_education_status = '4 yr Degree' and
       (p_channel_email = 'N' or p_channel_event = 'N') and
       d_year = 2001
 group by i_item_id
 order by i_item_id
 limit 100;

```

`spark-defaults.conf` file:

```
spark.master                           yarn-client
spark.executor.instances               20
spark.driver.memory                    16g
spark.executor.memory                  30g
spark.executor.cores                   5
spark.default.parallelism              100
spark.sql.shuffle.partitions           100000
spark.serializer                       org.apache.spark.serializer.KryoSerializer
spark.driver.maxResultSize              0
spark.rpc.netty.dispatcher.numThreads   8
spark.executor.extraJavaOptions          -XX:+UseG1GC -XX:+UseStringDeduplication -XX:G1HeapRegionSize=16M -XX:MetaspaceSize=256M
spark.cleaner.referenceTracking.blocking true
spark.cleaner.referenceTracking.blocking.shuffle true

```

Performance test results are as follows:

[SPARK-17930](https://github.com/witgo/spark/tree/SPARK-17930) | [ed146334](https://github.com/witgo/spark/commit/ed1463341455830b8867b721a1b34f291139baf3)
------------ | -------------
54.5 s|231.7 s
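
For intuition only (this micro-benchmark is not from the PR; names and setup are illustrative), the per-call cost of `newInstance()` can be made visible directly by deserializing the same payload with a fresh instance per call versus a single reused one:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.serializer.KryoSerializer

// Rough micro-benchmark (illustrative only): deserialize the same payload n
// times, first creating a fresh SerializerInstance per call, then reusing a
// single instance, and print the elapsed time of each variant.
object NewInstanceCost {
  def main(args: Array[String]): Unit = {
    val ser = new KryoSerializer(new SparkConf())
    val payload = ser.newInstance().serialize(Array.fill(1000)(1))
    val n = 10000

    val t0 = System.nanoTime()
    // duplicate() gives each call an independent read position on the buffer
    (1 to n).foreach(_ => ser.newInstance().deserialize[Array[Int]](payload.duplicate()))
    println(s"fresh instance per call: ${(System.nanoTime() - t0) / 1e6} ms")

    val reused = ser.newInstance()
    val t1 = System.nanoTime()
    (1 to n).foreach(_ => reused.deserialize[Array[Int]](payload.duplicate()))
    println(s"reused instance:         ${(System.nanoTime() - t1) / 1e6} ms")
  }
}
```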

## How was this patch tested?

Existing tests.

Author: Guoqiang Li <witgo@qq.com>

Closes #15512 from witgo/SPARK-17930.
```diff
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala
@@ -23,6 +23,7 @@ import java.nio.ByteBuffer
 import scala.collection.mutable.ArrayBuffer
 
 import org.apache.spark.SparkEnv
+import org.apache.spark.serializer.SerializerInstance
 import org.apache.spark.storage.BlockId
 import org.apache.spark.util.{AccumulatorV2, Utils}
@@ -77,14 +78,14 @@ private[spark] class DirectTaskResult[T](
    *
    * After the first time, `value()` is trivial and just returns the deserialized `valueObject`.
    */
-  def value(): T = {
+  def value(resultSer: SerializerInstance = null): T = {
     if (valueObjectDeserialized) {
       valueObject
     } else {
       // This should not run when holding a lock because it may cost dozens of seconds for a large
-      // value.
-      val resultSer = SparkEnv.get.serializer.newInstance()
-      valueObject = resultSer.deserialize(valueBytes)
+      // value
+      val ser = if (resultSer == null) SparkEnv.get.serializer.newInstance() else resultSer
+      valueObject = ser.deserialize(valueBytes)
       valueObjectDeserialized = true
       valueObject
     }
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala
@@ -48,6 +48,12 @@ private[spark] class TaskResultGetter(sparkEnv: SparkEnv, scheduler: TaskSchedulerImpl)
     }
   }
 
+  protected val taskResultSerializer = new ThreadLocal[SerializerInstance] {
+    override def initialValue(): SerializerInstance = {
+      sparkEnv.serializer.newInstance()
+    }
+  }
+
   def enqueueSuccessfulTask(
       taskSetManager: TaskSetManager,
       tid: Long,
@@ -63,7 +69,7 @@ private[spark] class TaskResultGetter(sparkEnv: SparkEnv, scheduler: TaskSchedulerImpl)
           // deserialize "value" without holding any lock so that it won't block other threads.
           // We should call it here, so that when it's called again in
           // "TaskSetManager.handleSuccessfulTask", it does not need to deserialize the value.
-          directResult.value()
+          directResult.value(taskResultSerializer.get())
           (directResult, serializedData.limit())
         case IndirectTaskResult(blockId, size) =>
           if (!taskSetManager.canFetchMoreResults(size)) {
@@ -84,6 +90,8 @@ private[spark] class TaskResultGetter(sparkEnv: SparkEnv, scheduler: TaskSchedulerImpl)
           }
           val deserializedResult = serializer.get().deserialize[DirectTaskResult[_]](
             serializedTaskResult.get.toByteBuffer)
+          // force deserialization of referenced value
+          deserializedResult.value(taskResultSerializer.get())
           sparkEnv.blockManager.master.removeBlock(blockId)
           (deserializedResult, size)
         }
```
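
A note on the design choice: `SerializerInstance` is documented as not necessarily thread-safe, and `TaskResultGetter` deserializes results on a pool of threads, so the patch caches one instance per thread in a `ThreadLocal` rather than sharing a single instance. Defaulting the new `value()` parameter to `null` preserves the behavior of existing callers, which still fall back to creating a fresh instance.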