Skip to content
Snippets Groups Projects
Commit 5028a001 authored by Kevin Yu's avatar Kevin Yu Committed by Reynold Xin
Browse files

[SPARK-12317][SQL] Support units (m,k,g) in SQLConf

This PR is continue from previous closed PR 10314.

In this PR, SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE will be taken memory string conventions as input.

For example, the user can now specify 10g for SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE in SQLConf file.

marmbrus srowen : Can you help review this code changes ? Thanks.

Author: Kevin Yu <qyu@us.ibm.com>

Closes #10629 from kevinyu98/spark-12317.
parent 28e0e500
No related branches found
No related tags found
No related merge requests found
...@@ -26,6 +26,7 @@ import org.apache.parquet.hadoop.ParquetOutputCommitter ...@@ -26,6 +26,7 @@ import org.apache.parquet.hadoop.ParquetOutputCommitter
import org.apache.spark.sql.catalyst.CatalystConf import org.apache.spark.sql.catalyst.CatalystConf
import org.apache.spark.sql.catalyst.parser.ParserConf import org.apache.spark.sql.catalyst.parser.ParserConf
import org.apache.spark.util.Utils
//////////////////////////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////////////////////////
// This file defines the configuration options for Spark SQL. // This file defines the configuration options for Spark SQL.
...@@ -115,6 +116,25 @@ private[spark] object SQLConf { ...@@ -115,6 +116,25 @@ private[spark] object SQLConf {
} }
}, _.toString, doc, isPublic) }, _.toString, doc, isPublic)
def longMemConf(
key: String,
defaultValue: Option[Long] = None,
doc: String = "",
isPublic: Boolean = true): SQLConfEntry[Long] =
SQLConfEntry(key, defaultValue, { v =>
try {
v.toLong
} catch {
case _: NumberFormatException =>
try {
Utils.byteStringAsBytes(v)
} catch {
case _: NumberFormatException =>
throw new IllegalArgumentException(s"$key should be long, but was $v")
}
}
}, _.toString, doc, isPublic)
def doubleConf( def doubleConf(
key: String, key: String,
defaultValue: Option[Double] = None, defaultValue: Option[Double] = None,
...@@ -235,7 +255,7 @@ private[spark] object SQLConf { ...@@ -235,7 +255,7 @@ private[spark] object SQLConf {
doc = "The default number of partitions to use when shuffling data for joins or aggregations.") doc = "The default number of partitions to use when shuffling data for joins or aggregations.")
val SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE = val SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE =
longConf("spark.sql.adaptive.shuffle.targetPostShuffleInputSize", longMemConf("spark.sql.adaptive.shuffle.targetPostShuffleInputSize",
defaultValue = Some(64 * 1024 * 1024), defaultValue = Some(64 * 1024 * 1024),
doc = "The target post-shuffle input size in bytes of a task.") doc = "The target post-shuffle input size in bytes of a task.")
......
...@@ -92,4 +92,43 @@ class SQLConfSuite extends QueryTest with SharedSQLContext { ...@@ -92,4 +92,43 @@ class SQLConfSuite extends QueryTest with SharedSQLContext {
} }
assert(e.getMessage === s"${SQLConf.CASE_SENSITIVE.key} should be boolean, but was 10") assert(e.getMessage === s"${SQLConf.CASE_SENSITIVE.key} should be boolean, but was 10")
} }
test("Test SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE's method") {
sqlContext.conf.clear()
sqlContext.setConf(SQLConf.SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE.key, "100")
assert(sqlContext.conf.targetPostShuffleInputSize === 100)
sqlContext.setConf(SQLConf.SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE.key, "1k")
assert(sqlContext.conf.targetPostShuffleInputSize === 1024)
sqlContext.setConf(SQLConf.SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE.key, "1M")
assert(sqlContext.conf.targetPostShuffleInputSize === 1048576)
sqlContext.setConf(SQLConf.SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE.key, "1g")
assert(sqlContext.conf.targetPostShuffleInputSize === 1073741824)
sqlContext.setConf(SQLConf.SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE.key, "-1")
assert(sqlContext.conf.targetPostShuffleInputSize === -1)
// Test overflow exception
intercept[IllegalArgumentException] {
// This value exceeds Long.MaxValue
// Utils.byteStringAsBytes("90000000000g")
sqlContext.setConf(SQLConf.SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE.key, "90000000000g")
}
intercept[IllegalArgumentException] {
// This value less than Int.MinValue
// Utils.byteStringAsBytes("-90000000000g")
sqlContext.setConf(SQLConf.SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE.key, "-90000000000g")
}
// Test invalid input
intercept[IllegalArgumentException] {
// This value exceeds Long.MaxValue
// Utils.byteStringAsBytes("-1g")
sqlContext.setConf(SQLConf.SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE.key, "-1g")
}
sqlContext.conf.clear()
}
} }
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment