Skip to content
Snippets Groups Projects
Commit 99ecfa59, authored by vinodkc and committed by Andrew Or
Browse files

[SPARK-10575] [SPARK CORE] Wrapped RDD.takeSample with Scope

Remove the return statements in RDD.takeSample and wrap its body in withScope.

Author: vinodkc <vinod.kc.in@gmail.com>
Author: vinodkc <vinodkc@users.noreply.github.com>
Author: Vinod K C <vinod.kc@huawei.com>

Closes #8730 from vinodkc/fix_takesample_return.
parent a63cdc76
No related branches found
No related tags found
No related merge requests found
...@@ -469,50 +469,44 @@ abstract class RDD[T: ClassTag]( ...@@ -469,50 +469,44 @@ abstract class RDD[T: ClassTag](
* @param seed seed for the random number generator * @param seed seed for the random number generator
* @return sample of specified size in an array * @return sample of specified size in an array
*/ */
// TODO: rewrite this without return statements so we can wrap it in a scope
def takeSample( def takeSample(
withReplacement: Boolean, withReplacement: Boolean,
num: Int, num: Int,
seed: Long = Utils.random.nextLong): Array[T] = { seed: Long = Utils.random.nextLong): Array[T] = withScope {
val numStDev = 10.0 val numStDev = 10.0
if (num < 0) { require(num >= 0, "Negative number of elements requested")
throw new IllegalArgumentException("Negative number of elements requested") require(num <= (Int.MaxValue - (numStDev * math.sqrt(Int.MaxValue)).toInt),
} else if (num == 0) { "Cannot support a sample size > Int.MaxValue - " +
return new Array[T](0) s"$numStDev * math.sqrt(Int.MaxValue)")
}
val initialCount = this.count()
if (initialCount == 0) {
return new Array[T](0)
}
val maxSampleSize = Int.MaxValue - (numStDev * math.sqrt(Int.MaxValue)).toInt
if (num > maxSampleSize) {
throw new IllegalArgumentException("Cannot support a sample size > Int.MaxValue - " +
s"$numStDev * math.sqrt(Int.MaxValue)")
}
val rand = new Random(seed)
if (!withReplacement && num >= initialCount) {
return Utils.randomizeInPlace(this.collect(), rand)
}
val fraction = SamplingUtils.computeFractionForSampleSize(num, initialCount,
withReplacement)
var samples = this.sample(withReplacement, fraction, rand.nextInt()).collect()
// If the first sample didn't turn out large enough, keep trying to take samples; if (num == 0) {
// this shouldn't happen often because we use a big multiplier for the initial size new Array[T](0)
var numIters = 0 } else {
while (samples.length < num) { val initialCount = this.count()
logWarning(s"Needed to re-sample due to insufficient sample size. Repeat #$numIters") if (initialCount == 0) {
samples = this.sample(withReplacement, fraction, rand.nextInt()).collect() new Array[T](0)
numIters += 1 } else {
val rand = new Random(seed)
if (!withReplacement && num >= initialCount) {
Utils.randomizeInPlace(this.collect(), rand)
} else {
val fraction = SamplingUtils.computeFractionForSampleSize(num, initialCount,
withReplacement)
var samples = this.sample(withReplacement, fraction, rand.nextInt()).collect()
// If the first sample didn't turn out large enough, keep trying to take samples;
// this shouldn't happen often because we use a big multiplier for the initial size
var numIters = 0
while (samples.length < num) {
logWarning(s"Needed to re-sample due to insufficient sample size. Repeat #$numIters")
samples = this.sample(withReplacement, fraction, rand.nextInt()).collect()
numIters += 1
}
Utils.randomizeInPlace(samples, rand).take(num)
}
}
} }
Utils.randomizeInPlace(samples, rand).take(num)
} }
/** /**
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment