Commit 08db4912 authored by Ryan Blue, committed by Andrew Or

[SPARK-9926] Parallelize partition logic in UnionRDD.

This patch has the new logic from #8512 that uses a parallel collection to compute partitions in UnionRDD. The rest of #8512 added an alternative code path for calculating splits in S3, but that isn't necessary to get the same speedup. The underlying problem wasn't that bulk listing wasn't used; it was that an extra FileStatus was retrieved for each file. The fix was just committed as [HADOOP-12810](https://issues.apache.org/jira/browse/HADOOP-12810). (I think the original commit also used a single prefix to enumerate all paths, but that isn't always helpful, and it was removed in later versions, so there is no need for SparkS3Utils.)
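
For readers unfamiliar with the pattern, here is a minimal, self-contained sketch of the idea (plain Scala, not the patch itself; the names and the sleep-based stand-in are illustrative): attach a bounded ForkJoinPool to a parallel collection so independent, slow per-element work runs concurrently.

    import scala.collection.parallel.ForkJoinTaskSupport
    import scala.concurrent.forkjoin.ForkJoinPool

    object ParallelListingSketch {
      def main(args: Array[String]): Unit = {
        // Stand-in for per-RDD partition listing: any slow, independent task.
        def slowCount(name: String): Int = { Thread.sleep(100); name.length }

        val names = Seq("rdd-a", "rdd-b", "rdd-c", "rdd-d")

        // A parallel view of the sequence, evaluated on a bounded pool of 8
        // threads -- the same pattern this patch applies in getPartitions.
        val par = names.par
        par.tasksupport = new ForkJoinTaskSupport(new ForkJoinPool(8))

        // .seq returns a sequential collection once the parallel map is done.
        val counts = par.map(slowCount).seq
        println(counts.sum)
      }
    }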

I tested this using the same table that piapiaozhexiu was using. Calculating splits for a 10-day period took 25 seconds with this change and HADOOP-12810, which is on par with the results from #8512.

Author: Ryan Blue <blue@apache.org>
Author: Cheolsoo Park <cheolsoop@netflix.com>

Closes #11242 from rdblue/SPARK-9926-parallelize-union-rdd.
parent 5c47db06
core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala
@@ -20,6 +20,8 @@ package org.apache.spark.rdd
 import java.io.{IOException, ObjectOutputStream}
 
 import scala.collection.mutable.ArrayBuffer
+import scala.collection.parallel.ForkJoinTaskSupport
+import scala.concurrent.forkjoin.ForkJoinPool
 import scala.reflect.ClassTag
 
 import org.apache.spark.{Dependency, Partition, RangeDependency, SparkContext, TaskContext}
@@ -62,8 +64,22 @@ class UnionRDD[T: ClassTag](
     var rdds: Seq[RDD[T]])
   extends RDD[T](sc, Nil) {  // Nil since we implement getDependencies
 
+  // visible for testing
+  private[spark] val isPartitionListingParallel: Boolean =
+    rdds.length > conf.getInt("spark.rdd.parallelListingThreshold", 10)
+
+  @transient private lazy val partitionEvalTaskSupport =
+      new ForkJoinTaskSupport(new ForkJoinPool(8))
+
   override def getPartitions: Array[Partition] = {
-    val array = new Array[Partition](rdds.map(_.partitions.length).sum)
+    val parRDDs = if (isPartitionListingParallel) {
+      val parArray = rdds.par
+      parArray.tasksupport = partitionEvalTaskSupport
+      parArray
+    } else {
+      rdds
+    }
+    val array = new Array[Partition](parRDDs.map(_.partitions.length).seq.sum)
     var pos = 0
     for ((rdd, rddIndex) <- rdds.zipWithIndex; split <- rdd.partitions) {
       array(pos) = new UnionPartition(pos, rdd, rddIndex, split.index)
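
As a hypothetical usage sketch (the spark.rdd.parallelListingThreshold key and its default of 10 come from this patch; the application setup is illustrative), lowering the threshold makes even a small union take the parallel listing path:

    import org.apache.spark.{SparkConf, SparkContext}

    object UnionThresholdExample {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
          .setMaster("local[*]")
          .setAppName("union-parallel-listing")
          .set("spark.rdd.parallelListingThreshold", "4")  // default is 10
        val sc = new SparkContext(conf)

        // Eleven single-partition RDDs > threshold of 4, so UnionRDD computes
        // their partitions on the ForkJoinPool instead of sequentially.
        val rdds = (1 to 11).map(i => sc.parallelize(Seq(i), 1))
        val union = sc.union(rdds)
        println(union.partitions.length)  // 11

        sc.stop()
      }
    }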
core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
@@ -116,6 +116,23 @@ class RDDSuite extends SparkFunSuite with SharedSparkContext {
     assert(sc.union(Seq(nums, nums)).collect().toList === List(1, 2, 3, 4, 1, 2, 3, 4))
   }
 
+  test("SparkContext.union parallel partition listing") {
+    val nums1 = sc.makeRDD(Array(1, 2, 3, 4), 2)
+    val nums2 = sc.makeRDD(Array(5, 6, 7, 8), 2)
+    val serialUnion = sc.union(nums1, nums2)
+    val expected = serialUnion.collect().toList
+
+    assert(serialUnion.asInstanceOf[UnionRDD[Int]].isPartitionListingParallel === false)
+
+    sc.conf.set("spark.rdd.parallelListingThreshold", "1")
+    val parallelUnion = sc.union(nums1, nums2)
+    val actual = parallelUnion.collect().toList
+    sc.conf.remove("spark.rdd.parallelListingThreshold")
+
+    assert(parallelUnion.asInstanceOf[UnionRDD[Int]].isPartitionListingParallel === true)
+    assert(expected === actual)
+  }
+
   test("SparkContext.union creates UnionRDD if at least one RDD has no partitioner") {
     val rddWithPartitioner = sc.parallelize(Seq(1 -> true)).partitionBy(new HashPartitioner(1))
     val rddWithNoPartitioner = sc.parallelize(Seq(2 -> true))