Skip to content
Snippets Groups Projects
Commit 6ea5055f authored by Ryan Blue, committed by Sean Owen
Browse files

[SPARK-17396][CORE] Share the task support between UnionRDD instances.

## What changes were proposed in this pull request?

Share the ForkJoinTaskSupport between UnionRDD instances to avoid creating a huge number of threads if lots of RDDs are created at the same time.

## How was this patch tested?

This uses existing UnionRDD tests.

Author: Ryan Blue <blue@apache.org>

Closes #14985 from rdblue/SPARK-17396-use-shared-pool.
parent bcdd259c
No related branches found
No related tags found
No related merge requests found
...@@ -20,7 +20,7 @@ package org.apache.spark.rdd ...@@ -20,7 +20,7 @@ package org.apache.spark.rdd
import java.io.{IOException, ObjectOutputStream} import java.io.{IOException, ObjectOutputStream}
import scala.collection.mutable.ArrayBuffer import scala.collection.mutable.ArrayBuffer
import scala.collection.parallel.ForkJoinTaskSupport import scala.collection.parallel.{ForkJoinTaskSupport, ThreadPoolTaskSupport}
import scala.concurrent.forkjoin.ForkJoinPool import scala.concurrent.forkjoin.ForkJoinPool
import scala.reflect.ClassTag import scala.reflect.ClassTag
...@@ -58,6 +58,11 @@ private[spark] class UnionPartition[T: ClassTag]( ...@@ -58,6 +58,11 @@ private[spark] class UnionPartition[T: ClassTag](
} }
} }
// Companion object holding state that is shared by every UnionRDD instance.
object UnionRDD {
// Task support shared across all UnionRDD instances; getPartitions assigns it to the
// parallel collection of parent RDDs when partition listing runs in parallel.
// Sharing one instance (rather than one per RDD) avoids creating a huge number of
// threads when lots of RDDs are created at the same time (SPARK-17396).
// Declared lazy, so the backing ForkJoinPool is only constructed on first access.
private[spark] lazy val partitionEvalTaskSupport =
new ForkJoinTaskSupport(new ForkJoinPool(8))
}
@DeveloperApi @DeveloperApi
class UnionRDD[T: ClassTag]( class UnionRDD[T: ClassTag](
sc: SparkContext, sc: SparkContext,
...@@ -68,13 +73,10 @@ class UnionRDD[T: ClassTag]( ...@@ -68,13 +73,10 @@ class UnionRDD[T: ClassTag](
private[spark] val isPartitionListingParallel: Boolean = private[spark] val isPartitionListingParallel: Boolean =
rdds.length > conf.getInt("spark.rdd.parallelListingThreshold", 10) rdds.length > conf.getInt("spark.rdd.parallelListingThreshold", 10)
@transient private lazy val partitionEvalTaskSupport =
new ForkJoinTaskSupport(new ForkJoinPool(8))
override def getPartitions: Array[Partition] = { override def getPartitions: Array[Partition] = {
val parRDDs = if (isPartitionListingParallel) { val parRDDs = if (isPartitionListingParallel) {
val parArray = rdds.par val parArray = rdds.par
parArray.tasksupport = partitionEvalTaskSupport parArray.tasksupport = UnionRDD.partitionEvalTaskSupport
parArray parArray
} else { } else {
rdds rdds
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment