Commit 54d19689 authored by Reynold Xin, committed by Michael Armbrust

[SPARK-5310][SQL] Fixes to Docs and Datasources API

 - Various Fixes to docs
 - Make data source traits actually interfaces

Based on #4862 but with fixed conflicts.

Author: Reynold Xin <rxin@databricks.com>
Author: Michael Armbrust <michael@databricks.com>

Closes #4868 from marmbrus/pr/4862 and squashes the following commits:

fe091ea [Michael Armbrust] Merge remote-tracking branch 'origin/master' into pr/4862
0208497 [Reynold Xin] Test fixes.
34e0a28 [Reynold Xin] [SPARK-5310][SQL] Various fixes to Spark SQL docs.
parent 12599942
Showing 106 additions and 129 deletions
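The second commit-message bullet is the substance of the datasources change: TableScan, PrunedScan, PrunedFilteredScan, CatalystScan, and InsertableRelation no longer extend BaseRelation, so an implementation now mixes BaseRelation in explicitly, as the hunks below show. A minimal sketch of the new shape in Scala (ExampleRelation is an illustrative name, not part of this patch):

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.sources.{BaseRelation, TableScan}
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

// TableScan is now a pure interface, so the relation declares BaseRelation itself.
case class ExampleRelation(from: Int, to: Int)(@transient val sqlContext: SQLContext)
  extends BaseRelation
  with TableScan {

  // BaseRelation still carries the schema and the SQLContext.
  override def schema: StructType =
    StructType(StructField("i", IntegerType, nullable = false) :: Nil)

  // TableScan contributes only the full-table scan method.
  override def buildScan(): RDD[Row] =
    sqlContext.sparkContext.parallelize(from to to).map(Row(_))
}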
@@ -357,6 +357,21 @@ object Unidoc {
names.map(s => "org.apache.spark." + s).mkString(":")
}
private def ignoreUndocumentedPackages(packages: Seq[Seq[File]]): Seq[Seq[File]] = {
packages
.map(_.filterNot(_.getName.contains("$")))
.map(_.filterNot(_.getCanonicalPath.contains("akka")))
.map(_.filterNot(_.getCanonicalPath.contains("deploy")))
.map(_.filterNot(_.getCanonicalPath.contains("network")))
.map(_.filterNot(_.getCanonicalPath.contains("shuffle")))
.map(_.filterNot(_.getCanonicalPath.contains("executor")))
.map(_.filterNot(_.getCanonicalPath.contains("python")))
.map(_.filterNot(_.getCanonicalPath.contains("collection")))
.map(_.filterNot(_.getCanonicalPath.contains("sql/catalyst")))
.map(_.filterNot(_.getCanonicalPath.contains("sql/execution")))
.map(_.filterNot(_.getCanonicalPath.contains("sql/hive/test")))
}
lazy val settings = scalaJavaUnidocSettings ++ Seq (
publish := {},
@@ -368,22 +383,12 @@ object Unidoc {
// Skip actual catalyst, but include the subproject.
// Catalyst is not public API and contains quasiquotes which break scaladoc.
unidocAllSources in (ScalaUnidoc, unidoc) := {
(unidocAllSources in (ScalaUnidoc, unidoc)).value
.map(_.filterNot(_.getCanonicalPath.contains("sql/catalyst")))
ignoreUndocumentedPackages((unidocAllSources in (ScalaUnidoc, unidoc)).value)
},
// Skip class names containing $ and some internal packages in Javadocs
unidocAllSources in (JavaUnidoc, unidoc) := {
(unidocAllSources in (JavaUnidoc, unidoc)).value
.map(_.filterNot(_.getName.contains("$")))
.map(_.filterNot(_.getCanonicalPath.contains("akka")))
.map(_.filterNot(_.getCanonicalPath.contains("deploy")))
.map(_.filterNot(_.getCanonicalPath.contains("network")))
.map(_.filterNot(_.getCanonicalPath.contains("shuffle")))
.map(_.filterNot(_.getCanonicalPath.contains("executor")))
.map(_.filterNot(_.getCanonicalPath.contains("python")))
.map(_.filterNot(_.getCanonicalPath.contains("collection")))
.map(_.filterNot(_.getCanonicalPath.contains("sql/catalyst")))
ignoreUndocumentedPackages((unidocAllSources in (JavaUnidoc, unidoc)).value)
},
// Javadoc options: create a window title, and group key packages on index page
@@ -64,7 +64,7 @@ private[sql] object DataFrame {
* val people = sqlContext.parquetFile("...")
*
* // Create a DataFrame from data sources
* val df =
* val df = sqlContext.load("...", "json")
* }}}
*
* Once created, it can be manipulated using the various domain-specific-language (DSL) functions
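For reference, the load call added to the scaladoc above takes a path and a short data source name. A minimal usage sketch, assuming a JSON file of people records (with name and age fields) at a placeholder path:

val df = sqlContext.load("path/to/people.json", "json")
df.select("name", "age").show()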
@@ -80,9 +80,10 @@ private[sql] object DataFrame {
* {{{
* // The following creates a new column that increases everybody's age by 10.
* people("age") + 10 // in Scala
* people.col("age").plus(10); // in Java
* }}}
*
* A more concrete example:
* A more concrete example in Scala:
* {{{
* // To create DataFrame using SQLContext
* val people = sqlContext.parquetFile("...")
@@ -94,6 +95,18 @@ private[sql] object DataFrame {
* .agg(avg(people("salary")), max(people("age")))
* }}}
*
* and in Java:
* {{{
* // To create DataFrame using SQLContext
* DataFrame people = sqlContext.parquetFile("...");
* DataFrame department = sqlContext.parquetFile("...");
*
* people.filter(people.col("age").gt(30))
* .join(department, people.col("deptId").equalTo(department.col("id")))
* .groupBy(department.col("name"), "gender")
* .agg(avg(people.col("salary")), max(people.col("age")));
* }}}
*
* @groupname basic Basic DataFrame functions
* @groupname dfops Language Integrated Queries
* @groupname rdd RDD Operations
@@ -102,7 +115,7 @@ private[sql] object DataFrame {
*/
// TODO: Improve documentation.
@Experimental
class DataFrame protected[sql](
class DataFrame private[sql](
@transient val sqlContext: SQLContext,
@DeveloperApi @transient val queryExecution: SQLContext#QueryExecution)
extends RDDApi[Row] with Serializable {
@@ -295,12 +308,14 @@ class DataFrame protected[sql](
* 1984 04 0.450090 0.483521
* }}}
* @param numRows Number of rows to show
* @group basic
*
* @group action
*/
def show(numRows: Int): Unit = println(showString(numRows))
/**
* Displays the top 20 rows of [[DataFrame]] in a tabular form.
* @group action
*/
def show(): Unit = show(20)
@@ -738,16 +753,19 @@ class DataFrame protected[sql](
/**
* Returns the first `n` rows.
* @group action
*/
def head(n: Int): Array[Row] = limit(n).collect()
/**
* Returns the first row.
* @group action
*/
def head(): Row = head(1).head
/**
* Returns the first row. Alias for head().
* @group action
*/
override def first(): Row = head()
@@ -831,6 +849,11 @@ class DataFrame protected[sql](
this
}
/**
* @group basic
*/
override def cache(): this.type = persist()
/**
* @group basic
*/
@@ -847,6 +870,11 @@ class DataFrame protected[sql](
this
}
/**
* @group basic
*/
override def unpersist(): this.type = unpersist(blocking = false)
/////////////////////////////////////////////////////////////////////////////
// I/O
/////////////////////////////////////////////////////////////////////////////
@@ -29,13 +29,13 @@ import org.apache.spark.storage.StorageLevel
*/
private[sql] trait RDDApi[T] {
def cache(): this.type = persist()
def cache(): this.type
def persist(): this.type
def persist(newLevel: StorageLevel): this.type
def unpersist(): this.type = unpersist(blocking = false)
def unpersist(): this.type
def unpersist(blocking: Boolean): this.type
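The RDDApi hunk above follows the same theme as the datasource traits: the default method bodies move out of the trait so it stays a pure interface, while DataFrame (see the cache/unpersist hunks earlier) now supplies the convenience overloads. A minimal sketch of the pattern, with hypothetical names:

import org.apache.spark.storage.StorageLevel

// No concrete members, so the trait compiles down to a plain Java interface.
trait CacheApi {
  def cache(): this.type
  def persist(): this.type
  def persist(newLevel: StorageLevel): this.type
  def unpersist(): this.type
  def unpersist(blocking: Boolean): this.type
}

// The implementing class owns the convenience overloads that used to live in the trait.
class ExampleFrame extends CacheApi {
  override def cache(): this.type = persist()
  override def persist(): this.type = persist(StorageLevel.MEMORY_AND_DISK)
  override def persist(newLevel: StorageLevel): this.type = { /* hand off to a cache manager */ this }
  override def unpersist(): this.type = unpersist(blocking = false)
  override def unpersist(blocking: Boolean): this.type = { /* drop any cached data */ this }
}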
@@ -119,7 +119,8 @@ private[sql] case class JDBCRelation(
url: String,
table: String,
parts: Array[Partition])(@transient val sqlContext: SQLContext)
extends PrunedFilteredScan {
extends BaseRelation
with PrunedFilteredScan {
override val schema = JDBCRDD.resolveTable(url, table)
@@ -90,7 +90,10 @@ private[sql] case class JSONRelation(
samplingRatio: Double,
userSpecifiedSchema: Option[StructType])(
@transient val sqlContext: SQLContext)
extends TableScan with InsertableRelation {
extends BaseRelation
with TableScan
with InsertableRelation {
// TODO: Support partitioned JSON relation.
private def baseRDD = sqlContext.sparkContext.textFile(path)
@@ -159,7 +159,8 @@ private[sql] case class ParquetRelation2(
maybeSchema: Option[StructType] = None,
maybePartitionSpec: Option[PartitionSpec] = None)(
@transient val sqlContext: SQLContext)
extends CatalystScan
extends BaseRelation
with CatalystScan
with InsertableRelation
with SparkHadoopMapReduceUtil
with Logging {
@@ -14,6 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.sources
import org.apache.spark.annotation.{Experimental, DeveloperApi}
@@ -90,12 +91,6 @@ trait CreatableRelationProvider {
* existing data is expected to be overwritten by the contents of the DataFrame.
* ErrorIfExists mode means that when saving a DataFrame to a data source,
* if data already exists, an exception is expected to be thrown.
*
* @param sqlContext
* @param mode
* @param parameters
* @param data
* @return
*/
def createRelation(
sqlContext: SQLContext,
@@ -138,7 +133,7 @@ abstract class BaseRelation {
* A BaseRelation that can produce all of its tuples as an RDD of Row objects.
*/
@DeveloperApi
trait TableScan extends BaseRelation {
trait TableScan {
def buildScan(): RDD[Row]
}
@@ -148,7 +143,7 @@ trait TableScan extends BaseRelation {
* containing all of its tuples as Row objects.
*/
@DeveloperApi
trait PrunedScan extends BaseRelation {
trait PrunedScan {
def buildScan(requiredColumns: Array[String]): RDD[Row]
}
@@ -162,24 +157,10 @@ trait PrunedScan extends BaseRelation {
* as filtering partitions based on a bloom filter.
*/
@DeveloperApi
trait PrunedFilteredScan extends BaseRelation {
trait PrunedFilteredScan {
def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row]
}
/**
* ::Experimental::
* An interface for experimenting with a more direct connection to the query planner. Compared to
* [[PrunedFilteredScan]], this operator receives the raw expressions from the
* [[org.apache.spark.sql.catalyst.plans.logical.LogicalPlan]]. Unlike the other APIs this
* interface is not designed to be binary compatible across releases and thus should only be used
* for experimentation.
*/
@Experimental
trait CatalystScan extends BaseRelation {
def buildScan(requiredColumns: Seq[Attribute], filters: Seq[Expression]): RDD[Row]
}
@DeveloperApi
/**
* ::DeveloperApi::
* A BaseRelation that can be used to insert data into it through the insert method.
@@ -196,6 +177,20 @@ trait CatalystScan extends BaseRelation {
* If a data source needs to check the actual nullability of a field, it needs to do it in the
* insert method.
*/
trait InsertableRelation extends BaseRelation {
@DeveloperApi
trait InsertableRelation {
def insert(data: DataFrame, overwrite: Boolean): Unit
}
/**
* ::Experimental::
* An interface for experimenting with a more direct connection to the query planner. Compared to
* [[PrunedFilteredScan]], this operator receives the raw expressions from the
* [[org.apache.spark.sql.catalyst.plans.logical.LogicalPlan]]. Unlike the other APIs this
* interface is NOT designed to be binary compatible across releases and thus should only be used
* for experimentation.
*/
@Experimental
trait CatalystScan {
def buildScan(requiredColumns: Seq[Attribute], filters: Seq[Expression]): RDD[Row]
}
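Putting the reworked interfaces together: a data source still pairs a RelationProvider with a relation; the only structural difference is the explicit BaseRelation parent. A hedged end-to-end sketch (the com.example.sources package, RangeSource, RangeRelation, and the option names are made up for illustration):

package com.example.sources

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.sources.{BaseRelation, PrunedScan, RelationProvider}
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

// Provider: builds the relation from the options map supplied by the user.
class RangeSource extends RelationProvider {
  override def createRelation(
      sqlContext: SQLContext,
      parameters: Map[String, String]): BaseRelation =
    RangeRelation(parameters("from").toInt, parameters("to").toInt)(sqlContext)
}

// Relation: BaseRelation is mixed in explicitly; PrunedScan is now a pure interface.
case class RangeRelation(from: Int, to: Int)(@transient val sqlContext: SQLContext)
  extends BaseRelation
  with PrunedScan {

  override def schema: StructType =
    StructType(Seq(
      StructField("i", IntegerType, nullable = false),
      StructField("twice_i", IntegerType, nullable = false)))

  override def buildScan(requiredColumns: Array[String]): RDD[Row] = {
    // Produce only the requested columns, in the requested order.
    val generators = Map[String, Int => Any]("i" -> ((x: Int) => x), "twice_i" -> ((x: Int) => x * 2))
    sqlContext.sparkContext.parallelize(from to to).map { x =>
      Row.fromSeq(requiredColumns.toSeq.map(c => generators(c)(x)))
    }
  }
}

Keeping BaseRelation as the one abstract class that carries schema and sqlContext, with the scan capabilities as small orthogonal interfaces, is what lets Java implementations pick up exactly the capabilities they need.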
@@ -29,7 +29,7 @@ class DDLScanSource extends RelationProvider {
}
case class SimpleDDLScan(from: Int, to: Int)(@transient val sqlContext: SQLContext)
extends TableScan {
extends BaseRelation with TableScan {
override def schema =
StructType(Seq(
@@ -32,7 +32,8 @@ class FilteredScanSource extends RelationProvider {
}
case class SimpleFilteredScan(from: Int, to: Int)(@transient val sqlContext: SQLContext)
extends PrunedFilteredScan {
extends BaseRelation
with PrunedFilteredScan {
override def schema =
StructType(
@@ -31,7 +31,8 @@ class PrunedScanSource extends RelationProvider {
}
case class SimplePrunedScan(from: Int, to: Int)(@transient val sqlContext: SQLContext)
extends PrunedScan {
extends BaseRelation
with PrunedScan {
override def schema =
StructType(
@@ -33,7 +33,7 @@ class SimpleScanSource extends RelationProvider {
}
case class SimpleScan(from: Int, to: Int)(@transient val sqlContext: SQLContext)
extends TableScan {
extends BaseRelation with TableScan {
override def schema =
StructType(StructField("i", IntegerType, nullable = false) :: Nil)
@@ -51,10 +51,11 @@ class AllDataTypesScanSource extends SchemaRelationProvider {
}
case class AllDataTypesScan(
from: Int,
to: Int,
userSpecifiedSchema: StructType)(@transient val sqlContext: SQLContext)
extends TableScan {
from: Int,
to: Int,
userSpecifiedSchema: StructType)(@transient val sqlContext: SQLContext)
extends BaseRelation
with TableScan {
override def schema = userSpecifiedSchema
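Relations like the test ones above are normally wired up through the data source DDL. A usage sketch reusing the hypothetical RangeSource from the earlier sketch (the table name and option keys are likewise made up):

sqlContext.sql(
  """
    |CREATE TEMPORARY TABLE range_table
    |USING com.example.sources.RangeSource
    |OPTIONS (from '1', to '10')
  """.stripMargin)

sqlContext.sql("SELECT twice_i FROM range_table WHERE i > 5").show()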
@@ -777,7 +777,8 @@ private[hive] case class MetastoreRelation
val columnOrdinals = AttributeMap(attributes.zipWithIndex)
}
object HiveMetastoreTypes {
private[hive] object HiveMetastoreTypes {
protected val ddlParser = new DDLParser(HiveQl.parseSql(_))
def toDataType(metastoreType: String): DataType = synchronized {
@@ -28,7 +28,6 @@ import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.hive.MetastoreRelation
/**
* :: Experimental ::
* Create table and insert the query result into it.
* @param database the database name of the new relation
* @param tableName the table name of the new relation
@@ -38,7 +37,7 @@ import org.apache.spark.sql.hive.MetastoreRelation
* @param desc the CreateTableDesc, which may contain serde, storage handler etc.
*/
@Experimental
private[hive]
case class CreateTableAsSelect(
database: String,
tableName: String,
@@ -29,11 +29,9 @@ import org.apache.spark.sql.hive.HiveShim
import org.apache.spark.sql.SQLContext
/**
* :: DeveloperApi ::
*
* Implementation for "describe [extended] table".
*/
@DeveloperApi
private[hive]
case class DescribeHiveTableCommand(
table: MetastoreRelation,
override val output: Seq[Attribute],
@@ -17,17 +17,13 @@
package org.apache.spark.sql.hive.execution
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Row}
import org.apache.spark.sql.execution.RunnableCommand
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.types.StringType
/**
* :: DeveloperApi ::
*/
@DeveloperApi
private[hive]
case class HiveNativeCommand(sql: String) extends RunnableCommand {
override def output =
@@ -26,21 +26,19 @@ import org.apache.hadoop.hive.serde2.objectinspector._
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.execution._
import org.apache.spark.sql.hive._
import org.apache.spark.sql.types.{BooleanType, DataType}
/**
* :: DeveloperApi ::
* The Hive table scan operator. Column and partition pruning are both handled.
*
* @param requestedAttributes Attributes to be fetched from the Hive table.
* @param relation The Hive table to be scanned.
* @param partitionPruningPred An optional partition pruning predicate for partitioned table.
*/
@DeveloperApi
private[hive]
case class HiveTableScan(
requestedAttributes: Seq[Attribute],
relation: MetastoreRelation,
@@ -32,7 +32,6 @@ import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.Object
import org.apache.hadoop.hive.serde2.objectinspector._
import org.apache.hadoop.mapred.{FileOutputCommitter, FileOutputFormat, JobConf}
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.expressions.Row
import org.apache.spark.sql.execution.{UnaryNode, SparkPlan}
@@ -41,10 +40,7 @@ import org.apache.spark.sql.hive.{ ShimFileSinkDesc => FileSinkDesc}
import org.apache.spark.sql.hive.HiveShim._
import org.apache.spark.{SerializableWritable, SparkException, TaskContext}
/**
* :: DeveloperApi ::
*/
@DeveloperApi
private[hive]
case class InsertIntoHiveTable(
table: MetastoreRelation,
partition: Map[String, Option[String]],
@@ -21,15 +21,12 @@ import java.io.{BufferedReader, InputStreamReader}
import java.io.{DataInputStream, DataOutputStream, EOFException}
import java.util.Properties
import scala.collection.JavaConversions._
import org.apache.hadoop.hive.serde.serdeConstants
import org.apache.hadoop.hive.serde2.AbstractSerDe
import org.apache.hadoop.hive.serde2.Serializer
import org.apache.hadoop.hive.serde2.Deserializer
import org.apache.hadoop.hive.serde2.objectinspector._
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory.ObjectInspectorOptions
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical.ScriptInputOutputSchema
import org.apache.spark.sql.execution._
@@ -38,19 +35,14 @@ import org.apache.spark.sql.hive.{HiveContext, HiveInspectors}
import org.apache.spark.sql.hive.HiveShim._
import org.apache.spark.util.Utils
/* Implicit conversions */
import scala.collection.JavaConversions._
/**
* :: DeveloperApi ::
* Transforms the input by forking and running the specified script.
*
* @param input the set of expressions that should be passed to the script.
* @param script the command that should be executed.
* @param output the attributes that are produced by the script.
*/
@DeveloperApi
private[hive]
case class ScriptTransformation(
input: Seq[Expression],
script: String,
@@ -175,6 +167,7 @@ case class ScriptTransformation(
/**
* The wrapper class of Hive input and output schema properties
*/
private[hive]
case class HiveScriptIOSchema (
inputRowFormat: Seq[(String, String)],
outputRowFormat: Seq[(String, String)],
@@ -17,7 +17,6 @@
package org.apache.spark.sql.hive.execution
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.analysis.EliminateSubQueries
import org.apache.spark.sql.catalyst.util._
@@ -30,14 +29,13 @@ import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.types.StructType
/**
* :: DeveloperApi ::
* Analyzes the given table in the current database to generate statistics, which will be
* used in query optimizations.
*
* Right now, it only supports Hive tables and it only updates the size of a Hive table
* in the Hive metastore.
*/
@DeveloperApi
private[hive]
case class AnalyzeTable(tableName: String) extends RunnableCommand {
override def run(sqlContext: SQLContext) = {
@@ -47,10 +45,9 @@ case class AnalyzeTable(tableName: String) extends RunnableCommand {
}
/**
* :: DeveloperApi ::
* Drops a table from the metastore and removes it if it is cached.
*/
@DeveloperApi
private[hive]
case class DropTable(
tableName: String,
ifExists: Boolean) extends RunnableCommand {
@@ -75,10 +72,7 @@ case class DropTable(
}
}
/**
* :: DeveloperApi ::
*/
@DeveloperApi
private[hive]
case class AddJar(path: String) extends RunnableCommand {
override def run(sqlContext: SQLContext) = {
@@ -89,10 +83,7 @@ case class AddJar(path: String) extends RunnableCommand {
}
}
/**
* :: DeveloperApi ::
*/
@DeveloperApi
private[hive]
case class AddFile(path: String) extends RunnableCommand {
override def run(sqlContext: SQLContext) = {
@@ -103,10 +94,7 @@ case class AddFile(path: String) extends RunnableCommand {
}
}
/**
* :: DeveloperApi ::
*/
@DeveloperApi
private[hive]
case class CreateMetastoreDataSource(
tableName: String,
userSpecifiedSchema: Option[StructType],
@@ -146,10 +134,7 @@ case class CreateMetastoreDataSource(
}
}
/**
* :: DeveloperApi ::
*/
@DeveloperApi
private[hive]
case class CreateMetastoreDataSourceAsSelect(
tableName: String,
provider: String,
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.hive
/**
* Physical execution operators used for running queries against data stored in Hive. These
* are not intended for use by users, but are documented so that it is easier to understand
* the output of EXPLAIN queries.
*/
package object execution