Skip to content
Snippets Groups Projects
Commit 4f1dcd3d authored by WeichenXu's avatar WeichenXu Committed by Reynold Xin
Browse files

[SPARK-18051][SPARK CORE] fix bug of custom PartitionCoalescer causing serialization exception

## What changes were proposed in this pull request?

add a require check in `CoalescedRDD` to make sure the passed in `partitionCoalescer` to be `serializable`.
and update the document for api `RDD.coalesce`

## How was this patch tested?

Manual.(test code in jira [SPARK-18051])

Author: WeichenXu <WeichenXu123@outlook.com>

Closes #15587 from WeichenXu123/fix_coalescer_bug.
parent 5fa9f879
No related branches found
No related tags found
No related merge requests found
......@@ -80,6 +80,10 @@ private[spark] class CoalescedRDD[T: ClassTag](
require(maxPartitions > 0 || maxPartitions == prev.partitions.length,
s"Number of partitions ($maxPartitions) must be positive.")
if (partitionCoalescer.isDefined) {
require(partitionCoalescer.get.isInstanceOf[Serializable],
"The partition coalescer passed in must be serializable.")
}
override def getPartitions: Array[Partition] = {
val pc = partitionCoalescer.getOrElse(new DefaultPartitionCoalescer())
......
......@@ -432,7 +432,8 @@ abstract class RDD[T: ClassTag](
* of partitions. This is useful if you have a small number of partitions,
* say 100, potentially with a few partitions being abnormally large. Calling
* coalesce(1000, shuffle = true) will result in 1000 partitions with the
* data distributed using a hash partitioner.
* data distributed using a hash partitioner. The optional partition coalescer
* passed in must be serializable.
*/
def coalesce(numPartitions: Int, shuffle: Boolean = false,
partitionCoalescer: Option[PartitionCoalescer] = Option.empty)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment