diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index 8fb1239b4a96bff1c10e77160a39868f6727a74c..e4b1b96527fbdeee7d5c8ed8076524f77d1a489d 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -361,9 +361,16 @@ object Unidoc { publish := {}, unidocProjectFilter in(ScalaUnidoc, unidoc) := - inAnyProject -- inProjects(OldDeps.project, repl, examples, tools, catalyst, streamingFlumeSink, yarn), + inAnyProject -- inProjects(OldDeps.project, repl, examples, tools, streamingFlumeSink, yarn), unidocProjectFilter in(JavaUnidoc, unidoc) := - inAnyProject -- inProjects(OldDeps.project, repl, bagel, examples, tools, catalyst, streamingFlumeSink, yarn), + inAnyProject -- inProjects(OldDeps.project, repl, bagel, examples, tools, streamingFlumeSink, yarn), + + // Skip actual catalyst, but include the subproject. + // Catalyst is not public API and contains quasiquotes which break scaladoc. + unidocAllSources in (ScalaUnidoc, unidoc) := { + (unidocAllSources in (ScalaUnidoc, unidoc)).value + .map(_.filterNot(_.getCanonicalPath.contains("sql/catalyst"))) + }, // Skip class names containing $ and some internal packages in Javadocs unidocAllSources in (JavaUnidoc, unidoc) := { @@ -376,6 +383,7 @@ object Unidoc { .map(_.filterNot(_.getCanonicalPath.contains("executor"))) .map(_.filterNot(_.getCanonicalPath.contains("python"))) .map(_.filterNot(_.getCanonicalPath.contains("collection"))) + .map(_.filterNot(_.getCanonicalPath.contains("sql/catalyst"))) }, // Javadoc options: create a window title, and group key packages on index page diff --git a/python/pyspark/sql/context.py b/python/pyspark/sql/context.py index dd2cd5ee76f60c9b098ecaec8ffc518ef086fdea..2e2309f10375d348c7d4d84c6ac2cce781f4d5a8 100644 --- a/python/pyspark/sql/context.py +++ b/python/pyspark/sql/context.py @@ -252,7 +252,7 @@ class SQLContext(object): >>> schema = StructType([StructField("field1", IntegerType(), False), ... StructField("field2", StringType(), False)]) >>> df = sqlCtx.applySchema(rdd2, schema) - >>> sqlCtx.registerRDDAsTable(df, "table1") + >>> sqlCtx.registerDataFrameAsTable(df, "table1") >>> df2 = sqlCtx.sql("SELECT * from table1") >>> df2.collect() [Row(field1=1, field2=u'row1'),..., Row(field1=3, field2=u'row3')] @@ -405,17 +405,17 @@ class SQLContext(object): return self.applySchema(data, schema) - def registerRDDAsTable(self, rdd, tableName): + def registerDataFrameAsTable(self, rdd, tableName): """Registers the given RDD as a temporary table in the catalog. Temporary tables exist only during the lifetime of this instance of SQLContext. - >>> sqlCtx.registerRDDAsTable(df, "table1") + >>> sqlCtx.registerDataFrameAsTable(df, "table1") """ if (rdd.__class__ is DataFrame): df = rdd._jdf - self._ssql_ctx.registerRDDAsTable(df, tableName) + self._ssql_ctx.registerDataFrameAsTable(df, tableName) else: raise ValueError("Can only register DataFrame as table") @@ -456,7 +456,7 @@ class SQLContext(object): ... print>>ofn, json >>> ofn.close() >>> df1 = sqlCtx.jsonFile(jsonFile) - >>> sqlCtx.registerRDDAsTable(df1, "table1") + >>> sqlCtx.registerDataFrameAsTable(df1, "table1") >>> df2 = sqlCtx.sql( ... "SELECT field1 AS f1, field2 as f2, field3 as f3, " ... "field6 as f4 from table1") @@ -467,7 +467,7 @@ class SQLContext(object): Row(f1=None, f2=u'row3', f3=Row(field4=33, field5=[]), f4=None) >>> df3 = sqlCtx.jsonFile(jsonFile, df1.schema) - >>> sqlCtx.registerRDDAsTable(df3, "table2") + >>> sqlCtx.registerDataFrameAsTable(df3, "table2") >>> df4 = sqlCtx.sql( ... "SELECT field1 AS f1, field2 as f2, field3 as f3, " ... "field6 as f4 from table2") @@ -485,7 +485,7 @@ class SQLContext(object): ... StructField("field5", ... ArrayType(IntegerType(), False), True)]), False)]) >>> df5 = sqlCtx.jsonFile(jsonFile, schema) - >>> sqlCtx.registerRDDAsTable(df5, "table3") + >>> sqlCtx.registerDataFrameAsTable(df5, "table3") >>> df6 = sqlCtx.sql( ... "SELECT field2 AS f1, field3.field5 as f2, " ... "field3.field5[0] as f3 from table3") @@ -509,7 +509,7 @@ class SQLContext(object): determine the schema. >>> df1 = sqlCtx.jsonRDD(json) - >>> sqlCtx.registerRDDAsTable(df1, "table1") + >>> sqlCtx.registerDataFrameAsTable(df1, "table1") >>> df2 = sqlCtx.sql( ... "SELECT field1 AS f1, field2 as f2, field3 as f3, " ... "field6 as f4 from table1") @@ -520,7 +520,7 @@ class SQLContext(object): Row(f1=None, f2=u'row3', f3=Row(field4=33, field5=[]), f4=None) >>> df3 = sqlCtx.jsonRDD(json, df1.schema) - >>> sqlCtx.registerRDDAsTable(df3, "table2") + >>> sqlCtx.registerDataFrameAsTable(df3, "table2") >>> df4 = sqlCtx.sql( ... "SELECT field1 AS f1, field2 as f2, field3 as f3, " ... "field6 as f4 from table2") @@ -538,7 +538,7 @@ class SQLContext(object): ... StructField("field5", ... ArrayType(IntegerType(), False), True)]), False)]) >>> df5 = sqlCtx.jsonRDD(json, schema) - >>> sqlCtx.registerRDDAsTable(df5, "table3") + >>> sqlCtx.registerDataFrameAsTable(df5, "table3") >>> df6 = sqlCtx.sql( ... "SELECT field2 AS f1, field3.field5 as f2, " ... "field3.field5[0] as f3 from table3") @@ -628,7 +628,7 @@ class SQLContext(object): def sql(self, sqlQuery): """Return a L{DataFrame} representing the result of the given query. - >>> sqlCtx.registerRDDAsTable(df, "table1") + >>> sqlCtx.registerDataFrameAsTable(df, "table1") >>> df2 = sqlCtx.sql("SELECT field1 AS f1, field2 as f2 from table1") >>> df2.collect() [Row(f1=1, f2=u'row1'), Row(f1=2, f2=u'row2'), Row(f1=3, f2=u'row3')] @@ -638,7 +638,7 @@ class SQLContext(object): def table(self, tableName): """Returns the specified table as a L{DataFrame}. - >>> sqlCtx.registerRDDAsTable(df, "table1") + >>> sqlCtx.registerDataFrameAsTable(df, "table1") >>> df2 = sqlCtx.table("table1") >>> sorted(df.collect()) == sorted(df2.collect()) True @@ -653,7 +653,7 @@ class SQLContext(object): The returned DataFrame has two columns, tableName and isTemporary (a column with BooleanType indicating if a table is a temporary one or not). - >>> sqlCtx.registerRDDAsTable(df, "table1") + >>> sqlCtx.registerDataFrameAsTable(df, "table1") >>> df2 = sqlCtx.tables() >>> df2.filter("tableName = 'table1'").first() Row(tableName=u'table1', isTemporary=True) @@ -668,7 +668,7 @@ class SQLContext(object): If `dbName` is not specified, the current database will be used. - >>> sqlCtx.registerRDDAsTable(df, "table1") + >>> sqlCtx.registerDataFrameAsTable(df, "table1") >>> "table1" in sqlCtx.tableNames() True >>> "table1" in sqlCtx.tableNames("db") diff --git a/sql/core/src/main/java/org/apache/spark/sql/jdbc/JDBCUtils.java b/sql/core/src/main/java/org/apache/spark/sql/jdbc/JDBCUtils.java deleted file mode 100644 index aa441b2096f18b56eae57da5351d1a048179019b..0000000000000000000000000000000000000000 --- a/sql/core/src/main/java/org/apache/spark/sql/jdbc/JDBCUtils.java +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.jdbc; - -import org.apache.spark.Partition; -import org.apache.spark.sql.SQLContext; -import org.apache.spark.sql.DataFrame; - -public class JDBCUtils { - /** - * Construct a DataFrame representing the JDBC table at the database - * specified by url with table name table. - */ - public static DataFrame jdbcRDD(SQLContext sql, String url, String table) { - Partition[] parts = new Partition[1]; - parts[0] = new JDBCPartition(null, 0); - return sql.baseRelationToDataFrame( - new JDBCRelation(url, table, parts, sql)); - } - - /** - * Construct a DataFrame representing the JDBC table at the database - * specified by url with table name table partitioned by parts. - * Here, parts is an array of expressions suitable for insertion into a WHERE - * clause; each one defines one partition. - */ - public static DataFrame jdbcRDD(SQLContext sql, String url, String table, String[] parts) { - Partition[] partitions = new Partition[parts.length]; - for (int i = 0; i < parts.length; i++) - partitions[i] = new JDBCPartition(parts[i], i); - return sql.baseRelationToDataFrame( - new JDBCRelation(url, table, partitions, sql)); - } - - private static JavaJDBCTrampoline trampoline = new JavaJDBCTrampoline(); - - public static void createJDBCTable(DataFrame rdd, String url, String table, boolean allowExisting) { - trampoline.createJDBCTable(rdd, url, table, allowExisting); - } - - public static void insertIntoJDBC(DataFrame rdd, String url, String table, boolean overwrite) { - trampoline.insertIntoJDBC(rdd, url, table, overwrite); - } -} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala index c0c3cb40cf1da11427e523c10bd1da748322d243..fa5fe84263ece7fea3426e25139761f468f8ac6c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala @@ -17,6 +17,9 @@ package org.apache.spark.sql +import java.sql.DriverManager + + import scala.collection.JavaConversions._ import scala.reflect.ClassTag import scala.reflect.runtime.universe.TypeTag @@ -27,6 +30,7 @@ import org.apache.spark.api.java.JavaRDD import org.apache.spark.rdd.RDD import org.apache.spark.storage.StorageLevel import org.apache.spark.sql.catalyst.plans.logical._ +import org.apache.spark.sql.jdbc.JDBCWriteDetails import org.apache.spark.sql.types.StructType import org.apache.spark.util.Utils @@ -77,6 +81,12 @@ private[sql] object DataFrame { * .groupBy(department("name"), "gender") * .agg(avg(people("salary")), max(people("age"))) * }}} + * + * @groupname basic Basic DataFrame functions + * @groupname dfops Language Integrated Queries + * @groupname rdd RDD Operations + * @groupname output Output Operations + * @groupname action Actions */ // TODO: Improve documentation. @Experimental @@ -102,7 +112,8 @@ trait DataFrame extends RDDApi[Row] with Serializable { def toSchemaRDD: DataFrame = this /** - * Returns the object itself. Used to force an implicit conversion from RDD to DataFrame in Scala. + * Returns the object itself. + * @group basic */ // This is declared with parentheses to prevent the Scala compiler from treating // `rdd.toDF("1")` as invoking this toDF and then apply on the returned DataFrame. @@ -116,31 +127,51 @@ trait DataFrame extends RDDApi[Row] with Serializable { * rdd.toDF // this implicit conversion creates a DataFrame with column name _1 and _2 * rdd.toDF("id", "name") // this creates a DataFrame with column name "id" and "name" * }}} + * @group basic */ @scala.annotation.varargs def toDF(colNames: String*): DataFrame - /** Returns the schema of this [[DataFrame]]. */ + /** + * Returns the schema of this [[DataFrame]]. + * @group basic + */ def schema: StructType - /** Returns all column names and their data types as an array. */ + /** + * Returns all column names and their data types as an array. + * @group basic + */ def dtypes: Array[(String, String)] - /** Returns all column names as an array. */ + /** + * Returns all column names as an array. + * @group basic + */ def columns: Array[String] = schema.fields.map(_.name) - /** Prints the schema to the console in a nice tree format. */ + /** + * Prints the schema to the console in a nice tree format. + * @group basic + */ def printSchema(): Unit - /** Prints the plans (logical and physical) to the console for debugging purpose. */ + /** + * Prints the plans (logical and physical) to the console for debugging purpose. + * @group basic + */ def explain(extended: Boolean): Unit - /** Only prints the physical plan to the console for debugging purpose. */ + /** + * Only prints the physical plan to the console for debugging purpose. + * @group basic + */ def explain(): Unit = explain(extended = false) /** * Returns true if the `collect` and `take` methods can be run locally * (without any Spark executors). + * @group basic */ def isLocal: Boolean @@ -154,6 +185,7 @@ trait DataFrame extends RDDApi[Row] with Serializable { * 1983 03 0.410516 0.442194 * 1984 04 0.450090 0.483521 * }}} + * @group basic */ def show(): Unit @@ -163,6 +195,7 @@ trait DataFrame extends RDDApi[Row] with Serializable { * Note that cartesian joins are very expensive without an extra filter that can be pushed down. * * @param right Right side of the join operation. + * @group dfops */ def join(right: DataFrame): DataFrame @@ -174,6 +207,7 @@ trait DataFrame extends RDDApi[Row] with Serializable { * df1.join(df2, $"df1Key" === $"df2Key") * df1.join(df2).where($"df1Key" === $"df2Key") * }}} + * @group dfops */ def join(right: DataFrame, joinExprs: Column): DataFrame @@ -194,6 +228,7 @@ trait DataFrame extends RDDApi[Row] with Serializable { * @param right Right side of the join. * @param joinExprs Join expression. * @param joinType One of: `inner`, `outer`, `left_outer`, `right_outer`, `semijoin`. + * @group dfops */ def join(right: DataFrame, joinExprs: Column, joinType: String): DataFrame @@ -205,6 +240,7 @@ trait DataFrame extends RDDApi[Row] with Serializable { * df.sort($"sortcol") * df.sort($"sortcol".asc) * }}} + * @group dfops */ @scala.annotation.varargs def sort(sortCol: String, sortCols: String*): DataFrame @@ -214,6 +250,7 @@ trait DataFrame extends RDDApi[Row] with Serializable { * {{{ * df.sort($"col1", $"col2".desc) * }}} + * @group dfops */ @scala.annotation.varargs def sort(sortExprs: Column*): DataFrame @@ -221,6 +258,7 @@ trait DataFrame extends RDDApi[Row] with Serializable { /** * Returns a new [[DataFrame]] sorted by the given expressions. * This is an alias of the `sort` function. + * @group dfops */ @scala.annotation.varargs def orderBy(sortCol: String, sortCols: String*): DataFrame @@ -228,27 +266,32 @@ trait DataFrame extends RDDApi[Row] with Serializable { /** * Returns a new [[DataFrame]] sorted by the given expressions. * This is an alias of the `sort` function. + * @group dfops */ @scala.annotation.varargs def orderBy(sortExprs: Column*): DataFrame /** * Selects column based on the column name and return it as a [[Column]]. + * @group dfops */ def apply(colName: String): Column = col(colName) /** * Selects column based on the column name and return it as a [[Column]]. + * @group dfops */ def col(colName: String): Column /** * Returns a new [[DataFrame]] with an alias set. + * @group dfops */ def as(alias: String): DataFrame /** * (Scala-specific) Returns a new [[DataFrame]] with an alias set. + * @group dfops */ def as(alias: Symbol): DataFrame @@ -257,6 +300,7 @@ trait DataFrame extends RDDApi[Row] with Serializable { * {{{ * df.select($"colA", $"colB" + 1) * }}} + * @group dfops */ @scala.annotation.varargs def select(cols: Column*): DataFrame @@ -270,6 +314,7 @@ trait DataFrame extends RDDApi[Row] with Serializable { * df.select("colA", "colB") * df.select($"colA", $"colB") * }}} + * @group dfops */ @scala.annotation.varargs def select(col: String, cols: String*): DataFrame @@ -281,6 +326,7 @@ trait DataFrame extends RDDApi[Row] with Serializable { * {{{ * df.selectExpr("colA", "colB as newName", "abs(colC)") * }}} + * @group dfops */ @scala.annotation.varargs def selectExpr(exprs: String*): DataFrame @@ -293,6 +339,7 @@ trait DataFrame extends RDDApi[Row] with Serializable { * peopleDf.where($"age" > 15) * peopleDf($"age" > 15) * }}} + * @group dfops */ def filter(condition: Column): DataFrame @@ -301,6 +348,7 @@ trait DataFrame extends RDDApi[Row] with Serializable { * {{{ * peopleDf.filter("age > 15") * }}} + * @group dfops */ def filter(conditionExpr: String): DataFrame @@ -312,6 +360,7 @@ trait DataFrame extends RDDApi[Row] with Serializable { * peopleDf.where($"age" > 15) * peopleDf($"age" > 15) * }}} + * @group dfops */ def where(condition: Column): DataFrame @@ -329,6 +378,7 @@ trait DataFrame extends RDDApi[Row] with Serializable { * "age" -> "max" * )) * }}} + * @group dfops */ @scala.annotation.varargs def groupBy(cols: Column*): GroupedData @@ -350,6 +400,7 @@ trait DataFrame extends RDDApi[Row] with Serializable { * "age" -> "max" * )) * }}} + * @group dfops */ @scala.annotation.varargs def groupBy(col1: String, cols: String*): GroupedData @@ -366,6 +417,7 @@ trait DataFrame extends RDDApi[Row] with Serializable { * "expense" -> "sum" * ) * }}} + * @group dfops */ def agg(aggExpr: (String, String), aggExprs: (String, String)*): DataFrame = { groupBy().agg(aggExpr, aggExprs :_*) @@ -378,6 +430,7 @@ trait DataFrame extends RDDApi[Row] with Serializable { * df.agg(Map("age" -> "max", "salary" -> "avg")) * df.groupBy().agg(Map("age" -> "max", "salary" -> "avg")) * }} + * @group dfops */ def agg(exprs: Map[String, String]): DataFrame = groupBy().agg(exprs) @@ -388,6 +441,7 @@ trait DataFrame extends RDDApi[Row] with Serializable { * df.agg(Map("age" -> "max", "salary" -> "avg")) * df.groupBy().agg(Map("age" -> "max", "salary" -> "avg")) * }} + * @group dfops */ def agg(exprs: java.util.Map[String, String]): DataFrame = groupBy().agg(exprs) @@ -398,6 +452,7 @@ trait DataFrame extends RDDApi[Row] with Serializable { * df.agg(max($"age"), avg($"salary")) * df.groupBy().agg(max($"age"), avg($"salary")) * }} + * @group dfops */ @scala.annotation.varargs def agg(expr: Column, exprs: Column*): DataFrame = groupBy().agg(expr, exprs :_*) @@ -405,24 +460,28 @@ trait DataFrame extends RDDApi[Row] with Serializable { /** * Returns a new [[DataFrame]] by taking the first `n` rows. The difference between this function * and `head` is that `head` returns an array while `limit` returns a new [[DataFrame]]. + * @group dfops */ def limit(n: Int): DataFrame /** * Returns a new [[DataFrame]] containing union of rows in this frame and another frame. * This is equivalent to `UNION ALL` in SQL. + * @group dfops */ def unionAll(other: DataFrame): DataFrame /** * Returns a new [[DataFrame]] containing rows only in both this frame and another frame. * This is equivalent to `INTERSECT` in SQL. + * @group dfops */ def intersect(other: DataFrame): DataFrame /** * Returns a new [[DataFrame]] containing rows in this frame but not in another frame. * This is equivalent to `EXCEPT` in SQL. + * @group dfops */ def except(other: DataFrame): DataFrame @@ -432,6 +491,7 @@ trait DataFrame extends RDDApi[Row] with Serializable { * @param withReplacement Sample with replacement or not. * @param fraction Fraction of rows to generate. * @param seed Seed for sampling. + * @group dfops */ def sample(withReplacement: Boolean, fraction: Double, seed: Long): DataFrame @@ -440,6 +500,7 @@ trait DataFrame extends RDDApi[Row] with Serializable { * * @param withReplacement Sample with replacement or not. * @param fraction Fraction of rows to generate. + * @group dfops */ def sample(withReplacement: Boolean, fraction: Double): DataFrame = { sample(withReplacement, fraction, Utils.random.nextLong) @@ -464,6 +525,7 @@ trait DataFrame extends RDDApi[Row] with Serializable { * * val bookCountPerWord = allWords.groupBy("word").agg(countDistinct("title")) * }}} + * @group dfops */ def explode[A <: Product : TypeTag](input: Column*)(f: Row => TraversableOnce[A]): DataFrame @@ -476,6 +538,7 @@ trait DataFrame extends RDDApi[Row] with Serializable { * {{{ * df.explode("words", "word")(words: String => words.split(" ")) * }}} + * @group dfops */ def explode[A, B : TypeTag]( inputColumn: String, @@ -486,11 +549,13 @@ trait DataFrame extends RDDApi[Row] with Serializable { /** * Returns a new [[DataFrame]] by adding a column. + * @group dfops */ def withColumn(colName: String, col: Column): DataFrame /** * Returns a new [[DataFrame]] with a column renamed. + * @group dfops */ def withColumnRenamed(existingName: String, newName: String): DataFrame @@ -511,62 +576,84 @@ trait DataFrame extends RDDApi[Row] with Serializable { /** * Returns a new RDD by applying a function to all rows of this DataFrame. + * @group rdd */ override def map[R: ClassTag](f: Row => R): RDD[R] /** * Returns a new RDD by first applying a function to all rows of this [[DataFrame]], * and then flattening the results. + * @group rdd */ override def flatMap[R: ClassTag](f: Row => TraversableOnce[R]): RDD[R] /** * Returns a new RDD by applying a function to each partition of this DataFrame. + * @group rdd */ override def mapPartitions[R: ClassTag](f: Iterator[Row] => Iterator[R]): RDD[R] /** * Applies a function `f` to all rows. + * @group rdd */ override def foreach(f: Row => Unit): Unit /** * Applies a function f to each partition of this [[DataFrame]]. + * @group rdd */ override def foreachPartition(f: Iterator[Row] => Unit): Unit /** * Returns the first `n` rows in the [[DataFrame]]. + * @group action */ override def take(n: Int): Array[Row] /** * Returns an array that contains all of [[Row]]s in this [[DataFrame]]. + * @group action */ override def collect(): Array[Row] /** * Returns a Java list that contains all of [[Row]]s in this [[DataFrame]]. + * @group action */ override def collectAsList(): java.util.List[Row] /** * Returns the number of rows in the [[DataFrame]]. + * @group action */ override def count(): Long /** * Returns a new [[DataFrame]] that has exactly `numPartitions` partitions. + * @group rdd */ override def repartition(numPartitions: Int): DataFrame - /** Returns a new [[DataFrame]] that contains only the unique rows from this [[DataFrame]]. */ + /** + * Returns a new [[DataFrame]] that contains only the unique rows from this [[DataFrame]]. + * @group dfops + */ override def distinct: DataFrame + /** + * @group basic + */ override def persist(): this.type + /** + * @group basic + */ override def persist(newLevel: StorageLevel): this.type + /** + * @group basic + */ override def unpersist(blocking: Boolean): this.type ///////////////////////////////////////////////////////////////////////////// @@ -575,16 +662,19 @@ trait DataFrame extends RDDApi[Row] with Serializable { /** * Returns the content of the [[DataFrame]] as an [[RDD]] of [[Row]]s. + * @group rdd */ def rdd: RDD[Row] /** * Returns the content of the [[DataFrame]] as a [[JavaRDD]] of [[Row]]s. + * @group rdd */ def toJavaRDD: JavaRDD[Row] = rdd.toJavaRDD() /** * Returns the content of the [[DataFrame]] as a [[JavaRDD]] of [[Row]]s. + * @group rdd */ def javaRDD: JavaRDD[Row] = toJavaRDD @@ -592,7 +682,7 @@ trait DataFrame extends RDDApi[Row] with Serializable { * Registers this RDD as a temporary table using the given name. The lifetime of this temporary * table is tied to the [[SQLContext]] that was used to create this DataFrame. * - * @group schema + * @group basic */ def registerTempTable(tableName: String): Unit @@ -600,6 +690,7 @@ trait DataFrame extends RDDApi[Row] with Serializable { * Saves the contents of this [[DataFrame]] as a parquet file, preserving the schema. * Files that are written out using this method can be read back in as a [[DataFrame]] * using the `parquetFile` function in [[SQLContext]]. + * @group output */ def saveAsParquetFile(path: String): Unit @@ -613,6 +704,7 @@ trait DataFrame extends RDDApi[Row] with Serializable { * there is no notion of a persisted catalog in a standard SQL context. Instead you can write * an RDD out to a parquet file, and then register that file as a table. This "table" can then * be the target of an `insertInto`. + * @group output */ @Experimental def saveAsTable(tableName: String): Unit = { @@ -628,6 +720,7 @@ trait DataFrame extends RDDApi[Row] with Serializable { * there is no notion of a persisted catalog in a standard SQL context. Instead you can write * an RDD out to a parquet file, and then register that file as a table. This "table" can then * be the target of an `insertInto`. + * @group output */ @Experimental def saveAsTable(tableName: String, mode: SaveMode): Unit = { @@ -651,6 +744,7 @@ trait DataFrame extends RDDApi[Row] with Serializable { * there is no notion of a persisted catalog in a standard SQL context. Instead you can write * an RDD out to a parquet file, and then register that file as a table. This "table" can then * be the target of an `insertInto`. + * @group output */ @Experimental def saveAsTable( @@ -668,6 +762,7 @@ trait DataFrame extends RDDApi[Row] with Serializable { * there is no notion of a persisted catalog in a standard SQL context. Instead you can write * an RDD out to a parquet file, and then register that file as a table. This "table" can then * be the target of an `insertInto`. + * @group output */ @Experimental def saveAsTable( @@ -686,6 +781,7 @@ trait DataFrame extends RDDApi[Row] with Serializable { * there is no notion of a persisted catalog in a standard SQL context. Instead you can write * an RDD out to a parquet file, and then register that file as a table. This "table" can then * be the target of an `insertInto`. + * @group output */ @Experimental def saveAsTable( @@ -706,6 +802,7 @@ trait DataFrame extends RDDApi[Row] with Serializable { * there is no notion of a persisted catalog in a standard SQL context. Instead you can write * an RDD out to a parquet file, and then register that file as a table. This "table" can then * be the target of an `insertInto`. + * @group output */ @Experimental def saveAsTable( @@ -719,6 +816,7 @@ trait DataFrame extends RDDApi[Row] with Serializable { * Saves the contents of this DataFrame to the given path, * using the default data source configured by spark.sql.sources.default and * [[SaveMode.ErrorIfExists]] as the save mode. + * @group output */ @Experimental def save(path: String): Unit = { @@ -729,6 +827,7 @@ trait DataFrame extends RDDApi[Row] with Serializable { * :: Experimental :: * Saves the contents of this DataFrame to the given path and [[SaveMode]] specified by mode, * using the default data source configured by spark.sql.sources.default. + * @group output */ @Experimental def save(path: String, mode: SaveMode): Unit = { @@ -740,6 +839,7 @@ trait DataFrame extends RDDApi[Row] with Serializable { * :: Experimental :: * Saves the contents of this DataFrame to the given path based on the given data source, * using [[SaveMode.ErrorIfExists]] as the save mode. + * @group output */ @Experimental def save(path: String, source: String): Unit = { @@ -750,6 +850,7 @@ trait DataFrame extends RDDApi[Row] with Serializable { * :: Experimental :: * Saves the contents of this DataFrame to the given path based on the given data source and * [[SaveMode]] specified by mode. + * @group output */ @Experimental def save(path: String, source: String, mode: SaveMode): Unit = { @@ -760,6 +861,7 @@ trait DataFrame extends RDDApi[Row] with Serializable { * :: Experimental :: * Saves the contents of this DataFrame based on the given data source, * [[SaveMode]] specified by mode, and a set of options. + * @group output */ @Experimental def save( @@ -774,6 +876,7 @@ trait DataFrame extends RDDApi[Row] with Serializable { * (Scala-specific) * Saves the contents of this DataFrame based on the given data source, * [[SaveMode]] specified by mode, and a set of options + * @group output */ @Experimental def save( @@ -784,6 +887,7 @@ trait DataFrame extends RDDApi[Row] with Serializable { /** * :: Experimental :: * Adds the rows from this RDD to the specified table, optionally overwriting the existing data. + * @group output */ @Experimental def insertInto(tableName: String, overwrite: Boolean): Unit @@ -792,15 +896,46 @@ trait DataFrame extends RDDApi[Row] with Serializable { * :: Experimental :: * Adds the rows from this RDD to the specified table. * Throws an exception if the table already exists. + * @group output */ @Experimental def insertInto(tableName: String): Unit = insertInto(tableName, overwrite = false) /** * Returns the content of the [[DataFrame]] as a RDD of JSON strings. + * @group rdd */ def toJSON: RDD[String] + //////////////////////////////////////////////////////////////////////////// + // JDBC Write Support + //////////////////////////////////////////////////////////////////////////// + + /** + * Save this RDD to a JDBC database at `url` under the table name `table`. + * This will run a `CREATE TABLE` and a bunch of `INSERT INTO` statements. + * If you pass `true` for `allowExisting`, it will drop any table with the + * given name; if you pass `false`, it will throw if the table already + * exists. + * @group output + */ + def createJDBCTable(url: String, table: String, allowExisting: Boolean): Unit + + /** + * Save this RDD to a JDBC database at `url` under the table name `table`. + * Assumes the table already exists and has a compatible schema. If you + * pass `true` for `overwrite`, it will `TRUNCATE` the table before + * performing the `INSERT`s. + * + * The table must already exist on the database. It must have a schema + * that is compatible with the schema of this RDD; inserting the rows of + * the RDD in order via the simple statement + * `INSERT INTO table VALUES (?, ?, ..., ?)` should not fail. + * @group output + */ + def insertIntoJDBC(url: String, table: String, overwrite: Boolean): Unit + + //////////////////////////////////////////////////////////////////////////// // for Python API //////////////////////////////////////////////////////////////////////////// diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameImpl.scala index 848ea2e05624f9ecd06ea9ccc73b1fc155197cec..25bc9d929237d80ce47d9356f46e06ac8df7cd4c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameImpl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameImpl.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql import java.io.CharArrayWriter +import java.sql.DriverManager import scala.language.implicitConversions import scala.reflect.ClassTag @@ -36,6 +37,7 @@ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.{JoinType, Inner} import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.execution.{ExplainCommand, LogicalRDD, EvaluatePython} +import org.apache.spark.sql.jdbc.JDBCWriteDetails import org.apache.spark.sql.json.JsonRDD import org.apache.spark.sql.sources._ import org.apache.spark.sql.types.{NumericType, StructType} @@ -375,7 +377,7 @@ private[sql] class DataFrameImpl protected[sql]( } override def registerTempTable(tableName: String): Unit = { - sqlContext.registerRDDAsTable(this, tableName) + sqlContext.registerDataFrameAsTable(this, tableName) } override def saveAsParquetFile(path: String): Unit = { @@ -441,6 +443,35 @@ private[sql] class DataFrameImpl protected[sql]( } } + def createJDBCTable(url: String, table: String, allowExisting: Boolean): Unit = { + val conn = DriverManager.getConnection(url) + try { + if (allowExisting) { + val sql = s"DROP TABLE IF EXISTS $table" + conn.prepareStatement(sql).executeUpdate() + } + val schema = JDBCWriteDetails.schemaString(this, url) + val sql = s"CREATE TABLE $table ($schema)" + conn.prepareStatement(sql).executeUpdate() + } finally { + conn.close() + } + JDBCWriteDetails.saveTable(this, url, table) + } + + def insertIntoJDBC(url: String, table: String, overwrite: Boolean): Unit = { + if (overwrite) { + val conn = DriverManager.getConnection(url) + try { + val sql = s"TRUNCATE TABLE $table" + conn.prepareStatement(sql).executeUpdate() + } finally { + conn.close() + } + } + JDBCWriteDetails.saveTable(this, url, table) + } + //////////////////////////////////////////////////////////////////////////// // for Python API //////////////////////////////////////////////////////////////////////////// diff --git a/sql/core/src/main/scala/org/apache/spark/sql/ExperimentalMethods.scala b/sql/core/src/main/scala/org/apache/spark/sql/ExperimentalMethods.scala index f0e6a8f332188e0f9ccf33e8faec05ee55bf3f3c..d5d7e35a6b35d9b32ddb4c4de4c9b00582c79534 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/ExperimentalMethods.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/ExperimentalMethods.scala @@ -20,8 +20,13 @@ package org.apache.spark.sql import org.apache.spark.annotation.Experimental /** + * :: Experimental :: * Holder for experimental methods for the bravest. We make NO guarantee about the stability * regarding binary compatibility and source compatibility of methods here. + * + * {{{ + * sqlContext.experimental.extraStrategies += ... + * }}} */ @Experimental class ExperimentalMethods protected[sql](sqlContext: SQLContext) { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/IncomputableColumn.scala b/sql/core/src/main/scala/org/apache/spark/sql/IncomputableColumn.scala index fc37cfa7a899f7055a04736ec7119bc027ba4a9b..b48b682b36e1f76ce0d44fbb257891b710446e6e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/IncomputableColumn.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/IncomputableColumn.scala @@ -173,6 +173,10 @@ private[sql] class IncomputableColumn(protected[sql] val expr: Expression) exten override def insertInto(tableName: String, overwrite: Boolean): Unit = err() + def createJDBCTable(url: String, table: String, allowExisting: Boolean): Unit = err() + + def insertIntoJDBC(url: String, table: String, overwrite: Boolean): Unit = err() + override def toJSON: RDD[String] = err() protected[sql] override def javaToPython: JavaRDD[Array[Byte]] = err() diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala index 0aae0942ca04f7bc3ffe841875803fecbc6c2561..31afa0eb59a8ecf5fa64776fa1f3b36ef80e3177 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala @@ -43,11 +43,16 @@ import org.apache.spark.util.Utils import org.apache.spark.{Partition, SparkContext} /** - * The entry point for running relational queries using Spark. Allows the creation of [[DataFrame]] - * objects and the execution of SQL queries. + * The entry point for working with structured data (rows and columns) in Spark. Allows the + * creation of [[DataFrame]] objects as well as the execution of SQL queries. * - * @groupname ddl_ops Catalog DDL functions - * @groupname userf Spark SQL Functions + * @groupname basic Basic Operations + * @groupname ddl_ops Persistent Catalog DDL + * @groupname cachemgmt Cached Table Management + * @groupname genericdata Generic Data Sources + * @groupname specificdata Specific Data Sources + * @groupname config Configuration + * @groupname dataframes Custom DataFrame Creation * @groupname Ungrouped Support functions for language integrated queries. */ class SQLContext(@transient val sparkContext: SparkContext) @@ -61,24 +66,40 @@ class SQLContext(@transient val sparkContext: SparkContext) // Note that this is a lazy val so we can override the default value in subclasses. protected[sql] lazy val conf: SQLConf = new SQLConf - /** Set Spark SQL configuration properties. */ + /** + * Set Spark SQL configuration properties. + * + * @group config + */ def setConf(props: Properties): Unit = conf.setConf(props) - /** Set the given Spark SQL configuration property. */ + /** + * Set the given Spark SQL configuration property. + * + * @group config + */ def setConf(key: String, value: String): Unit = conf.setConf(key, value) - /** Return the value of Spark SQL configuration property for the given key. */ + /** + * Return the value of Spark SQL configuration property for the given key. + * + * @group config + */ def getConf(key: String): String = conf.getConf(key) /** * Return the value of Spark SQL configuration property for the given key. If the key is not set * yet, return `defaultValue`. + * + * @group config */ def getConf(key: String, defaultValue: String): String = conf.getConf(key, defaultValue) /** * Return all the configuration properties that have been set (i.e. not the default). * This creates a new copy of the config properties in the form of a Map. + * + * @group config */ def getAllConfs: immutable.Map[String, String] = conf.getAllConfs @@ -128,7 +149,9 @@ class SQLContext(@transient val sparkContext: SparkContext) /** * :: Experimental :: * A collection of methods that are considered experimental, but can be used to hook into - * the query planner for advanced functionalities. + * the query planner for advanced functionality. + * + * @group basic */ @Experimental @transient @@ -137,6 +160,8 @@ class SQLContext(@transient val sparkContext: SparkContext) /** * :: Experimental :: * Returns a [[DataFrame]] with no rows or columns. + * + * @group basic */ @Experimental @transient @@ -167,17 +192,28 @@ class SQLContext(@transient val sparkContext: SparkContext) * (Integer arg1, String arg2) -> arg2 + arg1), * DataTypes.StringType); * }}} + * + * @group basic */ @transient val udf: UDFRegistration = new UDFRegistration(this) - /** Returns true if the table is currently cached in-memory. */ + /** + * Returns true if the table is currently cached in-memory. + * @group cachemgmt + */ def isCached(tableName: String): Boolean = cacheManager.isCached(tableName) - /** Caches the specified table in-memory. */ + /** + * Caches the specified table in-memory. + * @group cachemgmt + */ def cacheTable(tableName: String): Unit = cacheManager.cacheTable(tableName) - /** Removes the specified table from the in-memory cache. */ + /** + * Removes the specified table from the in-memory cache. + * @group cachemgmt + */ def uncacheTable(tableName: String): Unit = cacheManager.uncacheTable(tableName) // scalastyle:off @@ -186,6 +222,13 @@ class SQLContext(@transient val sparkContext: SparkContext) * :: Experimental :: * (Scala-specific) Implicit methods available in Scala for converting * common Scala objects into [[DataFrame]]s. + * + * {{{ + * val sqlContext = new SQLContext + * import sqlContext._ + * }}} + * + * @group basic */ @Experimental object implicits extends Serializable { @@ -260,7 +303,7 @@ class SQLContext(@transient val sparkContext: SparkContext) * :: Experimental :: * Creates a DataFrame from an RDD of case classes. * - * @group userf + * @group dataframes */ @Experimental def createDataFrame[A <: Product : TypeTag](rdd: RDD[A]): DataFrame = { @@ -274,6 +317,8 @@ class SQLContext(@transient val sparkContext: SparkContext) /** * :: Experimental :: * Creates a DataFrame from a local Seq of Product. + * + * @group dataframes */ @Experimental def createDataFrame[A <: Product : TypeTag](data: Seq[A]): DataFrame = { @@ -285,6 +330,8 @@ class SQLContext(@transient val sparkContext: SparkContext) /** * Convert a [[BaseRelation]] created for external data sources into a [[DataFrame]]. + * + * @group dataframes */ def baseRelationToDataFrame(baseRelation: BaseRelation): DataFrame = { DataFrame(this, LogicalRelation(baseRelation)) @@ -318,6 +365,8 @@ class SQLContext(@transient val sparkContext: SparkContext) * dataFrame.registerTempTable("people") * sqlContext.sql("select name from people").collect.foreach(println) * }}} + * + * @group dataframes */ @DeveloperApi def createDataFrame(rowRDD: RDD[Row], schema: StructType): DataFrame = { @@ -332,6 +381,8 @@ class SQLContext(@transient val sparkContext: SparkContext) * Creates a [[DataFrame]] from an [[JavaRDD]] containing [[Row]]s using the given schema. * It is important to make sure that the structure of every [[Row]] of the provided RDD matches * the provided schema. Otherwise, there will be runtime exception. + * + * @group dataframes */ @DeveloperApi def createDataFrame(rowRDD: JavaRDD[Row], schema: StructType): DataFrame = { @@ -346,6 +397,7 @@ class SQLContext(@transient val sparkContext: SparkContext) * @param rowRDD an JavaRDD of Row * @param columns names for each column * @return DataFrame + * @group dataframes */ def createDataFrame(rowRDD: JavaRDD[Row], columns: java.util.List[String]): DataFrame = { createDataFrame(rowRDD.rdd, columns.toSeq) @@ -356,6 +408,7 @@ class SQLContext(@transient val sparkContext: SparkContext) * * WARNING: Since there is no guaranteed ordering for fields in a Java Bean, * SELECT * queries will return the columns in an undefined order. + * @group dataframes */ def createDataFrame(rdd: RDD[_], beanClass: Class[_]): DataFrame = { val attributeSeq = getSchema(beanClass) @@ -383,6 +436,7 @@ class SQLContext(@transient val sparkContext: SparkContext) * * WARNING: Since there is no guaranteed ordering for fields in a Java Bean, * SELECT * queries will return the columns in an undefined order. + * @group dataframes */ def createDataFrame(rdd: JavaRDD[_], beanClass: Class[_]): DataFrame = { createDataFrame(rdd.rdd, beanClass) @@ -416,8 +470,6 @@ class SQLContext(@transient val sparkContext: SparkContext) * dataFrame.registerTempTable("people") * sqlContext.sql("select name from people").collect.foreach(println) * }}} - * - * @group userf */ @deprecated("use createDataFrame", "1.3.0") def applySchema(rowRDD: RDD[Row], schema: StructType): DataFrame = { @@ -455,7 +507,7 @@ class SQLContext(@transient val sparkContext: SparkContext) * Loads a Parquet file, returning the result as a [[DataFrame]]. This function returns an empty * [[DataFrame]] if no paths are passed in. * - * @group userf + * @group specificdata */ @scala.annotation.varargs def parquetFile(paths: String*): DataFrame = { @@ -473,7 +525,7 @@ class SQLContext(@transient val sparkContext: SparkContext) * Loads a JSON file (one object per line), returning the result as a [[DataFrame]]. * It goes through the entire dataset once to determine the schema. * - * @group userf + * @group specificdata */ def jsonFile(path: String): DataFrame = jsonFile(path, 1.0) @@ -482,7 +534,7 @@ class SQLContext(@transient val sparkContext: SparkContext) * Loads a JSON file (one object per line) and applies the given schema, * returning the result as a [[DataFrame]]. * - * @group userf + * @group specificdata */ @Experimental def jsonFile(path: String, schema: StructType): DataFrame = { @@ -492,6 +544,7 @@ class SQLContext(@transient val sparkContext: SparkContext) /** * :: Experimental :: + * @group specificdata */ @Experimental def jsonFile(path: String, samplingRatio: Double): DataFrame = { @@ -504,10 +557,18 @@ class SQLContext(@transient val sparkContext: SparkContext) * [[DataFrame]]. * It goes through the entire dataset once to determine the schema. * - * @group userf + * @group specificdata */ def jsonRDD(json: RDD[String]): DataFrame = jsonRDD(json, 1.0) + + /** + * Loads an RDD[String] storing JSON objects (one object per record), returning the result as a + * [[DataFrame]]. + * It goes through the entire dataset once to determine the schema. + * + * @group specificdata + */ def jsonRDD(json: JavaRDD[String]): DataFrame = jsonRDD(json.rdd, 1.0) /** @@ -515,7 +576,7 @@ class SQLContext(@transient val sparkContext: SparkContext) * Loads an RDD[String] storing JSON objects (one object per record) and applies the given schema, * returning the result as a [[DataFrame]]. * - * @group userf + * @group specificdata */ @Experimental def jsonRDD(json: RDD[String], schema: StructType): DataFrame = { @@ -528,6 +589,13 @@ class SQLContext(@transient val sparkContext: SparkContext) createDataFrame(rowRDD, appliedSchema) } + /** + * :: Experimental :: + * Loads an JavaRDD<String> storing JSON objects (one object per record) and applies the given + * schema, returning the result as a [[DataFrame]]. + * + * @group specificdata + */ @Experimental def jsonRDD(json: JavaRDD[String], schema: StructType): DataFrame = { jsonRDD(json.rdd, schema) @@ -535,6 +603,10 @@ class SQLContext(@transient val sparkContext: SparkContext) /** * :: Experimental :: + * Loads an RDD[String] storing JSON objects (one object per record) inferring the + * schema, returning the result as a [[DataFrame]]. + * + * @group specificdata */ @Experimental def jsonRDD(json: RDD[String], samplingRatio: Double): DataFrame = { @@ -546,6 +618,13 @@ class SQLContext(@transient val sparkContext: SparkContext) createDataFrame(rowRDD, appliedSchema) } + /** + * :: Experimental :: + * Loads a JavaRDD[String] storing JSON objects (one object per record) inferring the + * schema, returning the result as a [[DataFrame]]. + * + * @group specificdata + */ @Experimental def jsonRDD(json: JavaRDD[String], samplingRatio: Double): DataFrame = { jsonRDD(json.rdd, samplingRatio); @@ -555,6 +634,8 @@ class SQLContext(@transient val sparkContext: SparkContext) * :: Experimental :: * Returns the dataset stored at path as a DataFrame, * using the default data source configured by spark.sql.sources.default. + * + * @group genericdata */ @Experimental def load(path: String): DataFrame = { @@ -565,6 +646,8 @@ class SQLContext(@transient val sparkContext: SparkContext) /** * :: Experimental :: * Returns the dataset stored at path as a DataFrame, using the given data source. + * + * @group genericdata */ @Experimental def load(path: String, source: String): DataFrame = { @@ -575,6 +658,8 @@ class SQLContext(@transient val sparkContext: SparkContext) * :: Experimental :: * (Java-specific) Returns the dataset specified by the given data source and * a set of options as a DataFrame. + * + * @group genericdata */ @Experimental def load(source: String, options: java.util.Map[String, String]): DataFrame = { @@ -585,6 +670,8 @@ class SQLContext(@transient val sparkContext: SparkContext) * :: Experimental :: * (Scala-specific) Returns the dataset specified by the given data source and * a set of options as a DataFrame. + * + * @group genericdata */ @Experimental def load(source: String, options: Map[String, String]): DataFrame = { @@ -596,6 +683,8 @@ class SQLContext(@transient val sparkContext: SparkContext) * :: Experimental :: * (Java-specific) Returns the dataset specified by the given data source and * a set of options as a DataFrame, using the given schema as the schema of the DataFrame. + * + * @group genericdata */ @Experimental def load( @@ -609,6 +698,7 @@ class SQLContext(@transient val sparkContext: SparkContext) * :: Experimental :: * (Scala-specific) Returns the dataset specified by the given data source and * a set of options as a DataFrame, using the given schema as the schema of the DataFrame. + * @group genericdata */ @Experimental def load( @@ -733,54 +823,70 @@ class SQLContext(@transient val sparkContext: SparkContext) /** * :: Experimental :: - * Construct an RDD representing the database table accessible via JDBC URL + * Construct a [[DataFrame]] representing the database table accessible via JDBC URL * url named table. + * + * @group specificdata */ @Experimental - def jdbcRDD(url: String, table: String): DataFrame = { - jdbcRDD(url, table, null.asInstanceOf[JDBCPartitioningInfo]) + def jdbc(url: String, table: String): DataFrame = { + jdbc(url, table, JDBCRelation.columnPartition(null)) } /** * :: Experimental :: - * Construct an RDD representing the database table accessible via JDBC URL - * url named table. The PartitioningInfo parameter - * gives the name of a column of integral type, a number of partitions, and - * advisory minimum and maximum values for the column. The RDD is - * partitioned according to said column. + * Construct a [[DataFrame]] representing the database table accessible via JDBC URL + * url named table. Partitions of the table will be retrieved in parallel based on the parameters + * passed to this function. + * + * @param columnName the name of a column of integral type that will be used for partitioning. + * @param lowerBound the minimum value of `columnName` to retrieve + * @param upperBound the maximum value of `columnName` to retrieve + * @param numPartitions the number of partitions. the range `minValue`-`maxValue` will be split + * evenly into this many partitions + * + * @group specificdata */ @Experimental - def jdbcRDD(url: String, table: String, partitioning: JDBCPartitioningInfo): - DataFrame = { + def jdbc( + url: String, + table: String, + columnName: String, + lowerBound: Long, + upperBound: Long, + numPartitions: Int): DataFrame = { + val partitioning = JDBCPartitioningInfo(columnName, lowerBound, upperBound, numPartitions) val parts = JDBCRelation.columnPartition(partitioning) - jdbcRDD(url, table, parts) + jdbc(url, table, parts) } /** * :: Experimental :: - * Construct an RDD representing the database table accessible via JDBC URL + * Construct a [[DataFrame]] representing the database table accessible via JDBC URL * url named table. The theParts parameter gives a list expressions * suitable for inclusion in WHERE clauses; each one defines one partition - * of the RDD. + * of the [[DataFrame]]. + * + * @group specificdata */ @Experimental - def jdbcRDD(url: String, table: String, theParts: Array[String]): DataFrame = { + def jdbc(url: String, table: String, theParts: Array[String]): DataFrame = { val parts: Array[Partition] = theParts.zipWithIndex.map { case (part, i) => JDBCPartition(part, i) : Partition } - jdbcRDD(url, table, parts) + jdbc(url, table, parts) } - private def jdbcRDD(url: String, table: String, parts: Array[Partition]): DataFrame = { + private def jdbc(url: String, table: String, parts: Array[Partition]): DataFrame = { val relation = JDBCRelation(url, table, parts)(this) baseRelationToDataFrame(relation) } /** - * Registers the given RDD as a temporary table in the catalog. Temporary tables exist only - * during the lifetime of this instance of SQLContext. + * Registers the given [[DataFrame]] as a temporary table in the catalog. Temporary tables exist + * only during the lifetime of this instance of SQLContext. */ - private[sql] def registerRDDAsTable(rdd: DataFrame, tableName: String): Unit = { + private[sql] def registerDataFrameAsTable(rdd: DataFrame, tableName: String): Unit = { catalog.registerTable(Seq(tableName), rdd.logicalPlan) } @@ -790,7 +896,7 @@ class SQLContext(@transient val sparkContext: SparkContext) * * @param tableName the name of the table to be unregistered. * - * @group ddl_ops + * @group basic */ def dropTempTable(tableName: String): Unit = { cacheManager.tryUncacheQuery(table(tableName)) @@ -801,7 +907,7 @@ class SQLContext(@transient val sparkContext: SparkContext) * Executes a SQL query using Spark, returning the result as a [[DataFrame]]. The dialect that is * used for SQL parsing can be configured with 'spark.sql.dialect'. * - * @group userf + * @group basic */ def sql(sqlText: String): DataFrame = { if (conf.dialect == "sql") { @@ -811,7 +917,11 @@ class SQLContext(@transient val sparkContext: SparkContext) } } - /** Returns the specified table as a [[DataFrame]]. */ + /** + * Returns the specified table as a [[DataFrame]]. + * + * @group ddl_ops + */ def table(tableName: String): DataFrame = DataFrame(this, catalog.lookupRelation(Seq(tableName))) @@ -819,6 +929,8 @@ class SQLContext(@transient val sparkContext: SparkContext) * Returns a [[DataFrame]] containing names of existing tables in the current database. * The returned DataFrame has two columns, tableName and isTemporary (a Boolean * indicating if a table is a temporary one or not). + * + * @group ddl_ops */ def tables(): DataFrame = { DataFrame(this, ShowTablesCommand(None)) @@ -828,6 +940,8 @@ class SQLContext(@transient val sparkContext: SparkContext) * Returns a [[DataFrame]] containing names of existing tables in the given database. * The returned DataFrame has two columns, tableName and isTemporary (a Boolean * indicating if a table is a temporary one or not). + * + * @group ddl_ops */ def tables(databaseName: String): DataFrame = { DataFrame(this, ShowTablesCommand(Some(databaseName))) @@ -835,6 +949,8 @@ class SQLContext(@transient val sparkContext: SparkContext) /** * Returns the names of tables in the current database as an array. + * + * @group ddl_ops */ def tableNames(): Array[String] = { catalog.getTables(None).map { @@ -844,6 +960,8 @@ class SQLContext(@transient val sparkContext: SparkContext) /** * Returns the names of tables in the given database as an array. + * + * @group ddl_ops */ def tableNames(databaseName: String): Array[String] = { catalog.getTables(Some(databaseName)).map { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/UserDefinedFunction.scala b/sql/core/src/main/scala/org/apache/spark/sql/UserDefinedFunction.scala index ee94a5fdbe3768eeab4df05c78499d26c43e6612..295db539adfc4b6d045092ae799ae99be417f189 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/UserDefinedFunction.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/UserDefinedFunction.scala @@ -37,7 +37,7 @@ import org.apache.spark.sql.types.DataType * df.select( predict(df("score")) ) * }}} */ -case class UserDefinedFunction(f: AnyRef, dataType: DataType) { +case class UserDefinedFunction protected[sql] (f: AnyRef, dataType: DataType) { def apply(exprs: Column*): Column = { Column(ScalaUdf(f, dataType, exprs.map(_.expr))) @@ -58,6 +58,7 @@ private[sql] case class UserDefinedPythonFunction( accumulator: Accumulator[JList[Array[Byte]]], dataType: DataType) { + /** Returns a [[Column]] that will evaluate to calling this UDF with the given input. */ def apply(exprs: Column*): Column = { val udf = PythonUDF(name, command, envVars, pythonIncludes, pythonExec, broadcastVars, accumulator, dataType, exprs.map(_.expr)) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/api/package.scala b/sql/core/src/main/scala/org/apache/spark/sql/api/package.scala new file mode 100644 index 0000000000000000000000000000000000000000..cbbd005228d444c5db17b80bccd0f9583b88f9ff --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/api/package.scala @@ -0,0 +1,23 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one or more +* contributor license agreements. See the NOTICE file distributed with +* this work for additional information regarding copyright ownership. +* The ASF licenses this file to You under the Apache License, Version 2.0 +* (the "License"); you may not use this file except in compliance with +* the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.spark.sql + +/** + * Contains API classes that are specific to a single language (i.e. Java). + */ +package object api diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala index c6cd6eb6a22b6d2b4a39383449b8899a8c9fb0b8..7c92e9fc881680b74ccee30d7bf924eb0b8aff95 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala @@ -144,7 +144,7 @@ case class CacheTableCommand( override def run(sqlContext: SQLContext) = { plan.foreach { logicalPlan => - sqlContext.registerRDDAsTable(DataFrame(sqlContext, logicalPlan), tableName) + sqlContext.registerDataFrameAsTable(DataFrame(sqlContext, logicalPlan), tableName) } sqlContext.cacheTable(tableName) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala index acef49aabfe70e82b27f6ca56e48a017c3053e27..73162b22fa9cd3943c6f29ae02149290f6e5728a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala @@ -32,7 +32,9 @@ import org.apache.spark.sql.types._ * * Usage: * {{{ - * sql("SELECT key FROM src").debug + * import org.apache.spark.sql.execution.debug._ + * sql("SELECT key FROM src").debug() + * dataFrame.typeCheck() * }}} */ package object debug { @@ -144,11 +146,9 @@ package object debug { } /** - * :: DeveloperApi :: * Helper functions for checking that runtime types match a given schema. */ - @DeveloperApi - object TypeCheck { + private[sql] object TypeCheck { def typeCheck(data: Any, schema: DataType): Unit = (data, schema) match { case (null, _) => @@ -174,10 +174,8 @@ package object debug { } /** - * :: DeveloperApi :: * Augments [[DataFrame]]s with debug methods. */ - @DeveloperApi private[sql] case class TypeCheck(child: SparkPlan) extends SparkPlan { import TypeCheck._ diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/jdbc.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/jdbc.scala index 34a83f0a5dad8f4c0cb19c641aca6f63ed701053..34f864f5fda7a8f9ad80cd9853995329ff982ccd 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/jdbc.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/jdbc.scala @@ -26,11 +26,11 @@ import org.apache.spark.sql.jdbc.{JDBCPartitioningInfo, JDBCRelation, JDBCPartit import org.apache.spark.sql.types._ package object jdbc { - object JDBCWriteDetails extends Logging { + private[sql] object JDBCWriteDetails extends Logging { /** * Returns a PreparedStatement that inserts a row into table via conn. */ - private def insertStatement(conn: Connection, table: String, rddSchema: StructType): + def insertStatement(conn: Connection, table: String, rddSchema: StructType): PreparedStatement = { val sql = new StringBuilder(s"INSERT INTO $table VALUES (") var fieldsLeft = rddSchema.fields.length @@ -56,7 +56,7 @@ package object jdbc { * non-Serializable. Instead, we explicitly close over all variables that * are used. */ - private[jdbc] def savePartition(url: String, table: String, iterator: Iterator[Row], + def savePartition(url: String, table: String, iterator: Iterator[Row], rddSchema: StructType, nullTypes: Array[Int]): Iterator[Byte] = { val conn = DriverManager.getConnection(url) var committed = false @@ -117,19 +117,14 @@ package object jdbc { } Array[Byte]().iterator } - } - /** - * Make it so that you can call createJDBCTable and insertIntoJDBC on a DataFrame. - */ - implicit class JDBCDataFrame(rdd: DataFrame) { /** * Compute the schema string for this RDD. */ - private def schemaString(url: String): String = { + def schemaString(df: DataFrame, url: String): String = { val sb = new StringBuilder() val quirks = DriverQuirks.get(url) - rdd.schema.fields foreach { field => { + df.schema.fields foreach { field => { val name = field.name var typ: String = quirks.getJDBCType(field.dataType)._1 if (typ == null) typ = field.dataType match { @@ -156,9 +151,9 @@ package object jdbc { /** * Saves the RDD to the database in a single transaction. */ - private def saveTable(url: String, table: String) { + def saveTable(df: DataFrame, url: String, table: String) { val quirks = DriverQuirks.get(url) - var nullTypes: Array[Int] = rdd.schema.fields.map(field => { + var nullTypes: Array[Int] = df.schema.fields.map(field => { var nullType: Option[Int] = quirks.getJDBCType(field.dataType)._2 if (nullType.isEmpty) { field.dataType match { @@ -175,61 +170,16 @@ package object jdbc { case DateType => java.sql.Types.DATE case DecimalType.Unlimited => java.sql.Types.DECIMAL case _ => throw new IllegalArgumentException( - s"Can't translate null value for field $field") + s"Can't translate null value for field $field") } } else nullType.get }).toArray - val rddSchema = rdd.schema - rdd.mapPartitions(iterator => JDBCWriteDetails.savePartition( - url, table, iterator, rddSchema, nullTypes)).collect() - } - - /** - * Save this RDD to a JDBC database at `url` under the table name `table`. - * This will run a `CREATE TABLE` and a bunch of `INSERT INTO` statements. - * If you pass `true` for `allowExisting`, it will drop any table with the - * given name; if you pass `false`, it will throw if the table already - * exists. - */ - def createJDBCTable(url: String, table: String, allowExisting: Boolean) { - val conn = DriverManager.getConnection(url) - try { - if (allowExisting) { - val sql = s"DROP TABLE IF EXISTS $table" - conn.prepareStatement(sql).executeUpdate() - } - val schema = schemaString(url) - val sql = s"CREATE TABLE $table ($schema)" - conn.prepareStatement(sql).executeUpdate() - } finally { - conn.close() + val rddSchema = df.schema + df.foreachPartition { iterator => + JDBCWriteDetails.savePartition(url, table, iterator, rddSchema, nullTypes) } - saveTable(url, table) } - /** - * Save this RDD to a JDBC database at `url` under the table name `table`. - * Assumes the table already exists and has a compatible schema. If you - * pass `true` for `overwrite`, it will `TRUNCATE` the table before - * performing the `INSERT`s. - * - * The table must already exist on the database. It must have a schema - * that is compatible with the schema of this RDD; inserting the rows of - * the RDD in order via the simple statement - * `INSERT INTO table VALUES (?, ?, ..., ?)` should not fail. - */ - def insertIntoJDBC(url: String, table: String, overwrite: Boolean) { - if (overwrite) { - val conn = DriverManager.getConnection(url) - try { - val sql = s"TRUNCATE TABLE $table" - conn.prepareStatement(sql).executeUpdate() - } finally { - conn.close() - } - } - saveTable(url, table) - } - } // implicit class JDBCDataFrame + } } // package object jdbc diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala index 7dd8bea49b8a523cd408f8c789d9bf639b081092..65966458eb670cab173e7fa08205e2e70f9d56ca 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala @@ -55,7 +55,7 @@ import org.apache.spark.{Logging, SerializableWritable, TaskContext} * Parquet table scan operator. Imports the file that backs the given * [[org.apache.spark.sql.parquet.ParquetRelation]] as a ``RDD[Row]``. */ -case class ParquetTableScan( +private[sql] case class ParquetTableScan( attributes: Seq[Attribute], relation: ParquetRelation, columnPruningPred: Seq[Expression]) @@ -210,7 +210,7 @@ case class ParquetTableScan( * (only detected via filename pattern so will not catch all cases). */ @DeveloperApi -case class InsertIntoParquetTable( +private[sql] case class InsertIntoParquetTable( relation: ParquetRelation, child: SparkPlan, overwrite: Boolean = false) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTest.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTest.scala index d0856df8d4f43b4a3f3216787e4ed84384228440..052728c5d5ceb3ac0c14a372081098c0058d4906 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTest.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTest.scala @@ -34,7 +34,7 @@ import org.apache.spark.util.Utils * convenient to use tuples rather than special case classes when writing test cases/suites. * Especially, `Tuple1.apply` can be used to easily wrap a single type/value. */ -trait ParquetTest { +private[sql] trait ParquetTest { val sqlContext: SQLContext import sqlContext.implicits.{localSeqToDataFrameHolder, rddToDataFrameHolder} @@ -121,7 +121,7 @@ trait ParquetTest { (data: Seq[T], tableName: String) (f: => Unit): Unit = { withParquetRDD(data) { rdd => - sqlContext.registerRDDAsTable(rdd, tableName) + sqlContext.registerDataFrameAsTable(rdd, tableName) withTempTable(tableName)(f) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala index 9bb34e2df9a26466384af8b1f2424d5e36ad97bd..95bea9201163d54e8f62e5616975072782641481 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala @@ -72,7 +72,7 @@ import org.apache.spark.{Logging, Partition => SparkPartition, SerializableWrita * null or empty string. This is similar to the `hive.exec.default.partition.name` configuration * in Hive. */ -class DefaultSource +private[sql] class DefaultSource extends RelationProvider with SchemaRelationProvider with CreatableRelationProvider { @@ -147,7 +147,7 @@ private[sql] case class PartitionSpec(partitionColumns: StructType, partitions: * discovery. */ @DeveloperApi -case class ParquetRelation2( +private[sql] case class ParquetRelation2( paths: Seq[String], parameters: Map[String, String], maybeSchema: Option[StructType] = None, @@ -600,7 +600,7 @@ case class ParquetRelation2( } } -object ParquetRelation2 { +private[sql] object ParquetRelation2 { // Whether we should merge schemas collected from all Parquet part-files. val MERGE_SCHEMA = "mergeSchema" diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/timestamp/NanoTime.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/timestamp/NanoTime.scala index 887161684429fbadef600fa87821c08fe5662786..e24475292ceaf973f70f33ef9537a61c716a67a5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/timestamp/NanoTime.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/timestamp/NanoTime.scala @@ -53,7 +53,7 @@ private[parquet] class NanoTime extends Serializable { "NanoTime{julianDay=" + julianDay + ", timeOfDayNanos=" + timeOfDayNanos + "}" } -object NanoTime { +private[sql] object NanoTime { def fromBinary(bytes: Binary): NanoTime = { Preconditions.checkArgument(bytes.length() == 12, "Must be 12 bytes") val buf = bytes.toByteBuffer diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala index dd8b3d211be648bce687ae702903957640428d3d..5020689f7a10559552b4c55a7a5fbb1ee5eaa4f9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala @@ -374,7 +374,7 @@ private[sql] case class CreateTempTableUsing( def run(sqlContext: SQLContext) = { val resolved = ResolvedDataSource(sqlContext, userSpecifiedSchema, provider, options) - sqlContext.registerRDDAsTable( + sqlContext.registerDataFrameAsTable( DataFrame(sqlContext, LogicalRelation(resolved.relation)), tableName) Seq.empty } @@ -390,7 +390,7 @@ private[sql] case class CreateTempTableUsingAsSelect( def run(sqlContext: SQLContext) = { val df = DataFrame(sqlContext, query) val resolved = ResolvedDataSource(sqlContext, provider, mode, options, df) - sqlContext.registerRDDAsTable( + sqlContext.registerDataFrameAsTable( DataFrame(sqlContext, LogicalRelation(resolved.relation)), tableName) Seq.empty diff --git a/sql/core/src/test/java/org/apache/spark/sql/jdbc/JavaJDBCTest.java b/sql/core/src/test/java/org/apache/spark/sql/jdbc/JavaJDBCTest.java deleted file mode 100644 index 80bd74f5b5525a543badd8d36c5854710a383377..0000000000000000000000000000000000000000 --- a/sql/core/src/test/java/org/apache/spark/sql/jdbc/JavaJDBCTest.java +++ /dev/null @@ -1,102 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.jdbc; - -import org.junit.*; -import static org.junit.Assert.*; -import java.sql.Connection; -import java.sql.DriverManager; - -import org.apache.spark.SparkEnv; -import org.apache.spark.api.java.JavaSparkContext; -import org.apache.spark.sql.SQLContext; -import org.apache.spark.sql.DataFrame; -import org.apache.spark.sql.Row; -import org.apache.spark.sql.api.java.*; -import org.apache.spark.sql.test.TestSQLContext$; - -public class JavaJDBCTest { - static String url = "jdbc:h2:mem:testdb1"; - - static Connection conn = null; - - // This variable will always be null if TestSQLContext is intact when running - // these tests. Some Java tests do not play nicely with others, however; - // they create a SparkContext of their own at startup and stop it at exit. - // This renders TestSQLContext inoperable, meaning we have to do the same - // thing. If this variable is nonnull, that means we allocated a - // SparkContext of our own and that we need to stop it at teardown. - static JavaSparkContext localSparkContext = null; - - static SQLContext sql = TestSQLContext$.MODULE$; - - @Before - public void beforeTest() throws Exception { - if (SparkEnv.get() == null) { // A previous test destroyed TestSQLContext. - localSparkContext = new JavaSparkContext("local", "JavaAPISuite"); - sql = new SQLContext(localSparkContext); - } - Class.forName("org.h2.Driver"); - conn = DriverManager.getConnection(url); - conn.prepareStatement("create schema test").executeUpdate(); - conn.prepareStatement("create table test.people (name TEXT(32) NOT NULL, theid INTEGER NOT NULL)").executeUpdate(); - conn.prepareStatement("insert into test.people values ('fred', 1)").executeUpdate(); - conn.prepareStatement("insert into test.people values ('mary', 2)").executeUpdate(); - conn.prepareStatement("insert into test.people values ('joe', 3)").executeUpdate(); - conn.commit(); - } - - @After - public void afterTest() throws Exception { - if (localSparkContext != null) { - localSparkContext.stop(); - localSparkContext = null; - } - try { - conn.close(); - } finally { - conn = null; - } - } - - @Test - public void basicTest() throws Exception { - DataFrame rdd = JDBCUtils.jdbcRDD(sql, url, "TEST.PEOPLE"); - Row[] rows = rdd.collect(); - assertEquals(rows.length, 3); - } - - @Test - public void partitioningTest() throws Exception { - String[] parts = new String[2]; - parts[0] = "THEID < 2"; - parts[1] = "THEID = 2"; // Deliberately forget about one of them. - DataFrame rdd = JDBCUtils.jdbcRDD(sql, url, "TEST.PEOPLE", parts); - Row[] rows = rdd.collect(); - assertEquals(rows.length, 2); - } - - @Test - public void writeTest() throws Exception { - DataFrame rdd = JDBCUtils.jdbcRDD(sql, url, "TEST.PEOPLE"); - JDBCUtils.createJDBCTable(rdd, url, "TEST.PEOPLECOPY", false); - DataFrame rdd2 = JDBCUtils.jdbcRDD(sql, url, "TEST.PEOPLECOPY"); - Row[] rows = rdd2.collect(); - assertEquals(rows.length, 3); - } -} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala index d25c1390db15c70f0138c02359510556ddd4d149..07db672217bc1da12beda5c69371743af5dfb282 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala @@ -164,17 +164,16 @@ class JDBCSuite extends FunSuite with BeforeAndAfter { } test("Basic API") { - assert(TestSQLContext.jdbcRDD(url, "TEST.PEOPLE").collect.size == 3) + assert(TestSQLContext.jdbc(url, "TEST.PEOPLE").collect.size == 3) } test("Partitioning via JDBCPartitioningInfo API") { - val parts = JDBCPartitioningInfo("THEID", 0, 4, 3) - assert(TestSQLContext.jdbcRDD(url, "TEST.PEOPLE", parts).collect.size == 3) + assert(TestSQLContext.jdbc(url, "TEST.PEOPLE", "THEID", 0, 4, 3).collect.size == 3) } test("Partitioning via list-of-where-clauses API") { val parts = Array[String]("THEID < 2", "THEID >= 2") - assert(TestSQLContext.jdbcRDD(url, "TEST.PEOPLE", parts).collect.size == 3) + assert(TestSQLContext.jdbc(url, "TEST.PEOPLE", parts).collect.size == 3) } test("H2 integral types") { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala index 21e70936102fddcdc620a9138c9836f37c10aee2..ad2fbc3f04a9c2872c2b5019e98e863317c0190c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala @@ -57,8 +57,8 @@ class JDBCWriteSuite extends FunSuite with BeforeAndAfter { val srdd = TestSQLContext.createDataFrame(sc.parallelize(arr2x2), schema2) srdd.createJDBCTable(url, "TEST.BASICCREATETEST", false) - assert(2 == TestSQLContext.jdbcRDD(url, "TEST.BASICCREATETEST").count) - assert(2 == TestSQLContext.jdbcRDD(url, "TEST.BASICCREATETEST").collect()(0).length) + assert(2 == TestSQLContext.jdbc(url, "TEST.BASICCREATETEST").count) + assert(2 == TestSQLContext.jdbc(url, "TEST.BASICCREATETEST").collect()(0).length) } test("CREATE with overwrite") { @@ -66,12 +66,12 @@ class JDBCWriteSuite extends FunSuite with BeforeAndAfter { val srdd2 = TestSQLContext.createDataFrame(sc.parallelize(arr1x2), schema2) srdd.createJDBCTable(url, "TEST.DROPTEST", false) - assert(2 == TestSQLContext.jdbcRDD(url, "TEST.DROPTEST").count) - assert(3 == TestSQLContext.jdbcRDD(url, "TEST.DROPTEST").collect()(0).length) + assert(2 == TestSQLContext.jdbc(url, "TEST.DROPTEST").count) + assert(3 == TestSQLContext.jdbc(url, "TEST.DROPTEST").collect()(0).length) srdd2.createJDBCTable(url, "TEST.DROPTEST", true) - assert(1 == TestSQLContext.jdbcRDD(url, "TEST.DROPTEST").count) - assert(2 == TestSQLContext.jdbcRDD(url, "TEST.DROPTEST").collect()(0).length) + assert(1 == TestSQLContext.jdbc(url, "TEST.DROPTEST").count) + assert(2 == TestSQLContext.jdbc(url, "TEST.DROPTEST").collect()(0).length) } test("CREATE then INSERT to append") { @@ -80,8 +80,8 @@ class JDBCWriteSuite extends FunSuite with BeforeAndAfter { srdd.createJDBCTable(url, "TEST.APPENDTEST", false) srdd2.insertIntoJDBC(url, "TEST.APPENDTEST", false) - assert(3 == TestSQLContext.jdbcRDD(url, "TEST.APPENDTEST").count) - assert(2 == TestSQLContext.jdbcRDD(url, "TEST.APPENDTEST").collect()(0).length) + assert(3 == TestSQLContext.jdbc(url, "TEST.APPENDTEST").count) + assert(2 == TestSQLContext.jdbc(url, "TEST.APPENDTEST").collect()(0).length) } test("CREATE then INSERT to truncate") { @@ -90,8 +90,8 @@ class JDBCWriteSuite extends FunSuite with BeforeAndAfter { srdd.createJDBCTable(url, "TEST.TRUNCATETEST", false) srdd2.insertIntoJDBC(url, "TEST.TRUNCATETEST", true) - assert(1 == TestSQLContext.jdbcRDD(url, "TEST.TRUNCATETEST").count) - assert(2 == TestSQLContext.jdbcRDD(url, "TEST.TRUNCATETEST").collect()(0).length) + assert(1 == TestSQLContext.jdbc(url, "TEST.TRUNCATETEST").count) + assert(2 == TestSQLContext.jdbc(url, "TEST.TRUNCATETEST").collect()(0).length) } test("Incompatible INSERT to append") { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/MySQLIntegration.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/MySQLIntegration.scala index 89920f2650c3ac424e1281d90813a79f430fda4f..4f38110c80cc6d418ee81af008c96d123ec0e412 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/MySQLIntegration.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/MySQLIntegration.scala @@ -143,7 +143,7 @@ class MySQLDatabase { } test("Basic test") { - val rdd = TestSQLContext.jdbcRDD(url(ip, "foo"), "tbl") + val rdd = TestSQLContext.jdbc(url(ip, "foo"), "tbl") val rows = rdd.collect assert(rows.length == 2) val types = rows(0).toSeq.map(x => x.getClass.toString) @@ -153,7 +153,7 @@ class MySQLDatabase { } test("Numeric types") { - val rdd = TestSQLContext.jdbcRDD(url(ip, "foo"), "numbers") + val rdd = TestSQLContext.jdbc(url(ip, "foo"), "numbers") val rows = rdd.collect assert(rows.length == 1) val types = rows(0).toSeq.map(x => x.getClass.toString) @@ -181,7 +181,7 @@ class MySQLDatabase { } test("Date types") { - val rdd = TestSQLContext.jdbcRDD(url(ip, "foo"), "dates") + val rdd = TestSQLContext.jdbc(url(ip, "foo"), "dates") val rows = rdd.collect assert(rows.length == 1) val types = rows(0).toSeq.map(x => x.getClass.toString) @@ -199,7 +199,7 @@ class MySQLDatabase { } test("String types") { - val rdd = TestSQLContext.jdbcRDD(url(ip, "foo"), "strings") + val rdd = TestSQLContext.jdbc(url(ip, "foo"), "strings") val rows = rdd.collect assert(rows.length == 1) val types = rows(0).toSeq.map(x => x.getClass.toString) @@ -225,9 +225,9 @@ class MySQLDatabase { } test("Basic write test") { - val rdd1 = TestSQLContext.jdbcRDD(url(ip, "foo"), "numbers") - val rdd2 = TestSQLContext.jdbcRDD(url(ip, "foo"), "dates") - val rdd3 = TestSQLContext.jdbcRDD(url(ip, "foo"), "strings") + val rdd1 = TestSQLContext.jdbc(url(ip, "foo"), "numbers") + val rdd2 = TestSQLContext.jdbc(url(ip, "foo"), "dates") + val rdd3 = TestSQLContext.jdbc(url(ip, "foo"), "strings") rdd1.createJDBCTable(url(ip, "foo"), "numberscopy", false) rdd2.createJDBCTable(url(ip, "foo"), "datescopy", false) rdd3.createJDBCTable(url(ip, "foo"), "stringscopy", false) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegration.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegration.scala index c174d7adb72042017e91d736c977e61d32848a05..7b47feeb7887e34f3283bf0bebd584a26886016e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegration.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegration.scala @@ -113,7 +113,7 @@ class PostgresDatabase { } test("Type mapping for various types") { - val rdd = TestSQLContext.jdbcRDD(url(db.ip), "public.bar") + val rdd = TestSQLContext.jdbc(url(db.ip), "public.bar") val rows = rdd.collect assert(rows.length == 1) val types = rows(0).toSeq.map(x => x.getClass.toString) @@ -142,7 +142,7 @@ class PostgresDatabase { } test("Basic write test") { - val rdd = TestSQLContext.jdbcRDD(url(db.ip), "public.bar") + val rdd = TestSQLContext.jdbc(url(db.ip), "public.bar") rdd.createJDBCTable(url(db.ip), "public.barcopy", false) // Test only that it doesn't bomb out. } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/DescribeHiveTableCommand.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/DescribeHiveTableCommand.scala index bfacc51ef57ab0d823257a6adc1351218f408761..07b5a84fb6602af390be4c961f53f3df1d3ae982 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/DescribeHiveTableCommand.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/DescribeHiveTableCommand.scala @@ -29,9 +29,9 @@ import org.apache.spark.sql.hive.HiveShim import org.apache.spark.sql.SQLContext /** - * Implementation for "describe [extended] table". - * * :: DeveloperApi :: + * + * Implementation for "describe [extended] table". */ @DeveloperApi case class DescribeHiveTableCommand( diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala index 0aa5f7f7b88bd03cae3557da98af52a9332b7fef..6afd8eea05418e674b28512f9c61ccd01006f712 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala @@ -102,6 +102,10 @@ case class AddFile(path: String) extends RunnableCommand { } } +/** + * :: DeveloperApi :: + */ +@DeveloperApi case class CreateMetastoreDataSource( tableName: String, userSpecifiedSchema: Option[StructType], @@ -141,6 +145,10 @@ case class CreateMetastoreDataSource( } } +/** + * :: DeveloperApi :: + */ +@DeveloperApi case class CreateMetastoreDataSourceAsSelect( tableName: String, provider: String, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JavaJDBCTrampoline.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/package.scala similarity index 66% rename from sql/core/src/main/scala/org/apache/spark/sql/jdbc/JavaJDBCTrampoline.scala rename to sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/package.scala index 86bb67ec7425644218370a9bb9ddc4c6c89ecb83..4989c42e964ec8e415591cdf9f28527ac4cb88e8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JavaJDBCTrampoline.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/package.scala @@ -15,16 +15,11 @@ * limitations under the License. */ -package org.apache.spark.sql.jdbc +package org.apache.spark.sql.hive -import org.apache.spark.sql.DataFrame - -private[jdbc] class JavaJDBCTrampoline { - def createJDBCTable(rdd: DataFrame, url: String, table: String, allowExisting: Boolean) { - rdd.createJDBCTable(url, table, allowExisting); - } - - def insertIntoJDBC(rdd: DataFrame, url: String, table: String, overwrite: Boolean) { - rdd.insertIntoJDBC(url, table, overwrite); - } -} +/** + * Physical execution operators used for running queries against data stored in Hive. These + * are not intended for use by users, but are documents so that it is easier to understand + * the output of EXPLAIN queries. + */ +package object execution diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/package.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/package.scala index a6c8ed4f7e86626b3161253a53ea06ce29984968..db074361ef03c4399620b50d33aa4b8c94f17887 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/package.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/package.scala @@ -17,4 +17,14 @@ package org.apache.spark.sql +/** + * Support for running Spark SQL queries using functionality from Apache Hive (does not require an + * existing Hive installation). Supported Hive features include: + * - Using HiveQL to express queries. + * - Reading metadata from the Hive Metastore using HiveSerDes. + * - Hive UDFs, UDAs, UDTs + * + * Users that would like access to this functionality should create a + * [[hive.HiveContext HiveContext]] instead of a [[SQLContext]]. + */ package object hive diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/parquet/FakeParquetSerDe.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/parquet/FakeParquetSerDe.scala deleted file mode 100644 index 2a16c9d1a27c94c70258cfb632f2334565fb883e..0000000000000000000000000000000000000000 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/parquet/FakeParquetSerDe.scala +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.hive.parquet - -import java.util.Properties - -import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category -import org.apache.hadoop.hive.serde2.{SerDeStats, SerDe} -import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector -import org.apache.hadoop.io.Writable - -/** - * A placeholder that allows Spark SQL users to create metastore tables that are stored as - * parquet files. It is only intended to pass the checks that the serde is valid and exists - * when a CREATE TABLE is run. The actual work of decoding will be done by ParquetTableScan - * when "spark.sql.hive.convertMetastoreParquet" is set to true. - */ -@deprecated("No code should depend on FakeParquetHiveSerDe as it is only intended as a " + - "placeholder in the Hive MetaStore", "1.2.0") -class FakeParquetSerDe extends SerDe { - override def getObjectInspector: ObjectInspector = new ObjectInspector { - override def getCategory: Category = Category.PRIMITIVE - - override def getTypeName: String = "string" - } - - override def deserialize(p1: Writable): AnyRef = throwError - - override def initialize(p1: Configuration, p2: Properties): Unit = {} - - override def getSerializedClass: Class[_ <: Writable] = throwError - - override def getSerDeStats: SerDeStats = throwError - - override def serialize(p1: scala.Any, p2: ObjectInspector): Writable = throwError - - private def throwError = - sys.error( - "spark.sql.hive.convertMetastoreParquet must be set to true to use FakeParquetSerDe") -} diff --git a/sql/hive/v0.12.0/src/main/scala/org/apache/spark/sql/hive/Shim12.scala b/sql/hive/v0.12.0/src/main/scala/org/apache/spark/sql/hive/Shim12.scala index 8534c7d7064e5203a2dbb95f807da0a1b32d3873..30646ddbc29d8596b9ec6421eb9834f7894db494 100644 --- a/sql/hive/v0.12.0/src/main/scala/org/apache/spark/sql/hive/Shim12.scala +++ b/sql/hive/v0.12.0/src/main/scala/org/apache/spark/sql/hive/Shim12.scala @@ -43,7 +43,9 @@ import org.apache.hadoop.mapred.InputFormat import org.apache.spark.sql.types.{Decimal, DecimalType} -case class HiveFunctionWrapper(functionClassName: String) extends java.io.Serializable { +private[hive] case class HiveFunctionWrapper(functionClassName: String) + extends java.io.Serializable { + // for Serialization def this() = this(null) @@ -249,6 +251,9 @@ private[hive] object HiveShim { def setTblNullFormat(crtTbl: CreateTableDesc, tbl: Table) = {} } -class ShimFileSinkDesc(var dir: String, var tableInfo: TableDesc, var compressed: Boolean) +private[hive] class ShimFileSinkDesc( + var dir: String, + var tableInfo: TableDesc, + var compressed: Boolean) extends FileSinkDesc(dir, tableInfo, compressed) { } diff --git a/sql/hive/v0.13.1/src/main/scala/org/apache/spark/sql/hive/Shim13.scala b/sql/hive/v0.13.1/src/main/scala/org/apache/spark/sql/hive/Shim13.scala index 72104f5b55761029d22ef1a2e4e7dbcec5685b4c..f9fcbdae15745f7264f75a2b2621f8de5c08106f 100644 --- a/sql/hive/v0.13.1/src/main/scala/org/apache/spark/sql/hive/Shim13.scala +++ b/sql/hive/v0.13.1/src/main/scala/org/apache/spark/sql/hive/Shim13.scala @@ -56,7 +56,9 @@ import org.apache.spark.sql.types.{Decimal, DecimalType} * * @param functionClassName UDF class name */ -case class HiveFunctionWrapper(var functionClassName: String) extends java.io.Externalizable { +private[hive] case class HiveFunctionWrapper(var functionClassName: String) + extends java.io.Externalizable { + // for Serialization def this() = this(null) @@ -423,7 +425,10 @@ private[hive] object HiveShim { * Bug introduced in hive-0.13. FileSinkDesc is serilizable, but its member path is not. * Fix it through wrapper. */ -class ShimFileSinkDesc(var dir: String, var tableInfo: TableDesc, var compressed: Boolean) +private[hive] class ShimFileSinkDesc( + var dir: String, + var tableInfo: TableDesc, + var compressed: Boolean) extends Serializable with Logging { var compressCodec: String = _ var compressType: String = _