diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
index ee48a7b81defba0f9fc979d50605159cc9c3cedd..c1a97de72f65dcd521c07cd1b6daefb980c803d2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
@@ -134,8 +134,13 @@ private[sql] object FileSourceStrategy extends Strategy with Logging {
         }

       case _ =>
-        val maxSplitBytes = files.sqlContext.conf.filesMaxPartitionBytes
+        val defaultMaxSplitBytes = files.sqlContext.conf.filesMaxPartitionBytes
         val openCostInBytes = files.sqlContext.conf.filesOpenCostInBytes
+        val defaultParallelism = files.sqlContext.sparkContext.defaultParallelism
+        val totalBytes = selectedPartitions.flatMap(_.files.map(_.getLen + openCostInBytes)).sum
+        val bytesPerCore = totalBytes / defaultParallelism
+        val maxSplitBytes = Math.min(defaultMaxSplitBytes,
+          Math.max(openCostInBytes, bytesPerCore))
         logInfo(s"Planning scan with bin packing, max size: $maxSplitBytes bytes, " +
           s"open cost is considered as scanning $openCostInBytes bytes.")

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
index 4699c48c72168c3d45e6fac7b78c15524cfeee56..50cd03a40c6ac3803f99717ea6000b2d0bf0e62c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
@@ -23,6 +23,7 @@ import java.util.concurrent.atomic.AtomicInteger
 import org.apache.hadoop.fs.{BlockLocation, FileStatus, RawLocalFileSystem}
 import org.apache.hadoop.mapreduce.Job

+import org.apache.spark.SparkConf
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionSet, PredicateHelper}
@@ -38,6 +39,8 @@ import org.apache.spark.util.Utils
 class FileSourceStrategySuite extends QueryTest with SharedSQLContext with PredicateHelper {
   import testImplicits._

+  protected override val sparkConf = new SparkConf().set("spark.default.parallelism", "1")
+
   test("unpartitioned table, single partition") {
     val table = createTable(
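
Note (not part of the patch): a minimal, self-contained sketch of the split-size heuristic the first hunk introduces. The object name SplitSizeSketch, the fileSizes parameter, and the main method are hypothetical scaffolding for illustration; the default values are assumed to mirror spark.sql.files.maxPartitionBytes (128 MiB) and spark.sql.files.openCostInBytes (4 MiB), not read from any Spark API.

object SplitSizeSketch {
  def maxSplitBytes(
      fileSizes: Seq[Long],
      defaultMaxSplitBytes: Long = 128L * 1024 * 1024, // assumed default of spark.sql.files.maxPartitionBytes
      openCostInBytes: Long = 4L * 1024 * 1024,        // assumed default of spark.sql.files.openCostInBytes
      defaultParallelism: Int = 8): Long = {
    // Each file is charged its length plus a fixed open cost, so a scan over
    // many tiny files still accumulates a meaningful total.
    val totalBytes = fileSizes.map(_ + openCostInBytes).sum
    // Spreading the charged total evenly over the available cores gives the
    // ideal per-partition size when the data set is small.
    val bytesPerCore = totalBytes / defaultParallelism
    // Clamp: never exceed the configured maximum split size, and never drop
    // below the open cost, which would produce pathologically tiny splits.
    math.min(defaultMaxSplitBytes, math.max(openCostInBytes, bytesPerCore))
  }

  def main(args: Array[String]): Unit = {
    // Ten 1 MiB files on 8 cores: (10 * 5 MiB charged) / 8 = 6553600 bytes,
    // far below the 128 MiB default, so the scan fans out across cores
    // instead of collapsing into a single partition.
    println(maxSplitBytes(fileSizes = Seq.fill(10)(1024L * 1024)))
  }
}

The min/max clamping is the point of the change: before it, maxSplitBytes was always the configured maximum, so small inputs were packed into very few partitions regardless of cluster parallelism. The test-suite hunks pin spark.default.parallelism to 1 so that bytesPerCore is deterministic and the partition counts the existing tests assert on are unaffected.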