    [SPARK-13665][SQL] Separate the concerns of HadoopFsRelation

    `HadoopFsRelation` is used for reading most files into Spark SQL.  However, today this class mixes the concerns of file management, schema reconciliation, scan building, bucketing, partitioning, and writing data.  As a result, many data sources are forced to reimplement the same functionality, and the various layers have accumulated a fair bit of inefficiency.  This PR is a first cut at separating these concerns into several components / interfaces, each of which is described below.  Additionally, all implementations inside of Spark (parquet, csv, json, text, orc, libsvm) have been ported to the new `FileFormat` API.  External libraries, such as spark-avro, will also need to be ported to work with Spark 2.0.
    
    ### HadoopFsRelation
    A simple `case class` that acts as a container for all of the metadata required to read from a data source.  All discovery, resolution, and merging logic for schemas and partitions has been removed.  This is an internal representation that no longer needs to be exposed to developers.
    
    ```scala
    case class HadoopFsRelation(
        sqlContext: SQLContext,
        location: FileCatalog,
        partitionSchema: StructType,
        dataSchema: StructType,
        bucketSpec: Option[BucketSpec],
        fileFormat: FileFormat,
        options: Map[String, String]) extends BaseRelation
    ```
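
    For illustration, the sketch below shows how such a relation might be assembled once a catalog and format are in hand. This wiring normally happens inside `ResolvedDataSource`; the schemas, options, and helper function here are hypothetical placeholders, not part of the API.

    ```scala
    // Hypothetical wiring; in practice ResolvedDataSource builds this internally.
    // HadoopFsRelation, FileCatalog, and FileFormat are the internal types shown above.
    import org.apache.spark.sql.SQLContext
    import org.apache.spark.sql.types._

    def buildRelation(
        sqlContext: SQLContext,
        catalog: FileCatalog,      // e.g. an HDFSFileCatalog over the input paths
        format: FileFormat): HadoopFsRelation = {
      HadoopFsRelation(
        sqlContext = sqlContext,
        location = catalog,
        partitionSchema = StructType(Seq(StructField("date", StringType))),
        dataSchema = StructType(Seq(
          StructField("id", LongType),
          StructField("event", StringType))),
        bucketSpec = None,                        // no bucketing in this example
        fileFormat = format,
        options = Map("path" -> "/data/events"))  // passed through to the format
    }
    ```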
    
    ### FileFormat
    The primary interface that will be implemented by each different format, including external libraries.  Implementors are responsible for reading a given format and converting it into `InternalRow`s, as well as writing out `InternalRow`s.  A format can optionally return a schema that is inferred from a set of files.
    
    ```scala
    trait FileFormat {
      def inferSchema(
          sqlContext: SQLContext,
          options: Map[String, String],
          files: Seq[FileStatus]): Option[StructType]
    
      def prepareWrite(
          sqlContext: SQLContext,
          job: Job,
          options: Map[String, String],
          dataSchema: StructType): OutputWriterFactory
    
      def buildInternalScan(
          sqlContext: SQLContext,
          dataSchema: StructType,
          requiredColumns: Array[String],
          filters: Array[Filter],
          bucketSet: Option[BitSet],
          inputFiles: Array[FileStatus],
          broadcastedConf: Broadcast[SerializableConfiguration],
          options: Map[String, String]): RDD[InternalRow]
    }
    ```
    
    The current interface is based on what was required to get all the tests passing again, but it still mixes a couple of concerns (e.g., `bucketSet` is passed down to the scan instead of being resolved by the planner).  Additionally, scans still return `RDD`s instead of iterators over single files.  In a future PR, bucketing should be removed from this interface and the scan should be isolated to a single file.
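
    To make the contract concrete, here is a hypothetical, read-only implementation that exposes every line of every input file as a single string column. It is only a sketch: writing is unsupported, several of the imported types are Spark-internal (so a real implementation has to live inside Spark's own packages, as the built-in formats do), and `FileFormat` / `OutputWriterFactory` are assumed to be in scope from the internal datasources package.

    ```scala
    import org.apache.hadoop.fs.FileStatus
    import org.apache.hadoop.mapreduce.Job
    import org.apache.spark.broadcast.Broadcast
    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.SQLContext
    import org.apache.spark.sql.catalyst.InternalRow
    import org.apache.spark.sql.sources.Filter
    import org.apache.spark.sql.types.{StringType, StructField, StructType}
    import org.apache.spark.unsafe.types.UTF8String
    import org.apache.spark.util.SerializableConfiguration
    import org.apache.spark.util.collection.BitSet

    // A toy, read-only FileFormat: every line of every file becomes one row
    // with a single string column.
    class WholeLineFileFormat extends FileFormat {

      // The schema is fixed, so nothing is inferred from the input files.
      override def inferSchema(
          sqlContext: SQLContext,
          options: Map[String, String],
          files: Seq[FileStatus]): Option[StructType] =
        Some(StructType(StructField("value", StringType) :: Nil))

      // This toy format does not support writing.
      override def prepareWrite(
          sqlContext: SQLContext,
          job: Job,
          options: Map[String, String],
          dataSchema: StructType): OutputWriterFactory =
        throw new UnsupportedOperationException("write is not supported")

      // Ignore filters and bucketing; read the files as text and wrap each
      // line in an InternalRow.
      override def buildInternalScan(
          sqlContext: SQLContext,
          dataSchema: StructType,
          requiredColumns: Array[String],
          filters: Array[Filter],
          bucketSet: Option[BitSet],
          inputFiles: Array[FileStatus],
          broadcastedConf: Broadcast[SerializableConfiguration],
          options: Map[String, String]): RDD[InternalRow] = {
        val paths = inputFiles.map(_.getPath.toString).mkString(",")
        sqlContext.sparkContext
          .textFile(paths)
          .map(line => InternalRow(UTF8String.fromString(line)))
      }
    }
    ```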
    
    ### FileCatalog
    This interface is used to list the files that make up a given relation, as well as to handle directory-based partitioning.
    
    ```scala
    trait FileCatalog {
      def paths: Seq[Path]
      def partitionSpec(schema: Option[StructType]): PartitionSpec
      def allFiles(): Seq[FileStatus]
      def getStatus(path: Path): Array[FileStatus]
      def refresh(): Unit
    }
    ```
    
    Currently there are two implementations:
     - `HDFSFileCatalog` - based on code from the old `HadoopFsRelation`.  Infers partitioning by recursively listing directories and caches this data for performance.
     - `HiveFileCatalog` - based on the above, but uses the partition spec from the Hive Metastore.
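
    For reference, a trivial catalog over a fixed, unpartitioned set of files might look like the following sketch. It is hypothetical: `PartitionSpec` is assumed to be Spark's internal container of partition columns and discovered partitions, and the real `HDFSFileCatalog` additionally lists recursively, infers partition values, and caches the results.

    ```scala
    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileStatus, Path}
    import org.apache.spark.sql.types.StructType

    // A static catalog over explicitly listed root paths, with no partitioning.
    // FileCatalog and PartitionSpec are assumed in scope from Spark's internal
    // datasources package.
    class StaticFileCatalog(
        override val paths: Seq[Path],
        hadoopConf: Configuration) extends FileCatalog {

      // No directory-based partitioning: report an empty partition spec.
      override def partitionSpec(schema: Option[StructType]): PartitionSpec =
        PartitionSpec(StructType(Nil), Seq.empty)

      // List the files directly under each root path (non-recursive for brevity).
      override def allFiles(): Seq[FileStatus] =
        paths.flatMap(p => p.getFileSystem(hadoopConf).listStatus(p).toSeq)

      override def getStatus(path: Path): Array[FileStatus] =
        path.getFileSystem(hadoopConf).listStatus(path)

      // Nothing is cached, so there is nothing to refresh.
      override def refresh(): Unit = ()
    }
    ```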
    
    ### ResolvedDataSource
    Produces a logical plan given the following description of a Data Source (which can come from a `DataFrameReader` or a metastore):
     - `paths: Seq[String] = Nil`
     - `userSpecifiedSchema: Option[StructType] = None`
     - `partitionColumns: Array[String] = Array.empty`
     - `bucketSpec: Option[BucketSpec] = None`
     - `provider: String`
     - `options: Map[String, String]`
    
    This class is responsible for deciding which of the Data Source APIs a given provider is using (including the non-file-based ones).  All reconciliation of partitions, buckets, and schemas, whether from metastores or inference, is done here.
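
    For example, a read such as the following supplies the provider, paths, and options through `DataFrameReader`, which hands them to `ResolvedDataSource` to resolve the schema and partitions and produce the plan (illustrative only; the path and options are made up).

    ```scala
    import org.apache.spark.sql.SQLContext

    // Hypothetical usage: provider = "json", paths = Seq("/data/events"),
    // options = Map("mode" -> "PERMISSIVE"); no user-specified schema,
    // partition columns, or bucket spec, so those are discovered or inferred
    // by ResolvedDataSource.
    def readEvents(sqlContext: SQLContext) =
      sqlContext.read
        .format("json")
        .option("mode", "PERMISSIVE")
        .load("/data/events")
    ```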
    
    ### DataSourceAnalysis / DataSourceStrategy
    Responsible for analyzing and planning reading/writing of data using any of the Data Source APIs, including:
     - pruning the files from partitions that will be read, based on filters.
     - appending partition columns*
     - applying additional filters when a data source cannot evaluate them internally (a sketch of this follows below).
     - constructing an RDD that is bucketed correctly when required*
     - sanity-checking schema compatibility and performing other analysis when writing.
    
    *In the future we should do the following:
     - Break out file handling into its own Strategy, as it is sufficiently complex / isolated.
     - Push the appending of partition columns down into `FileFormat` to avoid an extra copy / unvectorization.
     - Use a custom RDD for scans instead of `SQLNewNewHadoopRDD2`.
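
    As a rough illustration of the residual-filter handling mentioned above, the planner conceptually partitions predicates into those the source can evaluate and a remainder that Spark re-applies on top of the scan. The helper below is hypothetical; the real logic lives in `DataSourceStrategy`.

    ```scala
    import org.apache.spark.sql.sources.{EqualTo, Filter, GreaterThan}

    // Hypothetical helper: keep the filters a source claims to support and
    // return the rest as a residual to be evaluated by Spark after the scan.
    def splitFilters(
        filters: Seq[Filter],
        isSupported: Filter => Boolean): (Seq[Filter], Seq[Filter]) =
      filters.partition(isSupported)

    // Example: a source that can only push down equality predicates.
    val (pushed, residual) = splitFilters(
      Seq(EqualTo("date", "2016-03-07"), GreaterThan("id", 100)),
      { case _: EqualTo => true; case _ => false })
    // pushed   == Seq(EqualTo("date", "2016-03-07"))
    // residual == Seq(GreaterThan("id", 100))
    ```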
    
    Author: Michael Armbrust <michael@databricks.com>
    Author: Wenchen Fan <wenchen@databricks.com>
    
    Closes #11509 from marmbrus/fileDataSource.