diff --git a/core/src/main/scala/org/apache/spark/input/PortableDataStream.scala b/core/src/main/scala/org/apache/spark/input/PortableDataStream.scala index 8009491a1b0e019a31c5075ca96b4729afc81917..18cb7631b3d4c8abb7dc7c163877c7f3e1ffe303 100644 --- a/core/src/main/scala/org/apache/spark/input/PortableDataStream.scala +++ b/core/src/main/scala/org/apache/spark/input/PortableDataStream.scala @@ -41,9 +41,8 @@ private[spark] abstract class StreamFileInputFormat[T] * which is set through setMaxSplitSize */ def setMinPartitions(context: JobContext, minPartitions: Int) { - val files = listStatus(context).asScala - val totalLen = files.map(file => if (file.isDirectory) 0L else file.getLen).sum - val maxSplitSize = Math.ceil(totalLen * 1.0 / files.size).toLong + val totalLen = listStatus(context).asScala.filterNot(_.isDirectory).map(_.getLen).sum + val maxSplitSize = math.ceil(totalLen / math.max(minPartitions, 1.0)).toLong super.setMaxSplitSize(maxSplitSize) }