Commit 2a4f88b6 authored by Kousuke Saruta, committed by Reynold Xin

[SPARK-8914][SQL] Remove RDDApi

As rxin suggested in #7298, we should consider removing `RDDApi`.

Author: Kousuke Saruta <sarutak@oss.nttdata.co.jp>

Closes #7302 from sarutak/remove-rddapi and squashes the following commits:

e495d35 [Kousuke Saruta] Fixed mima
cb7ebb9 [Kousuke Saruta] Removed overriding RDDApi
parent f472b8cd
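In short: `RDDApi` was a `private[sql]` trait whose only purpose was to force a set of RDD-style methods onto `DataFrame`, its sole implementor. Deleting it turns every `override def` in the diff below into a plain `def` with an unchanged signature. A minimal sketch of the pattern, with hypothetical names rather than Spark's real classes:

object TraitRemovalSketch {
  // Before: a single-implementor trait adds no abstraction, only an
  // `override` keyword on every method.
  trait MiniRddApi[T] {
    def first(): T
    def count(): Long
  }

  class FrameBefore(rows: Seq[Int]) extends MiniRddApi[Int] {
    override def first(): Int = rows.head
    override def count(): Long = rows.size.toLong
  }

  // After: the same methods declared directly; callers see no difference.
  class FrameAfter(rows: Seq[Int]) {
    def first(): Int = rows.head
    def count(): Long = rows.size.toLong
  }

  def main(args: Array[String]): Unit = {
    val before = new FrameBefore(Seq(1, 2, 3))
    val after = new FrameAfter(Seq(1, 2, 3))
    assert(before.first() == after.first() && before.count() == after.count())
    println("behavior is identical before and after trait removal")
  }
}

The only collateral work is the MiMa exclusion below, since the class disappears from the compiled artifact.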
project/MimaExcludes.scala
@@ -70,7 +70,12 @@ object MimaExcludes {
             "org.apache.spark.mllib.linalg.Matrix.numNonzeros"),
           ProblemFilters.exclude[MissingMethodProblem](
             "org.apache.spark.mllib.linalg.Matrix.numActives")
-        )
+        ) ++ Seq(
+          // SPARK-8914 Remove RDDApi
+          ProblemFilters.exclude[MissingClassProblem](
+            "org.apache.spark.sql.RDDApi")
+        )
       case v if v.startsWith("1.4") =>
         Seq(
           MimaBuild.excludeSparkPackage("deploy"),
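Although `RDDApi` is `private[sql]`, Scala package-private visibility compiles to public bytecode on the JVM, so MiMa reports the deleted class as a `MissingClassProblem` and it has to be whitelisted as above. A minimal sketch of such a filter, assuming the `com.typesafe.tools.mima.core` API that `MimaExcludes` imports:

import com.typesafe.tools.mima.core._

// Whitelists a class that existed in the previous release but is gone
// from the current build; without it the binary-compatibility check
// fails even though no user-facing API changed.
val removeRddApiExcludes: Seq[ProblemFilter] = Seq(
  ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.RDDApi")
)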
sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
@@ -115,8 +115,7 @@ private[sql] object DataFrame {
 @Experimental
 class DataFrame private[sql](
     @transient val sqlContext: SQLContext,
-    @DeveloperApi @transient val queryExecution: SQLContext#QueryExecution)
-  extends RDDApi[Row] with Serializable {
+    @DeveloperApi @transient val queryExecution: SQLContext#QueryExecution) extends Serializable {
 
   /**
    * A constructor that automatically analyzes the logical plan.
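An aside on the constructor above: `SQLContext#QueryExecution` is a type projection, meaning a `QueryExecution` belonging to any `SQLContext` instance, as opposed to the path-dependent `sqlContext.QueryExecution`, which would be tied to one particular instance. A self-contained sketch with hypothetical names:

object TypeProjectionSketch {
  class Context {
    class Execution(val plan: String)
  }

  // Accepts an Execution created by any Context, the same shape as
  // DataFrame's `queryExecution: SQLContext#QueryExecution` parameter.
  def describe(e: Context#Execution): String = e.plan

  def main(args: Array[String]): Unit = {
    val ctx = new Context
    println(describe(new ctx.Execution("scan"))) // prints "scan"
  }
}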
@@ -1320,14 +1319,14 @@ class DataFrame private[sql](
    * @group action
    * @since 1.3.0
    */
-  override def first(): Row = head()
+  def first(): Row = head()
 
   /**
    * Returns a new RDD by applying a function to all rows of this DataFrame.
    * @group rdd
    * @since 1.3.0
    */
-  override def map[R: ClassTag](f: Row => R): RDD[R] = rdd.map(f)
+  def map[R: ClassTag](f: Row => R): RDD[R] = rdd.map(f)
 
   /**
    * Returns a new RDD by first applying a function to all rows of this [[DataFrame]],
@@ -1335,14 +1334,14 @@ class DataFrame private[sql](
    * and then flattening the results.
    * @group rdd
    * @since 1.3.0
    */
-  override def flatMap[R: ClassTag](f: Row => TraversableOnce[R]): RDD[R] = rdd.flatMap(f)
+  def flatMap[R: ClassTag](f: Row => TraversableOnce[R]): RDD[R] = rdd.flatMap(f)
 
   /**
    * Returns a new RDD by applying a function to each partition of this DataFrame.
    * @group rdd
    * @since 1.3.0
    */
-  override def mapPartitions[R: ClassTag](f: Iterator[Row] => Iterator[R]): RDD[R] = {
+  def mapPartitions[R: ClassTag](f: Iterator[Row] => Iterator[R]): RDD[R] = {
     rdd.mapPartitions(f)
   }
@@ -1351,49 +1350,49 @@ class DataFrame private[sql](
    * @group rdd
    * @since 1.3.0
    */
-  override def foreach(f: Row => Unit): Unit = rdd.foreach(f)
+  def foreach(f: Row => Unit): Unit = rdd.foreach(f)
 
   /**
    * Applies a function f to each partition of this [[DataFrame]].
    * @group rdd
    * @since 1.3.0
    */
-  override def foreachPartition(f: Iterator[Row] => Unit): Unit = rdd.foreachPartition(f)
+  def foreachPartition(f: Iterator[Row] => Unit): Unit = rdd.foreachPartition(f)
 
   /**
    * Returns the first `n` rows in the [[DataFrame]].
    * @group action
    * @since 1.3.0
    */
-  override def take(n: Int): Array[Row] = head(n)
+  def take(n: Int): Array[Row] = head(n)
 
   /**
    * Returns an array that contains all of [[Row]]s in this [[DataFrame]].
    * @group action
    * @since 1.3.0
    */
-  override def collect(): Array[Row] = queryExecution.executedPlan.executeCollect()
+  def collect(): Array[Row] = queryExecution.executedPlan.executeCollect()
 
   /**
    * Returns a Java list that contains all of [[Row]]s in this [[DataFrame]].
    * @group action
    * @since 1.3.0
    */
-  override def collectAsList(): java.util.List[Row] = java.util.Arrays.asList(rdd.collect() : _*)
+  def collectAsList(): java.util.List[Row] = java.util.Arrays.asList(rdd.collect() : _*)
 
   /**
    * Returns the number of rows in the [[DataFrame]].
    * @group action
    * @since 1.3.0
    */
-  override def count(): Long = groupBy().count().collect().head.getLong(0)
+  def count(): Long = groupBy().count().collect().head.getLong(0)
 
   /**
    * Returns a new [[DataFrame]] that has exactly `numPartitions` partitions.
    * @group rdd
    * @since 1.3.0
    */
-  override def repartition(numPartitions: Int): DataFrame = {
+  def repartition(numPartitions: Int): DataFrame = {
     Repartition(numPartitions, shuffle = true, logicalPlan)
   }
@@ -1405,7 +1404,7 @@ class DataFrame private[sql](
    * @group rdd
    * @since 1.4.0
    */
-  override def coalesce(numPartitions: Int): DataFrame = {
+  def coalesce(numPartitions: Int): DataFrame = {
     Repartition(numPartitions, shuffle = false, logicalPlan)
   }
@@ -1415,13 +1414,13 @@ class DataFrame private[sql](
    * @group dfops
    * @since 1.3.0
    */
-  override def distinct(): DataFrame = dropDuplicates()
+  def distinct(): DataFrame = dropDuplicates()
 
   /**
    * @group basic
    * @since 1.3.0
    */
-  override def persist(): this.type = {
+  def persist(): this.type = {
     sqlContext.cacheManager.cacheQuery(this)
     this
   }
@@ -1430,13 +1429,13 @@ class DataFrame private[sql](
    * @group basic
    * @since 1.3.0
    */
-  override def cache(): this.type = persist()
+  def cache(): this.type = persist()
 
   /**
    * @group basic
    * @since 1.3.0
    */
-  override def persist(newLevel: StorageLevel): this.type = {
+  def persist(newLevel: StorageLevel): this.type = {
     sqlContext.cacheManager.cacheQuery(this, None, newLevel)
     this
   }
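Note that `persist()`, `cache()`, and `unpersist()` keep returning `this.type` rather than `DataFrame`, exactly as the deleted trait declared them. The singleton return type preserves the receiver's static type across chained calls; a minimal sketch with hypothetical names:

object ThisTypeSketch {
  class Cacheable {
    // Returning `this.type` instead of `Cacheable` means a subclass
    // keeps its own static type when the call is chained.
    def persist(): this.type = this
  }

  class NamedFrame(val name: String) extends Cacheable

  def main(args: Array[String]): Unit = {
    val nf = new NamedFrame("users").persist() // static type: NamedFrame
    println(nf.name)                           // no cast needed
  }
}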
@@ -1445,7 +1444,7 @@ class DataFrame private[sql](
    * @group basic
    * @since 1.3.0
    */
-  override def unpersist(blocking: Boolean): this.type = {
+  def unpersist(blocking: Boolean): this.type = {
     sqlContext.cacheManager.tryUncacheQuery(this, blocking)
     this
   }
@@ -1454,7 +1453,7 @@ class DataFrame private[sql](
    * @group basic
    * @since 1.3.0
    */
-  override def unpersist(): this.type = unpersist(blocking = false)
+  def unpersist(): this.type = unpersist(blocking = false)
 
   /////////////////////////////////////////////////////////////////////////////
   // I/O

sql/core/src/main/scala/org/apache/spark/sql/RDDApi.scala (deleted, shown in full)
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql

import scala.reflect.ClassTag

import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel

/**
 * An internal interface defining the RDD-like methods for [[DataFrame]].
 * Please use [[DataFrame]] directly, and do NOT use this.
 */
private[sql] trait RDDApi[T] {

  def cache(): this.type

  def persist(): this.type

  def persist(newLevel: StorageLevel): this.type

  def unpersist(): this.type

  def unpersist(blocking: Boolean): this.type

  def map[R: ClassTag](f: T => R): RDD[R]

  def flatMap[R: ClassTag](f: T => TraversableOnce[R]): RDD[R]

  def mapPartitions[R: ClassTag](f: Iterator[T] => Iterator[R]): RDD[R]

  def foreach(f: T => Unit): Unit

  def foreachPartition(f: Iterator[T] => Unit): Unit

  def take(n: Int): Array[T]

  def collect(): Array[T]

  def collectAsList(): java.util.List[T]

  def count(): Long

  def first(): T

  def repartition(numPartitions: Int): DataFrame

  def coalesce(numPartitions: Int): DataFrame

  def distinct: DataFrame
}
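Because `RDDApi` was never public API, existing callers are unaffected: every method it declared survives on `DataFrame` with an unchanged signature. A hedged usage sketch against the Spark 1.4-era API (the local master, app name, and the use of `SQLContext.range` to obtain a one-column DataFrame are illustrative assumptions):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

object RddApiRemovalUsage {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("sketch").setMaster("local[*]"))
    val sqlContext = new SQLContext(sc)
    val df = sqlContext.range(0, 100) // DataFrame with a single "id" column

    val firstRow = df.first()         // formerly `override def first()`, now a plain `def`
    val total = df.count()            // still planned as groupBy().count()
    val ids = df.map(_.getLong(0))    // still returns an RDD[Long]

    df.persist()                      // `this.type` return keeps chaining intact
      .foreachPartition(_ => ())
    df.unpersist()

    println(s"first=$firstRow count=$total sum=${ids.collect().sum}")
    sc.stop()
  }
}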