Commit c25b97fc authored by Davies Liu, committed by Reynold Xin

[SPARK-14582][SQL] increase parallelism for small tables

## What changes were proposed in this pull request?

This PR tries to increase the parallelism for small tables (a few big files) to reduce query time by decreasing maxSplitBytes. The goal is to have at least one task per CPU in the cluster, provided the total size of all files is bigger than openCostInBytes * 2 * nCPU.
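
As a minimal sketch of the new sizing rule (the 128 MB and 4 MB values are the defaults of `spark.sql.files.maxPartitionBytes` and `spark.sql.files.openCostInBytes` as I understand them; the cluster and file sizes are made up for illustration):

```scala
// Hypothetical walk-through of the new maxSplitBytes computation.
val defaultMaxSplitBytes = 128L * 1024 * 1024        // assumed spark.sql.files.maxPartitionBytes default
val openCostInBytes = 4L * 1024 * 1024               // assumed spark.sql.files.openCostInBytes default
val defaultParallelism = 8                           // e.g. an 8-core cluster
val totalBytes = 60L * 1024 * 1024 + openCostInBytes // one 60 MB file plus its open cost
val bytesPerCore = totalBytes / defaultParallelism   // 8 MB
val maxSplitBytes = Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))
// maxSplitBytes == 8 MB: the 60 MB file is cut into 8 splits, one task per core,
// where the old fixed 128 MB cap would have produced a single task.
```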

For example, a small or medium table could be used as a dimension table in a huge query; this will be useful to reduce the time spent waiting for the broadcast.

## How was this patch tested?

Existing tests.

Author: Davies Liu <davies@databricks.com>

Closes #12344 from davies/more_partition.
parent fde1340c
FileSourceStrategy.scala:

```diff
@@ -134,8 +134,13 @@ private[sql] object FileSourceStrategy extends Strategy with Logging {
         }
       case _ =>
-        val maxSplitBytes = files.sqlContext.conf.filesMaxPartitionBytes
+        val defaultMaxSplitBytes = files.sqlContext.conf.filesMaxPartitionBytes
         val openCostInBytes = files.sqlContext.conf.filesOpenCostInBytes
+        val defaultParallelism = files.sqlContext.sparkContext.defaultParallelism
+        val totalBytes = selectedPartitions.flatMap(_.files.map(_.getLen + openCostInBytes)).sum
+        val bytesPerCore = totalBytes / defaultParallelism
+        val maxSplitBytes = Math.min(defaultMaxSplitBytes,
+          Math.max(openCostInBytes, bytesPerCore))
         logInfo(s"Planning scan with bin packing, max size: $maxSplitBytes bytes, " +
           s"open cost is considered as scanning $openCostInBytes bytes.")
```
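
Two properties of the clamp are worth noting: the Math.max floor keeps a split from shrinking below openCostInBytes, so files are never shredded into tasks that cost more to open than to scan, and the Math.min cap preserves the old defaultMaxSplitBytes bound for big tables. A rough sketch of the resulting task counts (the splitsFor helper is hypothetical, not part of the patch):

```scala
// Hypothetical helper: splits produced for one file at a given split size
// (ceiling division, mirroring how bin packing cuts a file).
def splitsFor(fileLen: Long, maxSplitBytes: Long): Long =
  (fileLen + maxSplitBytes - 1) / maxSplitBytes

// One 1 GB file on a 64-core cluster: bytesPerCore ≈ 16 MB, so ~64 splits
// (one per core), where the old fixed 128 MB cap yielded only 8.
```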
FileSourceStrategySuite.scala:

```diff
@@ -23,6 +23,7 @@ import java.util.concurrent.atomic.AtomicInteger
 import org.apache.hadoop.fs.{BlockLocation, FileStatus, RawLocalFileSystem}
 import org.apache.hadoop.mapreduce.Job
 
+import org.apache.spark.SparkConf
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionSet, PredicateHelper}
@@ -38,6 +39,8 @@ import org.apache.spark.util.Utils
 class FileSourceStrategySuite extends QueryTest with SharedSQLContext with PredicateHelper {
   import testImplicits._
 
+  protected override val sparkConf = new SparkConf().set("spark.default.parallelism", "1")
+
   test("unpartitioned table, single partition") {
     val table =
       createTable(
```
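
A note on why the suite pins spark.default.parallelism to 1 (my reading, not stated in the patch): with a single core, bytesPerCore equals the whole scan size, so the new clamp leaves maxSplitBytes at the value each test configures and the suite's existing expectations about partition counts continue to hold.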