From 18ee55dd5de0597d7fb69e8e16ac3744356a6918 Mon Sep 17 00:00:00 2001 From: Wenchen Fan <wenchen@databricks.com> Date: Tue, 17 Jan 2017 12:54:50 +0800 Subject: [PATCH] [SPARK-19148][SQL] do not expose the external table concept in Catalog ## What changes were proposed in this pull request? In https://github.com/apache/spark/pull/16296 , we reached a consensus that we should hide the external/managed table concept to users and only expose custom table path. This PR renames `Catalog.createExternalTable` to `createTable`(still keep the old versions for backward compatibility), and only set the table type to EXTERNAL if `path` is specified in options. ## How was this patch tested? new tests in `CatalogSuite` Author: Wenchen Fan <wenchen@databricks.com> Closes #16528 from cloud-fan/create-table. --- project/MimaExcludes.scala | 5 +- python/pyspark/sql/catalog.py | 27 +++- .../apache/spark/sql/catalog/Catalog.scala | 129 +++++++++++++++--- .../command/createDataSourceTables.scala | 9 -- .../spark/sql/internal/CatalogImpl.scala | 78 ++++------- .../spark/sql/internal/CatalogSuite.scala | 66 ++++++--- 6 files changed, 211 insertions(+), 103 deletions(-) diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala index 2314d7f45c..e0ee00e682 100644 --- a/project/MimaExcludes.scala +++ b/project/MimaExcludes.scala @@ -43,7 +43,10 @@ object MimaExcludes { ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.catalog.Catalog.recoverPartitions"), // [SPARK-18537] Add a REST api to spark streaming - ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.streaming.scheduler.StreamingListener.onStreamingStarted") + ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.streaming.scheduler.StreamingListener.onStreamingStarted"), + + // [SPARK-19148][SQL] do not expose the external table concept in Catalog + ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.catalog.Catalog.createTable") ) // Exclude rules for 2.1.x diff --git a/python/pyspark/sql/catalog.py b/python/pyspark/sql/catalog.py index 30c7a3fe4f..253a750629 100644 --- a/python/pyspark/sql/catalog.py +++ b/python/pyspark/sql/catalog.py @@ -15,6 +15,7 @@ # limitations under the License. # +import warnings from collections import namedtuple from pyspark import since @@ -138,7 +139,27 @@ class Catalog(object): @since(2.0) def createExternalTable(self, tableName, path=None, source=None, schema=None, **options): - """Creates an external table based on the dataset in a data source. + """Creates a table based on the dataset in a data source. + + It returns the DataFrame associated with the external table. + + The data source is specified by the ``source`` and a set of ``options``. + If ``source`` is not specified, the default data source configured by + ``spark.sql.sources.default`` will be used. + + Optionally, a schema can be provided as the schema of the returned :class:`DataFrame` and + created external table. + + :return: :class:`DataFrame` + """ + warnings.warn( + "createExternalTable is deprecated since Spark 2.2, please use createTable instead.", + DeprecationWarning) + return self.createTable(tableName, path, source, schema, **options) + + @since(2.2) + def createTable(self, tableName, path=None, source=None, schema=None, **options): + """Creates a table based on the dataset in a data source. It returns the DataFrame associated with the external table. 
@@ -157,12 +178,12 @@ class Catalog(object): source = self._sparkSession.conf.get( "spark.sql.sources.default", "org.apache.spark.sql.parquet") if schema is None: - df = self._jcatalog.createExternalTable(tableName, source, options) + df = self._jcatalog.createTable(tableName, source, options) else: if not isinstance(schema, StructType): raise TypeError("schema should be StructType") scala_datatype = self._jsparkSession.parseDataType(schema.json()) - df = self._jcatalog.createExternalTable(tableName, source, scala_datatype, options) + df = self._jcatalog.createTable(tableName, source, scala_datatype, options) return DataFrame(df, self._sparkSession._wrapped) @since(2.0) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala index 6b061f8ab2..41e781ed18 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql.catalog +import scala.collection.JavaConverters._ + import org.apache.spark.annotation.{Experimental, InterfaceStability} import org.apache.spark.sql.{AnalysisException, DataFrame, Dataset} import org.apache.spark.sql.types.StructType @@ -187,82 +189,169 @@ abstract class Catalog { def functionExists(dbName: String, functionName: String): Boolean /** - * :: Experimental :: - * Creates an external table from the given path and returns the corresponding DataFrame. + * Creates a table from the given path and returns the corresponding DataFrame. * It will use the default data source configured by spark.sql.sources.default. * * @since 2.0.0 */ + @deprecated("use createTable instead.", "2.2.0") + def createExternalTable(tableName: String, path: String): DataFrame = { + createTable(tableName, path) + } + + /** + * :: Experimental :: + * Creates a table from the given path and returns the corresponding DataFrame. + * It will use the default data source configured by spark.sql.sources.default. + * + * @since 2.2.0 + */ @Experimental @InterfaceStability.Evolving - def createExternalTable(tableName: String, path: String): DataFrame + def createTable(tableName: String, path: String): DataFrame /** - * :: Experimental :: - * Creates an external table from the given path based on a data source - * and returns the corresponding DataFrame. + * Creates a table from the given path based on a data source and returns the corresponding + * DataFrame. * * @since 2.0.0 */ + @deprecated("use createTable instead.", "2.2.0") + def createExternalTable(tableName: String, path: String, source: String): DataFrame = { + createTable(tableName, path, source) + } + + /** + * :: Experimental :: + * Creates a table from the given path based on a data source and returns the corresponding + * DataFrame. + * + * @since 2.2.0 + */ @Experimental @InterfaceStability.Evolving - def createExternalTable(tableName: String, path: String, source: String): DataFrame + def createTable(tableName: String, path: String, source: String): DataFrame /** - * :: Experimental :: - * Creates an external table from the given path based on a data source and a set of options. + * Creates a table from the given path based on a data source and a set of options. * Then, returns the corresponding DataFrame. 
* * @since 2.0.0 */ + @deprecated("use createTable instead.", "2.2.0") + def createExternalTable( + tableName: String, + source: String, + options: java.util.Map[String, String]): DataFrame = { + createTable(tableName, source, options) + } + + /** + * :: Experimental :: + * Creates a table from the given path based on a data source and a set of options. + * Then, returns the corresponding DataFrame. + * + * @since 2.2.0 + */ @Experimental @InterfaceStability.Evolving + def createTable( + tableName: String, + source: String, + options: java.util.Map[String, String]): DataFrame = { + createTable(tableName, source, options.asScala.toMap) + } + + /** + * (Scala-specific) + * Creates a table from the given path based on a data source and a set of options. + * Then, returns the corresponding DataFrame. + * + * @since 2.0.0 + */ + @deprecated("use createTable instead.", "2.2.0") def createExternalTable( tableName: String, source: String, - options: java.util.Map[String, String]): DataFrame + options: Map[String, String]): DataFrame = { + createTable(tableName, source, options) + } /** * :: Experimental :: * (Scala-specific) - * Creates an external table from the given path based on a data source and a set of options. + * Creates a table from the given path based on a data source and a set of options. * Then, returns the corresponding DataFrame. * - * @since 2.0.0 + * @since 2.2.0 */ @Experimental @InterfaceStability.Evolving - def createExternalTable( + def createTable( tableName: String, source: String, options: Map[String, String]): DataFrame /** * :: Experimental :: - * Create an external table from the given path based on a data source, a schema and - * a set of options. Then, returns the corresponding DataFrame. + * Create a table from the given path based on a data source, a schema and a set of options. + * Then, returns the corresponding DataFrame. * * @since 2.0.0 */ + @deprecated("use createTable instead.", "2.2.0") + def createExternalTable( + tableName: String, + source: String, + schema: StructType, + options: java.util.Map[String, String]): DataFrame = { + createTable(tableName, source, schema, options) + } + + /** + * :: Experimental :: + * Create a table from the given path based on a data source, a schema and a set of options. + * Then, returns the corresponding DataFrame. + * + * @since 2.2.0 + */ @Experimental @InterfaceStability.Evolving + def createTable( + tableName: String, + source: String, + schema: StructType, + options: java.util.Map[String, String]): DataFrame = { + createTable(tableName, source, schema, options.asScala.toMap) + } + + /** + * (Scala-specific) + * Create a table from the given path based on a data source, a schema and a set of options. + * Then, returns the corresponding DataFrame. + * + * @since 2.0.0 + */ + @deprecated("use createTable instead.", "2.2.0") def createExternalTable( tableName: String, source: String, schema: StructType, - options: java.util.Map[String, String]): DataFrame + options: Map[String, String]): DataFrame = { + createTable(tableName, source, schema, options) + } /** * :: Experimental :: * (Scala-specific) - * Create an external table from the given path based on a data source, a schema and - * a set of options. Then, returns the corresponding DataFrame. + * Create a table from the given path based on a data source, a schema and a set of options. + * Then, returns the corresponding DataFrame. 
* - * @since 2.0.0 + * @since 2.2.0 */ @Experimental @InterfaceStability.Evolving - def createExternalTable( + def createTable( tableName: String, source: String, schema: StructType, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala index 90aeebd932..beeba05554 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala @@ -71,15 +71,6 @@ case class CreateDataSourceTableCommand(table: CatalogTable, ignoreIfExists: Boo options = table.storage.properties ++ pathOption, catalogTable = Some(tableWithDefaultOptions)).resolveRelation() - dataSource match { - case fs: HadoopFsRelation => - if (table.tableType == CatalogTableType.EXTERNAL && fs.location.rootPaths.isEmpty) { - throw new AnalysisException( - "Cannot create a file-based external data source table without path") - } - case _ => - } - val partitionColumnNames = if (table.schema.nonEmpty) { table.partitionColumnNames } else { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala index 8244b2152c..9136a83bc2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala @@ -17,7 +17,6 @@ package org.apache.spark.sql.internal -import scala.collection.JavaConverters._ import scala.reflect.runtime.universe.TypeTag import org.apache.spark.annotation.Experimental @@ -257,101 +256,74 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog { /** * :: Experimental :: - * Creates an external table from the given path and returns the corresponding DataFrame. + * Creates a table from the given path and returns the corresponding DataFrame. * It will use the default data source configured by spark.sql.sources.default. * * @group ddl_ops - * @since 2.0.0 + * @since 2.2.0 */ @Experimental - override def createExternalTable(tableName: String, path: String): DataFrame = { + override def createTable(tableName: String, path: String): DataFrame = { val dataSourceName = sparkSession.sessionState.conf.defaultDataSourceName - createExternalTable(tableName, path, dataSourceName) + createTable(tableName, path, dataSourceName) } /** * :: Experimental :: - * Creates an external table from the given path based on a data source - * and returns the corresponding DataFrame. + * Creates a table from the given path based on a data source and returns the corresponding + * DataFrame. * * @group ddl_ops - * @since 2.0.0 + * @since 2.2.0 */ @Experimental - override def createExternalTable(tableName: String, path: String, source: String): DataFrame = { - createExternalTable(tableName, source, Map("path" -> path)) - } - - /** - * :: Experimental :: - * Creates an external table from the given path based on a data source and a set of options. - * Then, returns the corresponding DataFrame. 
- * - * @group ddl_ops - * @since 2.0.0 - */ - @Experimental - override def createExternalTable( - tableName: String, - source: String, - options: java.util.Map[String, String]): DataFrame = { - createExternalTable(tableName, source, options.asScala.toMap) + override def createTable(tableName: String, path: String, source: String): DataFrame = { + createTable(tableName, source, Map("path" -> path)) } /** * :: Experimental :: * (Scala-specific) - * Creates an external table from the given path based on a data source and a set of options. + * Creates a table from the given path based on a data source and a set of options. * Then, returns the corresponding DataFrame. * * @group ddl_ops - * @since 2.0.0 + * @since 2.2.0 */ @Experimental - override def createExternalTable( + override def createTable( tableName: String, source: String, options: Map[String, String]): DataFrame = { - createExternalTable(tableName, source, new StructType, options) - } - - /** - * :: Experimental :: - * Create an external table from the given path based on a data source, a schema and - * a set of options. Then, returns the corresponding DataFrame. - * - * @group ddl_ops - * @since 2.0.0 - */ - @Experimental - override def createExternalTable( - tableName: String, - source: String, - schema: StructType, - options: java.util.Map[String, String]): DataFrame = { - createExternalTable(tableName, source, schema, options.asScala.toMap) + createTable(tableName, source, new StructType, options) } /** * :: Experimental :: * (Scala-specific) - * Create an external table from the given path based on a data source, a schema and - * a set of options. Then, returns the corresponding DataFrame. + * Create a table from the given path based on a data source, a schema and a set of options. + * Then, returns the corresponding DataFrame. 
* * @group ddl_ops - * @since 2.0.0 + * @since 2.2.0 */ @Experimental - override def createExternalTable( + override def createTable( tableName: String, source: String, schema: StructType, options: Map[String, String]): DataFrame = { val tableIdent = sparkSession.sessionState.sqlParser.parseTableIdentifier(tableName) + val storage = DataSource.buildStorageFormatFromOptions(options) + val tableType = if (storage.locationUri.isDefined) { + CatalogTableType.EXTERNAL + } else { + CatalogTableType.MANAGED + } val tableDesc = CatalogTable( identifier = tableIdent, - tableType = CatalogTableType.EXTERNAL, - storage = DataSource.buildStorageFormatFromOptions(options), + tableType = tableType, + storage = storage, schema = schema, provider = Some(source) ) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala index 5dd04543ed..801912f441 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala @@ -17,6 +17,9 @@ package org.apache.spark.sql.internal +import java.io.File +import java.net.URI + import org.scalatest.BeforeAndAfterEach import org.apache.spark.SparkFunSuite @@ -27,7 +30,7 @@ import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionInfo} import org.apache.spark.sql.catalyst.plans.logical.Range import org.apache.spark.sql.test.SharedSQLContext -import org.apache.spark.sql.types.{IntegerType, StructType} +import org.apache.spark.sql.types.StructType /** @@ -37,6 +40,7 @@ class CatalogSuite extends SparkFunSuite with BeforeAndAfterEach with SharedSQLContext { + import testImplicits._ private def sessionCatalog: SessionCatalog = spark.sessionState.catalog @@ -306,22 +310,6 @@ class CatalogSuite columnFields.foreach { f => assert(columnString.contains(f.toString)) } } - test("createExternalTable should fail if path is not given for file-based data source") { - val e = intercept[AnalysisException] { - spark.catalog.createExternalTable("tbl", "json", Map.empty[String, String]) - } - assert(e.message.contains("Unable to infer schema")) - - val e2 = intercept[AnalysisException] { - spark.catalog.createExternalTable( - "tbl", - "json", - new StructType().add("i", IntegerType), - Map.empty[String, String]) - } - assert(e2.message == "Cannot create a file-based external data source table without path") - } - test("dropTempView should not un-cache and drop metastore table if a same-name table exists") { withTable("same_name") { spark.range(10).write.saveAsTable("same_name") @@ -460,6 +448,50 @@ class CatalogSuite } } + test("createTable with 'path' in options") { + withTable("t") { + withTempDir { dir => + spark.catalog.createTable( + tableName = "t", + source = "json", + schema = new StructType().add("i", "int"), + options = Map("path" -> dir.getAbsolutePath)) + val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t")) + assert(table.tableType == CatalogTableType.EXTERNAL) + assert(table.storage.locationUri.get == dir.getAbsolutePath) + + Seq((1)).toDF("i").write.insertInto("t") + assert(dir.exists() && dir.listFiles().nonEmpty) + + sql("DROP TABLE t") + // the table path and data files are still there after DROP TABLE, if custom table path is + // specified. 
+ assert(dir.exists() && dir.listFiles().nonEmpty) + } + } + } + + test("createTable without 'path' in options") { + withTable("t") { + spark.catalog.createTable( + tableName = "t", + source = "json", + schema = new StructType().add("i", "int"), + options = Map.empty[String, String]) + val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t")) + assert(table.tableType == CatalogTableType.MANAGED) + val tablePath = new File(new URI(table.storage.locationUri.get)) + assert(tablePath.exists() && tablePath.listFiles().isEmpty) + + Seq((1)).toDF("i").write.insertInto("t") + assert(tablePath.listFiles().nonEmpty) + + sql("DROP TABLE t") + // the table path is removed after DROP TABLE, if custom table path is not specified. + assert(!tablePath.exists()) + } + } + // TODO: add tests for the rest of them } -- GitLab
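The user-facing rule this patch establishes is simple: `Catalog.createTable` registers an EXTERNAL table when the options include a `path`, and a MANAGED table otherwise. Below is a minimal usage sketch of the new Scala API (assuming Spark 2.2+; the table names and the `/tmp/example_path` location are placeholders, not taken from the patch):

```scala
import java.nio.file.{Files, Paths}

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.StructType

object CreateTableExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("create-table-example")
      .master("local[*]")
      .getOrCreate()

    val schema = new StructType().add("i", "int")

    // Placeholder location for the external table; created up front so the
    // data source can resolve it.
    val externalPath = Files.createDirectories(Paths.get("/tmp/example_path")).toString

    // With a "path" option the table is EXTERNAL: a later DROP TABLE keeps
    // the files at that location.
    spark.catalog.createTable(
      tableName = "t_external",
      source = "json",
      schema = schema,
      options = Map("path" -> externalPath))

    // Without a "path" option the table is MANAGED: it lives under the
    // warehouse directory and DROP TABLE removes its data.
    spark.catalog.createTable(
      tableName = "t_managed",
      source = "json",
      schema = schema,
      options = Map.empty[String, String])

    // The table type is visible through the public Catalog API.
    println(spark.catalog.getTable("t_external").tableType) // EXTERNAL
    println(spark.catalog.getTable("t_managed").tableType)  // MANAGED

    spark.stop()
  }
}
```

The deprecated `createExternalTable` overloads forward to these methods, so existing callers keep working, and the Python `Catalog.createTable` added in this patch mirrors the same path-based behavior.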