diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
index 114ab91d10aa0a1bb4451207ab0e1ee9c346fac7..3ea0f9ed3bddd829989f4180188d10c87f9cf012 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
@@ -40,8 +40,9 @@ import org.apache.spark.sql.catalyst.plans.logical.{Filter, _}
 import org.apache.spark.sql.catalyst.plans.{Inner, JoinType}
 import org.apache.spark.sql.catalyst.{CatalystTypeConverters, ScalaReflection, SqlParser}
 import org.apache.spark.sql.execution.{EvaluatePython, ExplainCommand, LogicalRDD}
-import org.apache.spark.sql.execution.datasources.CreateTableUsingAsSelect
-import org.apache.spark.sql.json.JacksonGenerator
+import org.apache.spark.sql.execution.datasources.{CreateTableUsingAsSelect, LogicalRelation}
+import org.apache.spark.sql.json.{JacksonGenerator, JSONRelation}
+import org.apache.spark.sql.sources.HadoopFsRelation
 import org.apache.spark.sql.types._
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.util.Utils
@@ -1546,6 +1547,21 @@ class DataFrame private[sql](
     }
   }
 
+  /**
+   * Returns a best-effort snapshot of the files that compose this DataFrame. This method simply
+   * asks each constituent BaseRelation for its respective files and takes the union of all results.
+   * Depending on the source relations, this may not find all input files. Duplicates are removed.
+   */
+  def inputFiles: Array[String] = {
+    val files: Seq[String] = logicalPlan.collect {
+      case LogicalRelation(fsBasedRelation: HadoopFsRelation) =>
+        fsBasedRelation.paths.toSeq
+      case LogicalRelation(jsonRelation: JSONRelation) =>
+        jsonRelation.path.toSeq
+    }.flatten
+    files.toSet.toArray
+  }
+
   ////////////////////////////////////////////////////////////////////////////
   // for Python API
   ////////////////////////////////////////////////////////////////////////////
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index f67f2c60c0e164df1ec2b3d8d3c7095a746ec36c..3151e071b19eabd913fd36c8265446fd6015f4fc 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -23,7 +23,10 @@ import scala.language.postfixOps
 import scala.util.Random
 
 import org.apache.spark.sql.catalyst.plans.logical.OneRowRelation
+import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.sql.functions._
+import org.apache.spark.sql.json.JSONRelation
+import org.apache.spark.sql.parquet.ParquetRelation
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.test.{ExamplePointUDT, ExamplePoint, SQLTestUtils}
 
@@ -491,6 +494,23 @@ class DataFrameSuite extends QueryTest with SQLTestUtils {
     checkAnswer(df.select(df("key")), testData.select('key).collect().toSeq)
   }
 
+  test("inputFiles") {
+    val fakeRelation1 = new ParquetRelation(Array("/my/path", "/my/other/path"),
+      Some(testData.schema), None, Map.empty)(sqlContext)
+    val df1 = DataFrame(sqlContext, LogicalRelation(fakeRelation1))
+    assert(df1.inputFiles.toSet == fakeRelation1.paths.toSet)
+
+    val fakeRelation2 = new JSONRelation("/json/path", 1, Some(testData.schema), sqlContext)
+    val df2 = DataFrame(sqlContext, LogicalRelation(fakeRelation2))
+    assert(df2.inputFiles.toSet == fakeRelation2.path.toSet)
+
+    val unionDF = df1.unionAll(df2)
+    assert(unionDF.inputFiles.toSet == fakeRelation1.paths.toSet ++ fakeRelation2.path)
+
+    val filtered = df1.filter("false").unionAll(df2.intersect(df2))
+    assert(filtered.inputFiles.toSet == fakeRelation1.paths.toSet ++ fakeRelation2.path)
+  }
+
   ignore("show") {
     // This test case is intended ignored, but to make sure it compiles correctly
     testData.select($"*").show()
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 3180c05445c9f3ef7aa516f359a24fb06ca0f4c5..a8c9b4fa71b9914b4554908bb9e1778eb928a248 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -274,9 +274,9 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
     val metastoreSchema = StructType.fromAttributes(metastoreRelation.output)
     val mergeSchema = hive.convertMetastoreParquetWithSchemaMerging
 
-    // NOTE: Instead of passing Metastore schema directly to `ParquetRelation2`, we have to
+    // NOTE: Instead of passing Metastore schema directly to `ParquetRelation`, we have to
     // serialize the Metastore schema to JSON and pass it as a data source option because of the
-    // evil case insensitivity issue, which is reconciled within `ParquetRelation2`.
+    // evil case insensitivity issue, which is reconciled within `ParquetRelation`.
     val parquetOptions = Map(
       ParquetRelation.METASTORE_SCHEMA -> metastoreSchema.json,
       ParquetRelation.MERGE_SCHEMA -> mergeSchema.toString)
@@ -290,7 +290,7 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
        partitionSpecInMetastore: Option[PartitionSpec]): Option[LogicalRelation] = {
      cachedDataSourceTables.getIfPresent(tableIdentifier) match {
        case null => None // Cache miss
-       case logical@LogicalRelation(parquetRelation: ParquetRelation) =>
+       case logical @ LogicalRelation(parquetRelation: ParquetRelation) =>
         // If we have the same paths, same schema, and same partition spec,
         // we will use the cached Parquet Relation.
         val useCached =
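
A minimal usage sketch of the new API (not part of the patch; the local-mode setup and the /data/... paths are illustrative assumptions). Both sources are backed by relations that inputFiles knows how to inspect (JSONRelation and the Parquet HadoopFsRelation), so each DataFrame reports its own files and a union reports the de-duplicated union of both:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

object InputFilesSketch {
  def main(args: Array[String]): Unit = {
    // Spark 1.5-era setup: a local SparkContext plus a SQLContext.
    val sc = new SparkContext(new SparkConf().setAppName("inputFiles-sketch").setMaster("local[*]"))
    val sqlContext = new SQLContext(sc)

    // A JSON-backed DataFrame reports the JSON path(s) it was loaded from.
    val current = sqlContext.read.json("/data/events-current/")
    current.inputFiles.foreach(println)

    // A Parquet-backed DataFrame (a HadoopFsRelation) reports its Parquet paths.
    val archived = sqlContext.read.parquet("/data/events-archive/")
    archived.inputFiles.foreach(println)

    // Unioning the two (assuming their schemas line up) yields the de-duplicated
    // union of both relations' files, per the best-effort contract documented above.
    val all = current.unionAll(archived)
    println(all.inputFiles.length)

    sc.stop()
  }
}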