Commit de21ca46 authored by wangzhenhua, committed by Reynold Xin

[SPARK-18815][SQL] Fix NPE when collecting column stats for string/binary column having only null values

## What changes were proposed in this pull request?

During column stats collection, the computed average and max length are null when a string/binary column contains only null values, which leads to a NullPointerException. To fix this, I use the type's default size when the avg/max length is null.
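A minimal repro sketch of the scenario, assuming a local SparkSession; the table name `null_str_tbl` and column name `s` are illustrative:

```scala
import org.apache.spark.sql.SparkSession

// Sketch only: create a table whose string column holds a single NULL value.
val spark = SparkSession.builder().master("local[1]").appName("spark-18815-repro").getOrCreate()
spark.sql("SELECT CAST(NULL AS STRING) AS s").write.saveAsTable("null_str_tbl")

// Before this fix, collecting column stats here threw a NullPointerException, because the
// average/max length computed for an all-null string column is null; with the fix, both
// values fall back to the type's default size.
spark.sql("ANALYZE TABLE null_str_tbl COMPUTE STATISTICS FOR COLUMNS s")
```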

## How was this patch tested?

Added a test that collects column stats for columns containing only null values.

Author: wangzhenhua <wangzhenhua@huawei.com>

Closes #16243 from wzhfy/nullStats.

(cherry picked from commit a29ee55a)
Signed-off-by: Reynold Xin <rxin@databricks.com>
parent 5151dafa
@@ -194,11 +194,12 @@ object ColumnStat extends Logging {
val numNonNulls = if (col.nullable) Count(col) else Count(one)
val ndv = Least(Seq(HyperLogLogPlusPlus(col, relativeSD), numNonNulls))
val numNulls = Subtract(Count(one), numNonNulls)
val defaultSize = Literal(col.dataType.defaultSize, LongType)
def fixedLenTypeStruct(castType: DataType) = {
// For fixed width types, avg size should be the same as max size.
val avgSize = Literal(col.dataType.defaultSize, LongType)
struct(ndv, Cast(Min(col), castType), Cast(Max(col), castType), numNulls, avgSize, avgSize)
struct(ndv, Cast(Min(col), castType), Cast(Max(col), castType), numNulls, defaultSize,
defaultSize)
}
col.dataType match {
@@ -213,7 +214,9 @@
val nullLit = Literal(null, col.dataType)
struct(
ndv, nullLit, nullLit, numNulls,
Ceil(Average(Length(col))), Cast(Max(Length(col)), LongType))
// Set avg/max size to default size if all the values are null or there is no value.
Coalesce(Seq(Ceil(Average(Length(col))), defaultSize)),
Coalesce(Seq(Cast(Max(Length(col)), LongType), defaultSize)))
case _ =>
throw new AnalysisException("Analyzing column statistics is not supported for column " +
s"${col.name} of data type: ${col.dataType}.")
@@ -21,6 +21,7 @@ import java.{lang => jl}
import java.sql.{Date, Timestamp}
import scala.collection.mutable
import scala.util.Random
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.plans.logical._
@@ -133,6 +134,40 @@ class StatisticsCollectionSuite extends StatisticsCollectionTestBase with Shared
}
}
test("column stats round trip serialization") {
// Make sure that serializing and then deserializing the column stats returns the original data.
val df = data.toDF(stats.keys.toSeq :+ "carray" : _*)
stats.zip(df.schema).foreach { case ((k, v), field) =>
withClue(s"column $k with type ${field.dataType}") {
val roundtrip = ColumnStat.fromMap("table_is_foo", field, v.toMap)
assert(roundtrip == Some(v))
}
}
}
test("analyze column command - result verification") {
// (data.head.productArity - 1) because the last column does not support stats collection.
assert(stats.size == data.head.productArity - 1)
val df = data.toDF(stats.keys.toSeq :+ "carray" : _*)
checkColStats(df, stats)
}
test("column stats collection for null columns") {
val dataTypes: Seq[(DataType, Int)] = Seq(
BooleanType, ByteType, ShortType, IntegerType, LongType,
DoubleType, FloatType, DecimalType.SYSTEM_DEFAULT,
StringType, BinaryType, DateType, TimestampType
).zipWithIndex
val df = sql("select " + dataTypes.map { case (tpe, idx) =>
s"cast(null as ${tpe.sql}) as col$idx"
}.mkString(", "))
val expectedColStats = dataTypes.map { case (tpe, idx) =>
(s"col$idx", ColumnStat(0, None, None, 1, tpe.defaultSize.toLong, tpe.defaultSize.toLong))
}
checkColStats(df, mutable.LinkedHashMap(expectedColStats: _*))
}
}
@@ -141,7 +176,6 @@ class StatisticsCollectionSuite extends StatisticsCollectionTestBase with Shared
* when using the Hive external catalog) as well as in the sql/core module.
*/
abstract class StatisticsCollectionTestBase extends QueryTest with SQLTestUtils {
import testImplicits._
private val dec1 = new java.math.BigDecimal("1.000000000000000000")
private val dec2 = new java.math.BigDecimal("8.000000000000000000")
@@ -180,35 +214,28 @@ abstract class StatisticsCollectionTestBase extends QueryTest with SQLTestUtils
"ctimestamp" -> ColumnStat(2, Some(t1), Some(t2), 1, 8, 8)
)
test("column stats round trip serialization") {
// Make sure that serializing and then deserializing the column stats returns the original data.
val df = data.toDF(stats.keys.toSeq :+ "carray" : _*)
stats.zip(df.schema).foreach { case ((k, v), field) =>
withClue(s"column $k with type ${field.dataType}") {
val roundtrip = ColumnStat.fromMap("table_is_foo", field, v.toMap)
assert(roundtrip == Some(v))
}
}
}
test("analyze column command - result verification") {
val tableName = "column_stats_test2"
// (data.head.productArity - 1) because the last column does not support stats collection.
assert(stats.size == data.head.productArity - 1)
val df = data.toDF(stats.keys.toSeq :+ "carray" : _*)
private val randomName = new Random(31)
/**
* Compute column stats for the given DataFrame and compare it with colStats.
*/
def checkColStats(
df: DataFrame,
colStats: mutable.LinkedHashMap[String, ColumnStat]): Unit = {
val tableName = "column_stats_test_" + randomName.nextInt(1000)
withTable(tableName) {
df.write.saveAsTable(tableName)
// Collect statistics
sql(s"analyze table $tableName compute STATISTICS FOR COLUMNS " + stats.keys.mkString(", "))
sql(s"analyze table $tableName compute STATISTICS FOR COLUMNS " +
colStats.keys.mkString(", "))
// Validate statistics
val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier(tableName))
assert(table.stats.isDefined)
assert(table.stats.get.colStats.size == stats.size)
assert(table.stats.get.colStats.size == colStats.size)
stats.foreach { case (k, v) =>
colStats.foreach { case (k, v) =>
withClue(s"column $k") {
assert(table.stats.get.colStats(k) == v)
}