Skip to content
Snippets Groups Projects
Commit bd35385d authored by Davies Liu's avatar Davies Liu Committed by Davies Liu
Browse files

[SPARK-9945] [SQL] pageSize should be calculated from executor.memory

Currently, pageSize of TungstenSort is calculated from driver.memory, it should use executor.memory instead.

Also, in the worst case, the safeFactor could be 4 (because of rounding), increase it to 16.

cc rxin

Author: Davies Liu <davies@databricks.com>

Closes #8175 from davies/page_size.
parent 8187b3ae
No related branches found
No related tags found
No related merge requests found
......@@ -175,7 +175,9 @@ private[spark] object ShuffleMemoryManager {
val minPageSize = 1L * 1024 * 1024 // 1MB
val maxPageSize = 64L * minPageSize // 64MB
val cores = if (numCores > 0) numCores else Runtime.getRuntime.availableProcessors()
val safetyFactor = 8
// Because of rounding to next power of 2, we may have safetyFactor as 8 in worst case
val safetyFactor = 16
// TODO(davies): don't round to next power of 2
val size = ByteArrayMethods.nextPowerOf2(maxMemory / cores / safetyFactor)
val default = math.min(maxPageSize, math.max(minPageSize, size))
conf.getSizeAsBytes("spark.buffer.pageSize", default)
......
......@@ -17,15 +17,15 @@
package org.apache.spark.sql.execution
import org.apache.spark.{SparkEnv, InternalAccumulator, TaskContext}
import org.apache.spark.rdd.{MapPartitionsWithPreparationRDD, RDD}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.errors._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.physical.{UnspecifiedDistribution, OrderedDistribution, Distribution}
import org.apache.spark.sql.catalyst.plans.physical.{Distribution, OrderedDistribution, UnspecifiedDistribution}
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.CompletionIterator
import org.apache.spark.util.collection.ExternalSorter
import org.apache.spark.{SparkEnv, InternalAccumulator, TaskContext}
////////////////////////////////////////////////////////////////////////////////////////////////////
// This file defines various sort operators.
......@@ -122,7 +122,6 @@ case class TungstenSort(
protected override def doExecute(): RDD[InternalRow] = {
val schema = child.schema
val childOutput = child.output
val pageSize = SparkEnv.get.shuffleMemoryManager.pageSizeBytes
/**
* Set up the sorter in each partition before computing the parent partition.
......@@ -143,6 +142,7 @@ case class TungstenSort(
}
}
val pageSize = SparkEnv.get.shuffleMemoryManager.pageSizeBytes
val sorter = new UnsafeExternalRowSorter(
schema, ordering, prefixComparator, prefixComputer, pageSize)
if (testSpillFrequency > 0) {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment