From 6ea5055fa734d435b5f148cf52d3385a57926b60 Mon Sep 17 00:00:00 2001
From: Ryan Blue <blue@apache.org>
Date: Sat, 10 Sep 2016 10:18:53 +0100
Subject: [PATCH] [SPARK-17396][CORE] Share the task support between UnionRDD
 instances.

## What changes were proposed in this pull request?

Share the ForkJoinTaskSupport between UnionRDD instances to avoid creating a huge number of threads if lots of RDDs are created at the same time.

## How was this patch tested?

This uses existing UnionRDD tests.

Author: Ryan Blue <blue@apache.org>

Closes #14985 from rdblue/SPARK-17396-use-shared-pool.
---
 .../main/scala/org/apache/spark/rdd/UnionRDD.scala   | 12 +++++++-----
 1 file changed, 7 insertions(+), 5 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala b/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala
index 8171dcc046..ad1fddbde7 100644
--- a/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala
@@ -20,7 +20,7 @@ package org.apache.spark.rdd
 import java.io.{IOException, ObjectOutputStream}
 
 import scala.collection.mutable.ArrayBuffer
-import scala.collection.parallel.ForkJoinTaskSupport
+import scala.collection.parallel.{ForkJoinTaskSupport, ThreadPoolTaskSupport}
 import scala.concurrent.forkjoin.ForkJoinPool
 import scala.reflect.ClassTag
 
@@ -58,6 +58,11 @@ private[spark] class UnionPartition[T: ClassTag](
   }
 }
 
+object UnionRDD {
+  private[spark] lazy val partitionEvalTaskSupport =
+    new ForkJoinTaskSupport(new ForkJoinPool(8))
+}
+
 @DeveloperApi
 class UnionRDD[T: ClassTag](
     sc: SparkContext,
@@ -68,13 +73,10 @@ class UnionRDD[T: ClassTag](
   private[spark] val isPartitionListingParallel: Boolean =
     rdds.length > conf.getInt("spark.rdd.parallelListingThreshold", 10)
 
-  @transient private lazy val partitionEvalTaskSupport =
-      new ForkJoinTaskSupport(new ForkJoinPool(8))
-
   override def getPartitions: Array[Partition] = {
     val parRDDs = if (isPartitionListingParallel) {
       val parArray = rdds.par
-      parArray.tasksupport = partitionEvalTaskSupport
+      parArray.tasksupport = UnionRDD.partitionEvalTaskSupport
       parArray
     } else {
       rdds
-- 
GitLab