Skip to content
Snippets Groups Projects
Commit 37c199bb authored by Josh Rosen's avatar Josh Rosen
Browse files

Allow controlling number of splits in distinct().

parent 9f6efbf0
No related branches found
No related tags found
No related merge requests found
...@@ -168,7 +168,8 @@ abstract class RDD[T: ClassManifest](@transient sc: SparkContext) extends Serial ...@@ -168,7 +168,8 @@ abstract class RDD[T: ClassManifest](@transient sc: SparkContext) extends Serial
def filter(f: T => Boolean): RDD[T] = new FilteredRDD(this, sc.clean(f)) def filter(f: T => Boolean): RDD[T] = new FilteredRDD(this, sc.clean(f))
def distinct(): RDD[T] = map(x => (x, "")).reduceByKey((x, y) => x).map(_._1) def distinct(numSplits: Int = splits.size): RDD[T] =
map(x => (x, "")).reduceByKey((x, y) => x, numSplits).map(_._1)
def sample(withReplacement: Boolean, fraction: Double, seed: Int): RDD[T] = def sample(withReplacement: Boolean, fraction: Double, seed: Int): RDD[T] =
new SampledRDD(this, withReplacement, fraction, seed) new SampledRDD(this, withReplacement, fraction, seed)
......
...@@ -33,6 +33,8 @@ class JavaDoubleRDD(val srdd: RDD[scala.Double]) extends JavaRDDLike[Double, Jav ...@@ -33,6 +33,8 @@ class JavaDoubleRDD(val srdd: RDD[scala.Double]) extends JavaRDDLike[Double, Jav
def distinct(): JavaDoubleRDD = fromRDD(srdd.distinct()) def distinct(): JavaDoubleRDD = fromRDD(srdd.distinct())
def distinct(numSplits: Int): JavaDoubleRDD = fromRDD(srdd.distinct(numSplits))
def filter(f: JFunction[Double, java.lang.Boolean]): JavaDoubleRDD = def filter(f: JFunction[Double, java.lang.Boolean]): JavaDoubleRDD =
fromRDD(srdd.filter(x => f(x).booleanValue())) fromRDD(srdd.filter(x => f(x).booleanValue()))
......
...@@ -40,6 +40,8 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif ...@@ -40,6 +40,8 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif
def distinct(): JavaPairRDD[K, V] = new JavaPairRDD[K, V](rdd.distinct()) def distinct(): JavaPairRDD[K, V] = new JavaPairRDD[K, V](rdd.distinct())
def distinct(numSplits: Int): JavaPairRDD[K, V] = new JavaPairRDD[K, V](rdd.distinct(numSplits))
def filter(f: Function[(K, V), java.lang.Boolean]): JavaPairRDD[K, V] = def filter(f: Function[(K, V), java.lang.Boolean]): JavaPairRDD[K, V] =
new JavaPairRDD[K, V](rdd.filter(x => f(x).booleanValue())) new JavaPairRDD[K, V](rdd.filter(x => f(x).booleanValue()))
......
...@@ -19,6 +19,8 @@ JavaRDDLike[T, JavaRDD[T]] { ...@@ -19,6 +19,8 @@ JavaRDDLike[T, JavaRDD[T]] {
def distinct(): JavaRDD[T] = wrapRDD(rdd.distinct()) def distinct(): JavaRDD[T] = wrapRDD(rdd.distinct())
def distinct(numSplits: Int): JavaRDD[T] = wrapRDD(rdd.distinct(numSplits))
def filter(f: JFunction[T, java.lang.Boolean]): JavaRDD[T] = def filter(f: JFunction[T, java.lang.Boolean]): JavaRDD[T] =
wrapRDD(rdd.filter((x => f(x).booleanValue()))) wrapRDD(rdd.filter((x => f(x).booleanValue())))
......
...@@ -147,6 +147,10 @@ The following tables list the transformations and actions currently supported (s ...@@ -147,6 +147,10 @@ The following tables list the transformations and actions currently supported (s
<td> <b>union</b>(<i>otherDataset</i>) </td> <td> <b>union</b>(<i>otherDataset</i>) </td>
<td> Return a new dataset that contains the union of the elements in the source dataset and the argument. </td> <td> Return a new dataset that contains the union of the elements in the source dataset and the argument. </td>
</tr> </tr>
<tr>
<td> <b>distinct</b>([<i>numTasks</i>])) </td>
<td> Return a new dataset that contains the distinct elements of the source dataset.</td>
</tr>
<tr> <tr>
<td> <b>groupByKey</b>([<i>numTasks</i>]) </td> <td> <b>groupByKey</b>([<i>numTasks</i>]) </td>
<td> When called on a dataset of (K, V) pairs, returns a dataset of (K, Seq[V]) pairs. <br /> <td> When called on a dataset of (K, V) pairs, returns a dataset of (K, Seq[V]) pairs. <br />
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment