Commit 6f369713 authored by fidato, committed by Reynold Xin

[SPARK-16575][CORE] partition calculation mismatch with sc.binaryFiles

## What changes were proposed in this pull request?

This pull request contains the changes for the critical bug SPARK-16575. It fixes the partition calculation in BinaryFileRDD: previously, upon creating an RDD with sc.binaryFiles, the resulting RDD always consisted of just two partitions.
## How was this patch tested?

The original issue, i.e. getNumPartitions on a binary files RDD always returning two partitions, was first reproduced and then re-checked with these changes applied. The existing unit tests were also run and pass.
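For context, the symptom could be reproduced from a spark-shell along these lines (the path and data layout are illustrative):

```scala
// Before this change: a directory holding, say, ~1 GB of small binary files
// still came back as an RDD with only two partitions.
val rdd = sc.binaryFiles("hdfs:///tmp/many-small-files")   // illustrative path
println(rdd.getNumPartitions)   // printed 2 regardless of the input size

// After this change the count is derived from the total bytes, capped by
// spark.files.maxPartitionBytes and padded by spark.files.openCostInBytes.
```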

This contribution is my original work and I license the work to the project under the project's open source license.

srowen hvanhovell rxin vanzin skyluc kmader zsxwing datafarmer Please have a look.

Author: fidato <fidato.july13@gmail.com>

Closes #15327 from fidato13/SPARK-16575.
parent 1da64e1f
@@ -27,6 +27,9 @@ import org.apache.hadoop.fs.Path
 import org.apache.hadoop.mapreduce.{InputSplit, JobContext, RecordReader, TaskAttemptContext}
 import org.apache.hadoop.mapreduce.lib.input.{CombineFileInputFormat, CombineFileRecordReader, CombineFileSplit}
 
+import org.apache.spark.internal.config
+import org.apache.spark.SparkContext
+
 /**
  * A general format for reading whole files in as streams, byte arrays,
  * or other functions to be added
@@ -40,9 +43,14 @@ private[spark] abstract class StreamFileInputFormat[T]
    * Allow minPartitions set by end-user in order to keep compatibility with old Hadoop API
    * which is set through setMaxSplitSize
    */
-  def setMinPartitions(context: JobContext, minPartitions: Int) {
-    val totalLen = listStatus(context).asScala.filterNot(_.isDirectory).map(_.getLen).sum
-    val maxSplitSize = math.ceil(totalLen / math.max(minPartitions, 1.0)).toLong
+  def setMinPartitions(sc: SparkContext, context: JobContext, minPartitions: Int) {
+    val defaultMaxSplitBytes = sc.getConf.get(config.FILES_MAX_PARTITION_BYTES)
+    val openCostInBytes = sc.getConf.get(config.FILES_OPEN_COST_IN_BYTES)
+    val defaultParallelism = sc.defaultParallelism
+    val files = listStatus(context).asScala
+    val totalBytes = files.filterNot(_.isDirectory).map(_.getLen + openCostInBytes).sum
+    val bytesPerCore = totalBytes / defaultParallelism
+    val maxSplitSize = Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))
     super.setMaxSplitSize(maxSplitSize)
   }
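In other words, the split size is now min(defaultMaxSplitBytes, max(openCostInBytes, bytesPerCore)) instead of totalLen / minPartitions. A small worked sketch of that formula using the defaults added by this patch; the file count and core count below are illustrative assumptions, not values from the commit:

```scala
// Illustrative inputs: 1000 binary files of 1 MB each, 8 cores of parallelism.
val defaultMaxSplitBytes = 128L * 1024 * 1024        // spark.files.maxPartitionBytes default
val openCostInBytes = 4L * 1024 * 1024               // spark.files.openCostInBytes default
val defaultParallelism = 8                           // assumed cluster parallelism
val fileSizes = Seq.fill(1000)(1L * 1024 * 1024)     // assumed input layout

val totalBytes = fileSizes.map(_ + openCostInBytes).sum   // 5,242,880,000 bytes
val bytesPerCore = totalBytes / defaultParallelism        // 655,360,000 bytes
val maxSplitSize = math.min(defaultMaxSplitBytes, math.max(openCostInBytes, bytesPerCore))
println(maxSplitSize)   // 134217728 (128 MB): splits are capped at 128 MB each,
                        // so ~1 GB of actual data yields roughly 8 partitions
                        // instead of the two partitions reported before the change.
```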
@@ -206,4 +206,17 @@ package object config {
       "encountering corrupt files and contents that have been read will still be returned.")
     .booleanConf
     .createWithDefault(false)
+
+  private[spark] val FILES_MAX_PARTITION_BYTES = ConfigBuilder("spark.files.maxPartitionBytes")
+    .doc("The maximum number of bytes to pack into a single partition when reading files.")
+    .longConf
+    .createWithDefault(128 * 1024 * 1024)
+
+  private[spark] val FILES_OPEN_COST_IN_BYTES = ConfigBuilder("spark.files.openCostInBytes")
+    .doc("The estimated cost to open a file, measured by the number of bytes could be scanned in" +
+      " the same time. This is used when putting multiple files into a partition. It's better to" +
+      " over estimate, then the partitions with small files will be faster than partitions with" +
+      " bigger files.")
+    .longConf
+    .createWithDefault(4 * 1024 * 1024)
 }
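Since these are ordinary Spark configuration entries, they can be overridden per application. A minimal sketch; the chosen values are examples only, not recommendations from this patch:

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Illustrative overrides of the two new properties (values are examples only).
val conf = new SparkConf()
  .setMaster("local[4]")                    // local mode just for the sketch
  .setAppName("binary-files-partitioning")
  .set("spark.files.maxPartitionBytes", (64 * 1024 * 1024).toString)  // 64 MB
  .set("spark.files.openCostInBytes", (2 * 1024 * 1024).toString)     // 2 MB

val sc = new SparkContext(conf)
// Path is illustrative; the partition count now follows the sizes configured above.
println(sc.binaryFiles("hdfs:///tmp/many-small-files").getNumPartitions)
```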
@@ -26,7 +26,7 @@ import org.apache.spark.{Partition, SparkContext}
 import org.apache.spark.input.StreamFileInputFormat
 
 private[spark] class BinaryFileRDD[T](
-    sc: SparkContext,
+    @transient private val sc: SparkContext,
     inputFormatClass: Class[_ <: StreamFileInputFormat[T]],
     keyClass: Class[String],
     valueClass: Class[T],
@@ -43,7 +43,7 @@ private[spark] class BinaryFileRDD[T](
       case _ =>
     }
     val jobContext = new JobContextImpl(conf, jobId)
-    inputFormat.setMinPartitions(jobContext, minPartitions)
+    inputFormat.setMinPartitions(sc, jobContext, minPartitions)
     val rawSplits = inputFormat.getSplits(jobContext).toArray
     val result = new Array[Partition](rawSplits.size)
     for (i <- 0 until rawSplits.size) {
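The @transient on the SparkContext parameter keeps the new field out of the RDD's serialized form: getPartitions runs on the driver and can use it, while the copy shipped to executors simply carries null there. A minimal, Spark-free sketch of that serialization behavior (all names below are made up for illustration):

```scala
import java.io._

// Stand-in for a driver-only, non-serializable handle such as SparkContext.
class DriverHandle

class Planner(@transient private val driver: DriverHandle) extends Serializable {
  // Only meaningful on the driver, e.g. while computing partitions.
  def planOnDriver(): Int = if (driver != null) 4 else 0
}

object TransientDemo {
  def main(args: Array[String]): Unit = {
    val buffer = new ByteArrayOutputStream()
    // Serialization succeeds because the @transient field is skipped entirely.
    val out = new ObjectOutputStream(buffer)
    out.writeObject(new Planner(new DriverHandle))
    out.close()

    val restored = new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray))
      .readObject().asInstanceOf[Planner]
    // On the "executor" side the handle is simply null; nothing tried to ship it.
    println(restored.planOnDriver())  // prints 0
  }
}
```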
@@ -1034,6 +1034,22 @@ Apart from these, the following properties are also available, and may be useful
     its contents do not match those of the source.
   </td>
 </tr>
+<tr>
+  <td><code>spark.files.maxPartitionBytes</code></td>
+  <td>134217728 (128 MB)</td>
+  <td>
+    The maximum number of bytes to pack into a single partition when reading files.
+  </td>
+</tr>
+<tr>
+  <td><code>spark.files.openCostInBytes</code></td>
+  <td>4194304 (4 MB)</td>
+  <td>
+    The estimated cost to open a file, measured by the number of bytes could be scanned in the same
+    time. This is used when putting multiple files into a partition. It is better to over estimate,
+    then the partitions with small files will be faster than partitions with bigger files.
+  </td>
+</tr>
 <tr>
   <td><code>spark.hadoop.cloneConf</code></td>
   <td>false</td>