From 8346518357f4a3565ae41e9a5ccd7e2c3ed6c468 Mon Sep 17 00:00:00 2001
From: Darek Blasiak <darek.blasiak@640labs.com>
Date: Thu, 7 Jan 2016 21:15:40 +0000
Subject: [PATCH] [SPARK-12598][CORE] bug in setMinPartitions

There is a bug in the calculation of ```maxSplitSize```.  The ```totalLen``` should be divided by ```minPartitions``` and not by ```files.size```.

Author: Darek Blasiak <darek.blasiak@640labs.com>

Closes #10546 from datafarmer/setminpartitionsbug.
---
 .../scala/org/apache/spark/input/PortableDataStream.scala    | 5 ++---
 1 file changed, 2 insertions(+), 3 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/input/PortableDataStream.scala b/core/src/main/scala/org/apache/spark/input/PortableDataStream.scala
index 8009491a1b..18cb7631b3 100644
--- a/core/src/main/scala/org/apache/spark/input/PortableDataStream.scala
+++ b/core/src/main/scala/org/apache/spark/input/PortableDataStream.scala
@@ -41,9 +41,8 @@ private[spark] abstract class StreamFileInputFormat[T]
    * which is set through setMaxSplitSize
    */
   def setMinPartitions(context: JobContext, minPartitions: Int) {
-    val files = listStatus(context).asScala
-    val totalLen = files.map(file => if (file.isDirectory) 0L else file.getLen).sum
-    val maxSplitSize = Math.ceil(totalLen * 1.0 / files.size).toLong
+    val totalLen = listStatus(context).asScala.filterNot(_.isDirectory).map(_.getLen).sum
+    val maxSplitSize = math.ceil(totalLen / math.max(minPartitions, 1.0)).toLong
     super.setMaxSplitSize(maxSplitSize)
   }
 
-- 
GitLab