Skip to content
Snippets Groups Projects
Commit 61be8566 authored by Mark Hamstra
Browse files

Allow distinct() to be called without parentheses when using the default number of splits.

parent a6bb41c6
No related branches found
No related tags found
No related merge requests found
...@@ -185,9 +185,11 @@ abstract class RDD[T: ClassManifest](@transient sc: SparkContext) extends Serial ...@@ -185,9 +185,11 @@ abstract class RDD[T: ClassManifest](@transient sc: SparkContext) extends Serial
/** /**
* Return a new RDD containing the distinct elements in this RDD. * Return a new RDD containing the distinct elements in this RDD.
*/ */
def distinct(numSplits: Int = splits.size): RDD[T] = def distinct(numSplits: Int): RDD[T] =
map(x => (x, null)).reduceByKey((x, y) => x, numSplits).map(_._1) map(x => (x, null)).reduceByKey((x, y) => x, numSplits).map(_._1)
def distinct(): RDD[T] = distinct(splits.size)
/** /**
* Return a sampled subset of this RDD. * Return a sampled subset of this RDD.
*/ */
......
...@@ -8,9 +8,9 @@ import spark.rdd.CoalescedRDD ...@@ -8,9 +8,9 @@ import spark.rdd.CoalescedRDD
import SparkContext._ import SparkContext._
class RDDSuite extends FunSuite with BeforeAndAfter { class RDDSuite extends FunSuite with BeforeAndAfter {
var sc: SparkContext = _ var sc: SparkContext = _
after { after {
if (sc != null) { if (sc != null) {
sc.stop() sc.stop()
...@@ -19,11 +19,15 @@ class RDDSuite extends FunSuite with BeforeAndAfter { ...@@ -19,11 +19,15 @@ class RDDSuite extends FunSuite with BeforeAndAfter {
// To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
System.clearProperty("spark.master.port") System.clearProperty("spark.master.port")
} }
test("basic operations") { test("basic operations") {
sc = new SparkContext("local", "test") sc = new SparkContext("local", "test")
val nums = sc.makeRDD(Array(1, 2, 3, 4), 2) val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)
assert(nums.collect().toList === List(1, 2, 3, 4)) assert(nums.collect().toList === List(1, 2, 3, 4))
val dups = sc.makeRDD(Array(1, 1, 2, 2, 3, 3, 4, 4), 2)
assert(dups.distinct.count === 4)
assert(dups.distinct().collect === dups.distinct.collect)
assert(dups.distinct(2).collect === dups.distinct.collect)
assert(nums.reduce(_ + _) === 10) assert(nums.reduce(_ + _) === 10)
assert(nums.fold(0)(_ + _) === 10) assert(nums.fold(0)(_ + _) === 10)
assert(nums.map(_.toString).collect().toList === List("1", "2", "3", "4")) assert(nums.map(_.toString).collect().toList === List("1", "2", "3", "4"))
...@@ -121,7 +125,7 @@ class RDDSuite extends FunSuite with BeforeAndAfter { ...@@ -121,7 +125,7 @@ class RDDSuite extends FunSuite with BeforeAndAfter {
val zipped = nums.zip(nums.map(_ + 1.0)) val zipped = nums.zip(nums.map(_ + 1.0))
assert(zipped.glom().map(_.toList).collect().toList === assert(zipped.glom().map(_.toList).collect().toList ===
List(List((1, 2.0), (2, 3.0)), List((3, 4.0), (4, 5.0)))) List(List((1, 2.0), (2, 3.0)), List((3, 4.0), (4, 5.0))))
intercept[IllegalArgumentException] { intercept[IllegalArgumentException] {
nums.zip(sc.parallelize(1 to 4, 1)).collect() nums.zip(sc.parallelize(1 to 4, 1)).collect()
} }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment