Skip to content
Snippets Groups Projects
Commit f45efbb8 authored by Chandan Kumar, committed by Xiangrui Meng
Browse files

[SPARK-2862] histogram method fails on some choices of bucketCount

Author: Chandan Kumar <chandan.kumar@imaginea.com>

Closes #1787 from nrchandan/spark-2862 and squashes the following commits:

a76bbf6 [Chandan Kumar] [SPARK-2862] Fix for a broken test case and add new test cases
4211eea [Chandan Kumar] [SPARK-2862] Add Scala bug id
13854f1 [Chandan Kumar] [SPARK-2862] Use shorthand range notation to avoid Scala bug
parent c0cbbdea
No related branches found
No related tags found
No related merge requests found
...@@ -95,7 +95,12 @@ class DoubleRDDFunctions(self: RDD[Double]) extends Logging with Serializable { ...@@ -95,7 +95,12 @@ class DoubleRDDFunctions(self: RDD[Double]) extends Logging with Serializable {
* If the elements in RDD do not vary (max == min) always returns a single bucket. * If the elements in RDD do not vary (max == min) always returns a single bucket.
*/ */
def histogram(bucketCount: Int): Pair[Array[Double], Array[Long]] = { def histogram(bucketCount: Int): Pair[Array[Double], Array[Long]] = {
// Compute the minimum and the maxium // Scala's built-in range has issues. See #SI-8782
/**
 * Builds evenly spaced bucket edges from `min` to `max`.
 *
 * For a positive `steps` the result holds `steps + 1` values: the first `steps`
 * edges are computed as `min + (i * (max - min)) / steps`, and `max` itself is
 * appended as the final edge rather than being computed.
 *
 * @param min   lower bound, used as the first edge
 * @param max   upper bound, appended verbatim as the last edge
 * @param steps number of buckets to split the span into
 * @return the bucket edges in ascending position order
 */
def customRange(min: Double, max: Double, steps: Int): IndexedSeq[Double] = {
  val width = max - min
  // Every edge except the last one; an empty sequence when steps is not positive.
  val leadingEdges = (0 until steps).map { i => min + (i * width) / steps }
  leadingEdges :+ max
}
// Compute the minimum and the maximum
val (max: Double, min: Double) = self.mapPartitions { items => val (max: Double, min: Double) = self.mapPartitions { items =>
Iterator(items.foldRight(Double.NegativeInfinity, Iterator(items.foldRight(Double.NegativeInfinity,
Double.PositiveInfinity)((e: Double, x: Pair[Double, Double]) => Double.PositiveInfinity)((e: Double, x: Pair[Double, Double]) =>
...@@ -107,9 +112,11 @@ class DoubleRDDFunctions(self: RDD[Double]) extends Logging with Serializable { ...@@ -107,9 +112,11 @@ class DoubleRDDFunctions(self: RDD[Double]) extends Logging with Serializable {
throw new UnsupportedOperationException( throw new UnsupportedOperationException(
"Histogram on either an empty RDD or RDD containing +/-infinity or NaN") "Histogram on either an empty RDD or RDD containing +/-infinity or NaN")
} }
val increment = (max-min)/bucketCount.toDouble val range = if (min != max) {
val range = if (increment != 0) { // Range.Double.inclusive(min, max, increment)
Range.Double.inclusive(min, max, increment) // The above code doesn't always work. See Scala bug #SI-8782.
// https://issues.scala-lang.org/browse/SI-8782
customRange(min, max, bucketCount)
} else { } else {
List(min, min) List(min, min)
} }
......
...@@ -245,6 +245,29 @@ class DoubleRDDSuite extends FunSuite with SharedSparkContext { ...@@ -245,6 +245,29 @@ class DoubleRDDSuite extends FunSuite with SharedSparkContext {
assert(histogramBuckets === expectedHistogramBuckets) assert(histogramBuckets === expectedHistogramBuckets)
} }
test("WorksWithoutBucketsForLargerDatasets") {
  // Verify the case of slightly larger datasets: the 94 integers 6..99
  // split into 8 buckets, with edges generated from the bucket count alone.
  val data = sc.parallelize(6 to 99)
  val (edges, counts) = data.histogram(8)
  val expectedCounts = Array(12, 12, 11, 12, 12, 11, 12, 12)
  val expectedEdges = Array(
    6.0, 17.625, 29.25, 40.875, 52.5, 64.125, 75.75, 87.375, 99.0)
  assert(counts === expectedCounts)
  assert(edges === expectedEdges)
}
test("WorksWithoutBucketsWithIrrationalBucketEdges") {
  // Verify the case where the bucket width (93 / 9) is not a terminating
  // decimal, so only the counts and the two outer edges are checked exactly.
  // See #SPARK-2862.
  val data = sc.parallelize(6 to 99)
  val (edges, counts) = data.histogram(9)
  val expectedCounts = Array(11, 10, 11, 10, 10, 11, 10, 10, 11)
  assert(counts === expectedCounts)
  assert(edges(0) === 6.0)
  assert(edges(9) === 99.0)
}
// Test the failure mode with an invalid RDD // Test the failure mode with an invalid RDD
test("ThrowsExceptionOnInvalidRDDs") { test("ThrowsExceptionOnInvalidRDDs") {
// infinity // infinity
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment