diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
index 79231ee9e378d6fb6c314039d3ba552dce6aa099..e74fa6e638a0b0137f314be6c9f8d4b4366954da 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
@@ -156,9 +156,9 @@ case class CatalogTable(
       outputFormat: Option[String] = storage.outputFormat,
       compressed: Boolean = false,
       serde: Option[String] = storage.serde,
-      serdeProperties: Map[String, String] = storage.properties): CatalogTable = {
+      properties: Map[String, String] = storage.properties): CatalogTable = {
     copy(storage = CatalogStorageFormat(
-      locationUri, inputFormat, outputFormat, serde, compressed, serdeProperties))
+      locationUri, inputFormat, outputFormat, serde, compressed, properties))
   }
 
   override def toString: String = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
index c7e327906174ce705b62b6028f4a8794165d8196..b1830e6cf3ea8f350cf96b5927994a8f9091beb9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
@@ -18,7 +18,6 @@
 package org.apache.spark.sql.execution.command
 
 import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.plans.QueryPlan
@@ -56,12 +55,6 @@ case class CreateDataSourceTableCommand(table: CatalogTable, ignoreIfExists: Boo
       }
     }
 
-    val optionsWithPath = if (table.tableType == CatalogTableType.MANAGED) {
-      table.storage.properties + ("path" -> sessionState.catalog.defaultTablePath(table.identifier))
-    } else {
-      table.storage.properties
-    }
-
     // Create the relation to validate the arguments before writing the metadata to the metastore,
     // and infer the table schema and partition if users didn't specify schema in CREATE TABLE.
     val dataSource: BaseRelation =
@@ -70,7 +63,16 @@ case class CreateDataSourceTableCommand(table: CatalogTable, ignoreIfExists: Boo
         userSpecifiedSchema = if (table.schema.isEmpty) None else Some(table.schema),
         className = table.provider.get,
         bucketSpec = table.bucketSpec,
-        options = optionsWithPath).resolveRelation(checkPathExist = false)
+        options = table.storage.properties).resolveRelation()
+
+    dataSource match {
+      case fs: HadoopFsRelation =>
+        if (table.tableType == CatalogTableType.EXTERNAL && fs.location.paths.isEmpty) {
+          throw new AnalysisException(
+            "Cannot create a file-based external data source table without path")
+        }
+      case _ =>
+    }
 
     val partitionColumnNames = if (table.schema.nonEmpty) {
       table.partitionColumnNames
@@ -83,6 +85,12 @@ case class CreateDataSourceTableCommand(table: CatalogTable, ignoreIfExists: Boo
       }
     }
 
+    val optionsWithPath = if (table.tableType == CatalogTableType.MANAGED) {
+      table.storage.properties + ("path" -> sessionState.catalog.defaultTablePath(table.identifier))
+    } else {
+      table.storage.properties
+    }
+
     val newTable = table.copy(
       storage = table.storage.copy(properties = optionsWithPath),
       schema = dataSource.schema,
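The net effect of the reordering above: `resolveRelation()` now sees only the user-supplied options (no injected default path), and a file-based external table without a `path` is rejected before any metadata is written. A minimal sketch of the user-visible behavior through the `Catalog` API, mirroring the new `CatalogSuite` test further down (the table name `tbl` and the `spark` session are illustrative):

```scala
import org.apache.spark.sql.types.{IntegerType, StructType}

// Without a user-specified schema, resolution fails earlier, while trying
// to infer a schema from the (nonexistent) files:
//   AnalysisException: Unable to infer schema ...
spark.catalog.createExternalTable("tbl", "json", Map.empty[String, String])

// With a schema, the new HadoopFsRelation check fires instead:
//   AnalysisException: Cannot create a file-based external data source table without path
spark.catalog.createExternalTable(
  "tbl",
  "json",
  new StructType().add("i", IntegerType),
  Map.empty[String, String])
```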
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index 53fb684eb5ce3fbc09ac0190b7c231a1e3333cd8..bc1c4f85e331501e2e2a51126fe980c1c23ae897 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -318,7 +318,7 @@ case class AlterTableSerDePropertiesCommand(
     if (partSpec.isEmpty) {
       val newTable = table.withNewStorage(
         serde = serdeClassName.orElse(table.storage.serde),
-        serdeProperties = table.storage.properties ++ serdeProperties.getOrElse(Map()))
+        properties = table.storage.properties ++ serdeProperties.getOrElse(Map()))
       catalog.alterTable(newTable)
     } else {
       val spec = partSpec.get
@@ -669,7 +669,7 @@ case class AlterTableSetLocationCommand(
       if (DDLUtils.isDatasourceTable(table)) {
         table.withNewStorage(
           locationUri = Some(location),
-          serdeProperties = table.storage.properties ++ Map("path" -> location))
+          properties = table.storage.properties ++ Map("path" -> location))
       } else {
         table.withNewStorage(locationUri = Some(location))
       }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
index 4e6caae85caebb5c75f5fe69329ee3f820e458b6..027f3588e292289c739b4b1049f6e72887f1bf6a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
@@ -181,7 +181,7 @@ case class AlterTableRenameCommand(
       if (DDLUtils.isDatasourceTable(table) && table.tableType == CatalogTableType.MANAGED) {
         val newPath = catalog.defaultTablePath(newTblName)
         val newTable = table.withNewStorage(
-          serdeProperties = table.storage.properties ++ Map("path" -> newPath))
+          properties = table.storage.properties ++ Map("path" -> newPath))
         catalog.alterTable(newTable)
       }
       // Invalidate the table last, otherwise uncaching the table would load the logical plan
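All three ALTER TABLE call sites above change the same way: only the keyword argument is renamed, from `serdeProperties` to `properties`, matching `CatalogStorageFormat.properties`. A minimal sketch of the pattern under the new signature (`table`, `location`, and `catalog` stand in for the values already in scope in each command):

```scala
// Merge a new "path" entry into the existing storage properties and
// persist the result; note the renamed `properties` parameter.
val newTable = table.withNewStorage(
  locationUri = Some(location),
  properties = table.storage.properties ++ Map("path" -> location))
catalog.alterTable(newTable)
```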
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index 9c99a800cc050b7bdd9f2a8880288e7bb293e0e1..71807b771a95f6309974c3e6a45a37a6ec0aa8ec 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -315,12 +315,8 @@ case class DataSource(
   /**
    * Create a resolved [[BaseRelation]] that can be used to read data from or write data into this
    * [[DataSource]]
-   *
-   * @param checkPathExist A flag to indicate whether to check the existence of path or not.
-   *                       This flag will be set to false when we create an empty table (the
-   *                       path of the table does not exist).
    */
-  def resolveRelation(checkPathExist: Boolean = true): BaseRelation = {
+  def resolveRelation(): BaseRelation = {
     val caseInsensitiveOptions = new CaseInsensitiveMap(options)
     val relation = (providingClass.newInstance(), userSpecifiedSchema) match {
       // TODO: Throw when too much is given.
@@ -367,11 +363,11 @@ case class DataSource(
         val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
         val globPath = SparkHadoopUtil.get.globPathIfNecessary(qualified)
 
-        if (checkPathExist && globPath.isEmpty) {
+        if (globPath.isEmpty) {
           throw new AnalysisException(s"Path does not exist: $qualified")
         }
         // Sufficient to check head of the globPath seq for non-glob scenario
-        if (checkPathExist && !fs.exists(globPath.head)) {
+        if (!fs.exists(globPath.head)) {
           throw new AnalysisException(s"Path does not exist: ${globPath.head}")
         }
         globPath
@@ -391,7 +387,7 @@ case class DataSource(
       val fileCatalog =
         new ListingFileCatalog(
-          sparkSession, globbedPaths, options, partitionSchema, !checkPathExist)
+          sparkSession, globbedPaths, options, partitionSchema)
 
       val dataSchema = userSpecifiedSchema.map { schema =>
         val equality = sparkSession.sessionState.conf.resolver
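With `checkPathExist` removed, every resolution of a file-based relation validates its input paths. A hedged sketch against the internal `DataSource` API (not a stable public interface; the path and the `sparkSession` value are illustrative):

```scala
import org.apache.spark.sql.execution.datasources.DataSource

// There is no longer a flag to skip the existence check, so a missing
// path always fails fast:
//   AnalysisException: Path does not exist: file:/no/such/dir
val relation = DataSource(
  sparkSession,
  className = "json",
  options = Map("path" -> "/no/such/dir")).resolveRelation()
```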
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala
index 706ec6b9b36c736e8f77ca5fee1d40cf3d949870..60742bdbed204e1c95fd03bc2eb0162291cb44d9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala
@@ -17,10 +17,7 @@
 
 package org.apache.spark.sql.execution.datasources
 
-import java.io.FileNotFoundException
-
 import scala.collection.mutable
-import scala.util.Try
 
 import org.apache.hadoop.fs.{FileStatus, LocatedFileStatus, Path}
 import org.apache.hadoop.mapred.{FileInputFormat, JobConf}
@@ -37,16 +34,12 @@ import org.apache.spark.sql.types.StructType
  * @param paths a list of paths to scan
  * @param partitionSchema an optional partition schema that will be use to provide types for the
  *                        discovered partitions
- * @param ignoreFileNotFound if true, return empty file list when encountering a
- *                           [[FileNotFoundException]] in file listing. Note that this is a hack
- *                           for SPARK-16313. We should get rid of this flag in the future.
  */
 class ListingFileCatalog(
     sparkSession: SparkSession,
     override val paths: Seq[Path],
     parameters: Map[String, String],
-    partitionSchema: Option[StructType],
-    ignoreFileNotFound: Boolean = false)
+    partitionSchema: Option[StructType])
   extends PartitioningAwareFileCatalog(sparkSession, parameters, partitionSchema) {
 
   @volatile private var cachedLeafFiles: mutable.LinkedHashMap[Path, FileStatus] = _
@@ -88,7 +81,7 @@ class ListingFileCatalog(
    */
   def listLeafFiles(paths: Seq[Path]): mutable.LinkedHashSet[FileStatus] = {
     if (paths.length >= sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold) {
-      HadoopFsRelation.listLeafFilesInParallel(paths, hadoopConf, sparkSession, ignoreFileNotFound)
+      HadoopFsRelation.listLeafFilesInParallel(paths, hadoopConf, sparkSession)
     } else {
       // Right now, the number of paths is less than the value of
       // parallelPartitionDiscoveryThreshold. So, we will list file statues at the driver.
@@ -104,12 +97,7 @@ class ListingFileCatalog(
         logTrace(s"Listing $path on driver")
 
         val childStatuses = {
-          val stats =
-            try {
-              fs.listStatus(path)
-            } catch {
-              case e: FileNotFoundException if ignoreFileNotFound => Array.empty[FileStatus]
-            }
+          val stats = fs.listStatus(path)
           if (pathFilter != null) stats.filter(f => pathFilter.accept(f.getPath)) else stats
         }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala
index 7e40c359840624d66964f09e1f29d37545c5ea51..5cc5f32e6e8093df4bf92dd187955944983bd102 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala
@@ -440,8 +440,7 @@ object HadoopFsRelation extends Logging {
   def listLeafFilesInParallel(
       paths: Seq[Path],
       hadoopConf: Configuration,
-      sparkSession: SparkSession,
-      ignoreFileNotFound: Boolean): mutable.LinkedHashSet[FileStatus] = {
+      sparkSession: SparkSession): mutable.LinkedHashSet[FileStatus] = {
     assert(paths.size >= sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold)
     logInfo(s"Listing leaf files and directories in parallel under: ${paths.mkString(", ")}")
 
@@ -462,11 +461,7 @@ object HadoopFsRelation extends Logging {
       val pathFilter = FileInputFormat.getInputPathFilter(jobConf)
       paths.map(new Path(_)).flatMap { path =>
         val fs = path.getFileSystem(serializableConfiguration.value)
-        try {
-          listLeafFiles(fs, fs.getFileStatus(path), pathFilter)
-        } catch {
-          case e: java.io.FileNotFoundException if ignoreFileNotFound => Array.empty[FileStatus]
-        }
+        listLeafFiles(fs, fs.getFileStatus(path), pathFilter)
       }
     }.map { status =>
       val blockLocations = status match {
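With the `ignoreFileNotFound` escape hatch gone from both the serial and the parallel listing paths, a path that vanishes between globbing and listing now surfaces as a plain `FileNotFoundException` instead of an empty file list. A minimal sketch of the now-unconditional driver-side listing (`fs`, `path`, and `pathFilter` are the values already in scope in `listLeafFiles`):

```scala
// No try/catch around listing any more: if the path has disappeared,
// fs.listStatus throws FileNotFoundException and it propagates.
val stats = fs.listStatus(path)
val childStatuses =
  if (pathFilter != null) stats.filter(f => pathFilter.accept(f.getPath)) else stats
```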
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala
index 8aa81854b22e8f2818bc8d42687e6675f39279d6..b221eed7b2426c465e8cf8314f25efd38f8bc3e5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala
@@ -27,6 +27,7 @@ import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionInfo}
 import org.apache.spark.sql.catalyst.plans.logical.Range
 import org.apache.spark.sql.test.SharedSQLContext
+import org.apache.spark.sql.types.{IntegerType, StructType}
 
 
 /**
@@ -305,6 +306,22 @@ class CatalogSuite
     columnFields.foreach { f => assert(columnString.contains(f.toString)) }
   }
 
+  test("createExternalTable should fail if path is not given for file-based data source") {
+    val e = intercept[AnalysisException] {
+      spark.catalog.createExternalTable("tbl", "json", Map.empty[String, String])
+    }
+    assert(e.message.contains("Unable to infer schema"))
+
+    val e2 = intercept[AnalysisException] {
+      spark.catalog.createExternalTable(
+        "tbl",
+        "json",
+        new StructType().add("i", IntegerType),
+        Map.empty[String, String])
+    }
+    assert(e2.message == "Cannot create a file-based external data source table without path")
+  }
+
   // TODO: add tests for the rest of them
 
 }
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index c48d4ed6088b5f719a2cbe593fd534211b994de7..8410a2e4a47ca515315c3bcc3dc5099c4b71f12a 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -81,7 +81,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
           options = table.storage.properties)
 
       LogicalRelation(
-        dataSource.resolveRelation(checkPathExist = true),
+        dataSource.resolveRelation(),
         catalogTable = Some(table))
     }
   }
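Note that the Hive read path is behaviorally unchanged: the old call passed `checkPathExist = true` explicitly, so dropping the parameter only simplifies the call shape. A sketch of the resulting call, using the same names as the hunk above (`dataSource` and `table` come from the surrounding method):

```scala
// Same semantics as the old resolveRelation(checkPathExist = true):
// the stored "path" must still exist when the table is read back, or
// resolution fails with "Path does not exist: ...".
LogicalRelation(
  dataSource.resolveRelation(),
  catalogTable = Some(table))
```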