From c25b97fccee557c9247ad5bf006a83a55c5e0e32 Mon Sep 17 00:00:00 2001
From: Davies Liu <davies@databricks.com>
Date: Fri, 22 Apr 2016 17:09:16 -0700
Subject: [PATCH] [SPARK-14582][SQL] increase parallelism for small tables

## What changes were proposed in this pull request?

This PR tries to increase the parallelism for small tables (a few big files) to reduce query time by decreasing maxSplitBytes. The goal is to have at least one task per CPU in the cluster if the total size of all files is bigger than openCostInBytes * 2 * nCPU.

For example, a small or medium table could be used as a dimension table in a huge query; increasing its scan parallelism reduces the time spent waiting for the broadcast.
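
To make the heuristic concrete, here is a standalone sketch (illustrative only, not the patched code itself; in the patch the inputs come from SQLConf and `SparkContext.defaultParallelism`, and the 128 MB / 4 MB values below are Spark's usual defaults for `spark.sql.files.maxPartitionBytes` and `spark.sql.files.openCostInBytes`):

```scala
// Standalone sketch of the split-size heuristic introduced by this patch.
object SplitSizeSketch {
  def maxSplitBytes(
      defaultMaxSplitBytes: Long,
      openCostInBytes: Long,
      defaultParallelism: Int,
      fileSizes: Seq[Long]): Long = {
    // Each file is padded by the open cost, so many tiny files do not
    // drive the split size toward zero.
    val totalBytes = fileSizes.map(_ + openCostInBytes).sum
    val bytesPerCore = totalBytes / defaultParallelism
    // Shrink splits below the default only for small tables, and never
    // below the per-file open cost.
    Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))
  }

  def main(args: Array[String]): Unit = {
    val mb = 1024L * 1024L
    // A 200 MB table on an 8-core cluster with a 128 MB max partition
    // size and 4 MB open cost: the split size drops from 128 MB to
    // ~26 MB, yielding ~8 tasks instead of 2.
    println(maxSplitBytes(128 * mb, 4 * mb, 8, Seq(200 * mb))) // 26738688
  }
}
```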

## How was this patch tested?

Existing tests.

Author: Davies Liu <davies@databricks.com>

Closes #12344 from davies/more_partition.
---
 .../sql/execution/datasources/FileSourceStrategy.scala     | 7 ++++++-
 .../execution/datasources/FileSourceStrategySuite.scala    | 3 +++
 2 files changed, 9 insertions(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
index ee48a7b81d..c1a97de72f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
@@ -134,8 +134,13 @@ private[sql] object FileSourceStrategy extends Strategy with Logging {
           }
 
         case _ =>
-          val maxSplitBytes = files.sqlContext.conf.filesMaxPartitionBytes
+          val defaultMaxSplitBytes = files.sqlContext.conf.filesMaxPartitionBytes
           val openCostInBytes = files.sqlContext.conf.filesOpenCostInBytes
+          val defaultParallelism = files.sqlContext.sparkContext.defaultParallelism
+          val totalBytes = selectedPartitions.flatMap(_.files.map(_.getLen + openCostInBytes)).sum
+          val bytesPerCore = totalBytes / defaultParallelism
+          val maxSplitBytes = Math.min(defaultMaxSplitBytes,
+            Math.max(openCostInBytes, bytesPerCore))
           logInfo(s"Planning scan with bin packing, max size: $maxSplitBytes bytes, " +
             s"open cost is considered as scanning $openCostInBytes bytes.")
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
index 4699c48c72..50cd03a40c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
@@ -23,6 +23,7 @@ import java.util.concurrent.atomic.AtomicInteger
 import org.apache.hadoop.fs.{BlockLocation, FileStatus, RawLocalFileSystem}
 import org.apache.hadoop.mapreduce.Job
 
+import org.apache.spark.SparkConf
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionSet, PredicateHelper}
@@ -38,6 +39,8 @@ import org.apache.spark.util.Utils
 class FileSourceStrategySuite extends QueryTest with SharedSQLContext with PredicateHelper {
   import testImplicits._
 
+  protected override val sparkConf = new SparkConf().set("spark.default.parallelism", "1")
+
   test("unpartitioned table, single partition") {
     val table =
       createTable(
-- 
GitLab