Commit da9f9e05 authored by Kan Zhang, committed by Patrick Wendell
[SPARK-1460] Returning SchemaRDD instead of normal RDD on Set operations that do not change schema

Author: Kan Zhang <kzhang@apache.org>

Closes #448 from kanzhang/SPARK-1460 and squashes the following commits:

111e388 [Kan Zhang] silence MiMa errors in EdgeRDD and VertexRDD
91dc787 [Kan Zhang] Taking into account newly added Ordering param
79ed52a [Kan Zhang] [SPARK-1460] Returning SchemaRDD on Set operations that do not change schema
(cherry picked from commit 967635a2)

Signed-off-by: Patrick Wendell <pwendell@gmail.com>
parent 756c9693
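
The user-visible effect of the change: transformations that cannot alter the schema now keep returning a SchemaRDD, so the result can still be used with Spark SQL. A minimal sketch of the before/after behavior, assuming a SQLContext named sqlContext and a registered table "people" (these names are illustrative, not from the patch):

    val people = sqlContext.sql("SELECT name, age FROM people")
    val unique = people.distinct()            // SchemaRDD after this patch; previously a plain RDD[Row]
    unique.registerAsTable("unique_people")   // schema preserved, so the result is still queryable
    sqlContext.sql("SELECT COUNT(*) FROM unique_people").collect()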
@@ -128,7 +128,7 @@ abstract class RDD[T: ClassTag](
   @transient var name: String = null
 
   /** Assign a name to this RDD */
-  def setName(_name: String): RDD[T] = {
+  def setName(_name: String): this.type = {
     name = _name
     this
   }
@@ -138,7 +138,7 @@ abstract class RDD[T: ClassTag](
    * it is computed. This can only be used to assign a new storage level if the RDD does not
    * have a storage level set yet..
    */
-  def persist(newLevel: StorageLevel): RDD[T] = {
+  def persist(newLevel: StorageLevel): this.type = {
     // TODO: Handle changes of StorageLevel
     if (storageLevel != StorageLevel.NONE && newLevel != storageLevel) {
       throw new UnsupportedOperationException(
@@ -152,10 +152,10 @@ abstract class RDD[T: ClassTag](
   }
 
   /** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
-  def persist(): RDD[T] = persist(StorageLevel.MEMORY_ONLY)
+  def persist(): this.type = persist(StorageLevel.MEMORY_ONLY)
 
   /** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
-  def cache(): RDD[T] = persist()
+  def cache(): this.type = persist()
 
   /**
    * Mark the RDD as non-persistent, and remove all blocks for it from memory and disk.
@@ -163,7 +163,7 @@ abstract class RDD[T: ClassTag](
    * @param blocking Whether to block until all blocks are deleted.
    * @return This RDD.
    */
-  def unpersist(blocking: Boolean = true): RDD[T] = {
+  def unpersist(blocking: Boolean = true): this.type = {
     logInfo("Removing RDD " + id + " from persistence list")
     sc.unpersistRDD(id, blocking)
     storageLevel = StorageLevel.NONE
......
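
The switch from RDD[T] to this.type is what lets subclasses such as EdgeRDD, VertexRDD, and SchemaRDD inherit setName/persist/cache/unpersist without re-overriding them just to narrow the return type. A small REPL-style sketch of the singleton-type pattern, using hypothetical class names:

    abstract class Base {
      // this.type is the singleton type of the receiver, so the call site
      // keeps the most specific static type it knows about
      def touch(): this.type = this
    }

    class Derived extends Base {
      def derivedOnly: String = "visible only on Derived"
    }

    val d = new Derived
    d.touch().derivedOnly   // compiles: touch() returns Derived here, not Base

Because the bytecode signatures of these methods change, MiMa flags the graphx classes; that is what the graphx.EdgeRDD and graphx.VertexRDD exclusions further down are for.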
@@ -51,18 +51,12 @@ class EdgeRDD[@specialized ED: ClassTag](
 
   override def collect(): Array[Edge[ED]] = this.map(_.copy()).collect()
 
-  override def persist(newLevel: StorageLevel): EdgeRDD[ED] = {
+  override def persist(newLevel: StorageLevel): this.type = {
     partitionsRDD.persist(newLevel)
     this
   }
 
-  /** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
-  override def persist(): EdgeRDD[ED] = persist(StorageLevel.MEMORY_ONLY)
-
-  /** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
-  override def cache(): EdgeRDD[ED] = persist()
-
-  override def unpersist(blocking: Boolean = true): EdgeRDD[ED] = {
+  override def unpersist(blocking: Boolean = true): this.type = {
     partitionsRDD.unpersist(blocking)
     this
   }
......
@@ -71,18 +71,12 @@ class VertexRDD[@specialized VD: ClassTag](
 
   override protected def getPreferredLocations(s: Partition): Seq[String] =
     partitionsRDD.preferredLocations(s)
 
-  override def persist(newLevel: StorageLevel): VertexRDD[VD] = {
+  override def persist(newLevel: StorageLevel): this.type = {
     partitionsRDD.persist(newLevel)
     this
   }
 
-  /** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
-  override def persist(): VertexRDD[VD] = persist(StorageLevel.MEMORY_ONLY)
-
-  /** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
-  override def cache(): VertexRDD[VD] = persist()
-
-  override def unpersist(blocking: Boolean = true): VertexRDD[VD] = {
+  override def unpersist(blocking: Boolean = true): this.type = {
     partitionsRDD.unpersist(blocking)
     this
   }
......
@@ -74,6 +74,8 @@ object MimaBuild {
       ) ++
       excludeSparkClass("rdd.ClassTags") ++
       excludeSparkClass("util.XORShiftRandom") ++
+      excludeSparkClass("graphx.EdgeRDD") ++
+      excludeSparkClass("graphx.VertexRDD") ++
       excludeSparkClass("mllib.recommendation.MFDataGenerator") ++
       excludeSparkClass("mllib.optimization.SquaredGradient") ++
       excludeSparkClass("mllib.regression.RidgeRegressionWithSGD") ++
......
@@ -360,6 +360,35 @@ class SchemaRDD(RDD):
         else:
             return None
 
+    def coalesce(self, numPartitions, shuffle=False):
+        rdd = self._jschema_rdd.coalesce(numPartitions, shuffle)
+        return SchemaRDD(rdd, self.sql_ctx)
+
+    def distinct(self):
+        rdd = self._jschema_rdd.distinct()
+        return SchemaRDD(rdd, self.sql_ctx)
+
+    def intersection(self, other):
+        if (other.__class__ is SchemaRDD):
+            rdd = self._jschema_rdd.intersection(other._jschema_rdd)
+            return SchemaRDD(rdd, self.sql_ctx)
+        else:
+            raise ValueError("Can only intersect with another SchemaRDD")
+
+    def repartition(self, numPartitions):
+        rdd = self._jschema_rdd.repartition(numPartitions)
+        return SchemaRDD(rdd, self.sql_ctx)
+
+    def subtract(self, other, numPartitions=None):
+        if (other.__class__ is SchemaRDD):
+            if numPartitions is None:
+                rdd = self._jschema_rdd.subtract(other._jschema_rdd)
+            else:
+                rdd = self._jschema_rdd.subtract(other._jschema_rdd, numPartitions)
+            return SchemaRDD(rdd, self.sql_ctx)
+        else:
+            raise ValueError("Can only subtract another SchemaRDD")
+
 def _test():
     import doctest
     from pyspark.context import SparkContext
......
@@ -19,14 +19,16 @@ package org.apache.spark.sql
 
 import net.razorvine.pickle.Pickler
 
-import org.apache.spark.{Dependency, OneToOneDependency, Partition, TaskContext}
+import org.apache.spark.{Dependency, OneToOneDependency, Partition, Partitioner, TaskContext}
 import org.apache.spark.annotation.{AlphaComponent, Experimental}
 import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.api.java.JavaSchemaRDD
 import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.plans.{Inner, JoinType}
 import org.apache.spark.sql.catalyst.types.BooleanType
+import org.apache.spark.sql.execution.{ExistingRdd, SparkLogicalPlan}
 import org.apache.spark.api.java.JavaRDD
 import java.util.{Map => JMap}
@@ -296,6 +298,13 @@ class SchemaRDD(
    */
   def toSchemaRDD = this
 
+  /**
+   * Returns this RDD as a JavaSchemaRDD.
+   *
+   * @group schema
+   */
+  def toJavaSchemaRDD: JavaSchemaRDD = new JavaSchemaRDD(sqlContext, logicalPlan)
+
   private[sql] def javaToPython: JavaRDD[Array[Byte]] = {
     val fieldNames: Seq[String] = this.queryExecution.analyzed.output.map(_.name)
     this.mapPartitions { iter =>
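
A short sketch of the new conversion path, assuming an existing SchemaRDD named schemaRDD and the table name "people_java" (both illustrative):

    val javaSchemaRDD: JavaSchemaRDD = schemaRDD.toJavaSchemaRDD   // conversion added in this commit
    javaSchemaRDD.registerAsTable("people_java")                   // SchemaRDDLike methods still apply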
@@ -314,4 +323,60 @@ class SchemaRDD(
       }
     }
   }
+
+  /**
+   * Creates SchemaRDD by applying own schema to derived RDD. Typically used to wrap return value
+   * of base RDD functions that do not change schema.
+   *
+   * @param rdd RDD derived from this one and has same schema
+   *
+   * @group schema
+   */
+  private def applySchema(rdd: RDD[Row]): SchemaRDD = {
+    new SchemaRDD(sqlContext, SparkLogicalPlan(ExistingRdd(logicalPlan.output, rdd)))
+  }
+
+  // =======================================================================
+  // Base RDD functions that do NOT change schema
+  // =======================================================================
+
+  // Transformations (return a new RDD)
+
+  override def coalesce(numPartitions: Int, shuffle: Boolean = false)
+                       (implicit ord: Ordering[Row] = null): SchemaRDD =
+    applySchema(super.coalesce(numPartitions, shuffle)(ord))
+
+  override def distinct(): SchemaRDD =
+    applySchema(super.distinct())
+
+  override def distinct(numPartitions: Int)
+                       (implicit ord: Ordering[Row] = null): SchemaRDD =
+    applySchema(super.distinct(numPartitions)(ord))
+
+  override def filter(f: Row => Boolean): SchemaRDD =
+    applySchema(super.filter(f))
+
+  override def intersection(other: RDD[Row]): SchemaRDD =
+    applySchema(super.intersection(other))
+
+  override def intersection(other: RDD[Row], partitioner: Partitioner)
+                           (implicit ord: Ordering[Row] = null): SchemaRDD =
+    applySchema(super.intersection(other, partitioner)(ord))
+
+  override def intersection(other: RDD[Row], numPartitions: Int): SchemaRDD =
+    applySchema(super.intersection(other, numPartitions))
+
+  override def repartition(numPartitions: Int)
+                          (implicit ord: Ordering[Row] = null): SchemaRDD =
+    applySchema(super.repartition(numPartitions)(ord))
+
+  override def subtract(other: RDD[Row]): SchemaRDD =
+    applySchema(super.subtract(other))
+
+  override def subtract(other: RDD[Row], numPartitions: Int): SchemaRDD =
+    applySchema(super.subtract(other, numPartitions))
+
+  override def subtract(other: RDD[Row], p: Partitioner)
+                       (implicit ord: Ordering[Row] = null): SchemaRDD =
+    applySchema(super.subtract(other, p)(ord))
 }
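
Because every override above funnels the result through applySchema, chained set operations stay inside Spark SQL. A hedged usage sketch, assuming three SchemaRDDs named visits, blacklist, and lastWeek that share a schema (names are illustrative):

    val cleaned = visits.subtract(blacklist)           // still a SchemaRDD, not RDD[Row]
    val repeatVisits = cleaned.intersection(lastWeek)  // schema carried through the chain
    repeatVisits.registerAsTable("repeat_visits")      // result remains queryable from SQL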
@@ -17,10 +17,13 @@
 
 package org.apache.spark.sql.api.java
 
+import org.apache.spark.Partitioner
 import org.apache.spark.api.java.{JavaRDDLike, JavaRDD}
+import org.apache.spark.api.java.function.{Function => JFunction}
 import org.apache.spark.sql.{SQLContext, SchemaRDD, SchemaRDDLike}
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.rdd.RDD
+import org.apache.spark.storage.StorageLevel
 
 /**
  * An RDD of [[Row]] objects that is returned as the result of a Spark SQL query. In addition to
@@ -45,4 +48,141 @@ class JavaSchemaRDD(
   override def wrapRDD(rdd: RDD[Row]): JavaRDD[Row] = JavaRDD.fromRDD(rdd)
 
   val rdd = baseSchemaRDD.map(new Row(_))
+
+  override def toString: String = baseSchemaRDD.toString
+
+  // =======================================================================
+  // Base RDD functions that do NOT change schema
+  // =======================================================================
+
+  // Common RDD functions
+
+  /** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
+  def cache(): JavaSchemaRDD = {
+    baseSchemaRDD.cache()
+    this
+  }
+
+  /** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
+  def persist(): JavaSchemaRDD = {
+    baseSchemaRDD.persist()
+    this
+  }
+
+  /**
+   * Set this RDD's storage level to persist its values across operations after the first time
+   * it is computed. This can only be used to assign a new storage level if the RDD does not
+   * have a storage level set yet..
+   */
+  def persist(newLevel: StorageLevel): JavaSchemaRDD = {
+    baseSchemaRDD.persist(newLevel)
+    this
+  }
+
+  /**
+   * Mark the RDD as non-persistent, and remove all blocks for it from memory and disk.
+   *
+   * @param blocking Whether to block until all blocks are deleted.
+   * @return This RDD.
+   */
+  def unpersist(blocking: Boolean = true): JavaSchemaRDD = {
+    baseSchemaRDD.unpersist(blocking)
+    this
+  }
+
+  /** Assign a name to this RDD */
+  def setName(name: String): JavaSchemaRDD = {
+    baseSchemaRDD.setName(name)
+    this
+  }
+
+  // Transformations (return a new RDD)
+
+  /**
+   * Return a new RDD that is reduced into `numPartitions` partitions.
+   */
+  def coalesce(numPartitions: Int, shuffle: Boolean = false): JavaSchemaRDD =
+    baseSchemaRDD.coalesce(numPartitions, shuffle).toJavaSchemaRDD
+
+  /**
+   * Return a new RDD containing the distinct elements in this RDD.
+   */
+  def distinct(): JavaSchemaRDD =
+    baseSchemaRDD.distinct().toJavaSchemaRDD
+
+  /**
+   * Return a new RDD containing the distinct elements in this RDD.
+   */
+  def distinct(numPartitions: Int): JavaSchemaRDD =
+    baseSchemaRDD.distinct(numPartitions).toJavaSchemaRDD
+
+  /**
+   * Return a new RDD containing only the elements that satisfy a predicate.
+   */
+  def filter(f: JFunction[Row, java.lang.Boolean]): JavaSchemaRDD =
+    baseSchemaRDD.filter(x => f.call(new Row(x)).booleanValue()).toJavaSchemaRDD
+
+  /**
+   * Return the intersection of this RDD and another one. The output will not contain any
+   * duplicate elements, even if the input RDDs did.
+   *
+   * Note that this method performs a shuffle internally.
+   */
+  def intersection(other: JavaSchemaRDD): JavaSchemaRDD =
+    this.baseSchemaRDD.intersection(other.baseSchemaRDD).toJavaSchemaRDD
+
+  /**
+   * Return the intersection of this RDD and another one. The output will not contain any
+   * duplicate elements, even if the input RDDs did.
+   *
+   * Note that this method performs a shuffle internally.
+   *
+   * @param partitioner Partitioner to use for the resulting RDD
+   */
+  def intersection(other: JavaSchemaRDD, partitioner: Partitioner): JavaSchemaRDD =
+    this.baseSchemaRDD.intersection(other.baseSchemaRDD, partitioner).toJavaSchemaRDD
+
+  /**
+   * Return the intersection of this RDD and another one. The output will not contain any
+   * duplicate elements, even if the input RDDs did. Performs a hash partition across the cluster.
+   *
+   * Note that this method performs a shuffle internally.
+   *
+   * @param numPartitions How many partitions to use in the resulting RDD
+   */
+  def intersection(other: JavaSchemaRDD, numPartitions: Int): JavaSchemaRDD =
+    this.baseSchemaRDD.intersection(other.baseSchemaRDD, numPartitions).toJavaSchemaRDD
+
+  /**
+   * Return a new RDD that has exactly `numPartitions` partitions.
+   *
+   * Can increase or decrease the level of parallelism in this RDD. Internally, this uses
+   * a shuffle to redistribute data.
+   *
+   * If you are decreasing the number of partitions in this RDD, consider using `coalesce`,
+   * which can avoid performing a shuffle.
+   */
+  def repartition(numPartitions: Int): JavaSchemaRDD =
+    baseSchemaRDD.repartition(numPartitions).toJavaSchemaRDD
+
+  /**
+   * Return an RDD with the elements from `this` that are not in `other`.
+   *
+   * Uses `this` partitioner/partition size, because even if `other` is huge, the resulting
+   * RDD will be <= us.
+   */
+  def subtract(other: JavaSchemaRDD): JavaSchemaRDD =
+    this.baseSchemaRDD.subtract(other.baseSchemaRDD).toJavaSchemaRDD
+
+  /**
+   * Return an RDD with the elements from `this` that are not in `other`.
+   */
+  def subtract(other: JavaSchemaRDD, numPartitions: Int): JavaSchemaRDD =
+    this.baseSchemaRDD.subtract(other.baseSchemaRDD, numPartitions).toJavaSchemaRDD
+
+  /**
+   * Return an RDD with the elements from `this` that are not in `other`.
+   */
+  def subtract(other: JavaSchemaRDD, p: Partitioner): JavaSchemaRDD =
+    this.baseSchemaRDD.subtract(other.baseSchemaRDD, p).toJavaSchemaRDD
 }
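
The Java wrapper mirrors the same operations by delegating to baseSchemaRDD and re-wrapping the result with toJavaSchemaRDD. A small sketch, shown in Scala for brevity and assuming two JavaSchemaRDDs named jschema and other (illustrative names):

    val deduped: JavaSchemaRDD = jschema.cache().distinct()   // both calls return JavaSchemaRDD
    val remaining: JavaSchemaRDD = deduped.subtract(other)    // schema still intact on the Java side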