Commit c5172568 authored by Huaxin Gao, committed by Wenchen Fan

[SPARK-17460][SQL] Make sure sizeInBytes in Statistics will not overflow

## What changes were proposed in this pull request?

1. In `SparkStrategies.canBroadcast`, add a check that `plan.statistics.sizeInBytes >= 0`.
2. In `LocalRelation.statistics`, compute the size as a `BigInt` so the multiplication cannot overflow (see the sketch after this list).
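
As context for change 2, here is a minimal, self-contained sketch (plain Scala with made-up row sizes and counts, not the `LocalRelation` code itself) of why the per-column sizes must be widened to `BigInt` before multiplying: an `Int` product silently wraps around to a negative value, while a `BigInt` product cannot overflow.

```scala
object SizeOverflowSketch {
  def main(args: Array[String]): Unit = {
    // Hypothetical numbers, chosen only so that the product exceeds Int.MaxValue.
    val perRowSize = 240428   // bytes per row (made up)
    val rowCount   = 10001    // number of rows (made up)

    // Multiplying in Int and widening afterwards is too late:
    // the product has already wrapped around to a negative number.
    val wrapped: BigInt = BigInt(perRowSize * rowCount)

    // Widening to BigInt before multiplying, as the fix does, keeps the exact value.
    val exact: BigInt = BigInt(perRowSize) * rowCount

    println(s"Int product, widened late: $wrapped") // negative (overflowed)
    println(s"BigInt product:            $exact")   // 2404520428, always non-negative
  }
}
```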

## How was this patch tested?

A test case is added to make sure `statistics.sizeInBytes` won't overflow.

Author: Huaxin Gao <huaxing@us.ibm.com>

Closes #16175 from huaxingao/spark-17460.
parent 63c91598
@@ -75,7 +75,8 @@ case class LocalRelation(output: Seq[Attribute], data: Seq[InternalRow] = Nil)
   }
 
   override lazy val statistics =
-    Statistics(sizeInBytes = output.map(_.dataType.defaultSize).sum * data.length)
+    Statistics(sizeInBytes =
+      (output.map(n => BigInt(n.dataType.defaultSize))).sum * data.length)
 
   def toSQL(inlineTableName: String): String = {
     require(data.nonEmpty)
@@ -115,7 +115,8 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
    */
   private def canBroadcast(plan: LogicalPlan): Boolean = {
     plan.statistics.isBroadcastable ||
-      plan.statistics.sizeInBytes <= conf.autoBroadcastJoinThreshold
+      (plan.statistics.sizeInBytes >= 0 &&
+        plan.statistics.sizeInBytes <= conf.autoBroadcastJoinThreshold)
   }
 
   /**
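
For readability, a stand-alone sketch of the guard this hunk adds (hypothetical names, not the actual `SparkStrategies`/`Statistics` API): without the `>= 0` check, a size that overflowed to a negative number would trivially satisfy `<= threshold` and trigger an unwanted broadcast.

```scala
object BroadcastGuardSketch {
  // Mirrors the shape of the patched check: an overflowed (negative) size never broadcasts.
  def wouldBroadcast(sizeInBytes: BigInt, thresholdBytes: Long, broadcastHint: Boolean): Boolean =
    broadcastHint || (sizeInBytes >= 0 && sizeInBytes <= thresholdBytes)

  def main(args: Array[String]): Unit = {
    val threshold = 10L * 1024 * 1024  // e.g. a 10 MB broadcast threshold (illustrative value)
    // An overflowed, negative size must not be treated as "small enough".
    println(wouldBroadcast(BigInt(-1890446868), threshold, broadcastHint = false)) // false
    // A genuinely small relation still qualifies.
    println(wouldBroadcast(BigInt(1024), threshold, broadcastHint = false))        // true
  }
}
```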
@@ -136,7 +136,7 @@ object SQLConf {
       "That is to say by default the optimizer will not choose to broadcast a table unless it " +
       "knows for sure its size is small enough.")
     .longConf
-    .createWithDefault(-1)
+    .createWithDefault(Long.MaxValue)
 
   val SHUFFLE_PARTITIONS = SQLConfigBuilder("spark.sql.shuffle.partitions")
     .doc("The default number of partitions to use when shuffling data for joins or aggregations.")
@@ -764,7 +764,7 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {
   def enableRadixSort: Boolean = getConf(RADIX_SORT_ENABLED)
 
-  def defaultSizeInBytes: Long = getConf(DEFAULT_SIZE_IN_BYTES, Long.MaxValue)
+  def defaultSizeInBytes: Long = getConf(DEFAULT_SIZE_IN_BYTES)
 
   def isParquetSchemaMergingEnabled: Boolean = getConf(PARQUET_SCHEMA_MERGING_ENABLED)
@@ -1110,6 +1110,16 @@ class DatasetSuite extends QueryTest with SharedSQLContext {
     }
     assert(e.getMessage.contains("Cannot create encoder for Option of Product type"))
   }
+
+  test("SPARK-17460: the sizeInBytes in Statistics shouldn't overflow to a negative number") {
+    // Since the sizeInBytes in Statistics could exceed the limit of an Int, we should use BigInt
+    // instead of Int for avoiding possible overflow.
+    val ds = (0 to 10000).map(i =>
+      (i, Seq((i, Seq((i, "This is really not that long of a string")))))).toDS()
+    val sizeInBytes = ds.logicalPlan.statistics.sizeInBytes
+    // sizeInBytes is 2404280404, before the fix, it overflows to a negative number
+    assert(sizeInBytes > 0)
+  }
 }
 
 case class Generic[T](id: T, value: Double)