From 35ef853b3f9d955949c464e4a0d445147e0e9a07 Mon Sep 17 00:00:00 2001
From: Aaron Davidson <aaron@databricks.com>
Date: Tue, 28 Jul 2015 10:12:09 -0700
Subject: [PATCH] [SPARK-9397] DataFrame should provide an API to find source
 data files if applicable

Certain applications would benefit from being able to inspect DataFrames that are straightforwardly produced by data sources that stem from files, and find out their source data. For example, one might want to display to a user the size of the data underlying a table, or to copy or mutate it.

This PR exposes an `inputFiles` method on DataFrame which attempts to discover the source data in a best-effort manner, by inspecting HadoopFsRelations and JSONRelations.

Author: Aaron Davidson <aaron@databricks.com>

Closes #7717 from aarondav/paths and squashes the following commits:

ff67430 [Aaron Davidson] inputFiles
0acd3ad [Aaron Davidson] [SPARK-9397] DataFrame should provide an API to find source data files if applicable
---
 .../org/apache/spark/sql/DataFrame.scala      | 20 +++++++++++++++++--
 .../org/apache/spark/sql/DataFrameSuite.scala | 20 +++++++++++++++++++
 .../spark/sql/hive/HiveMetastoreCatalog.scala |  6 +++---
 3 files changed, 41 insertions(+), 5 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
index 114ab91d10..3ea0f9ed3b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
@@ -40,8 +40,9 @@ import org.apache.spark.sql.catalyst.plans.logical.{Filter, _}
 import org.apache.spark.sql.catalyst.plans.{Inner, JoinType}
 import org.apache.spark.sql.catalyst.{CatalystTypeConverters, ScalaReflection, SqlParser}
 import org.apache.spark.sql.execution.{EvaluatePython, ExplainCommand, LogicalRDD}
-import org.apache.spark.sql.execution.datasources.CreateTableUsingAsSelect
-import org.apache.spark.sql.json.JacksonGenerator
+import org.apache.spark.sql.execution.datasources.{CreateTableUsingAsSelect, LogicalRelation}
+import org.apache.spark.sql.json.{JacksonGenerator, JSONRelation}
+import org.apache.spark.sql.sources.HadoopFsRelation
 import org.apache.spark.sql.types._
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.util.Utils
@@ -1546,6 +1547,21 @@ class DataFrame private[sql](
     }
   }
 
+  /**
+   * Returns a best-effort snapshot of the files that compose this DataFrame. This method simply
+   * asks each constituent BaseRelation for its respective files and takes the union of all results.
+   * Depending on the source relations, this may not find all input files. Duplicates are removed.
+   */
+  def inputFiles: Array[String] = {
+    val files: Seq[String] = logicalPlan.collect {
+      case LogicalRelation(fsBasedRelation: HadoopFsRelation) =>
+        fsBasedRelation.paths.toSeq
+      case LogicalRelation(jsonRelation: JSONRelation) =>
+        jsonRelation.path.toSeq
+    }.flatten
+    files.toSet.toArray
+  }
+
   ////////////////////////////////////////////////////////////////////////////
   // for Python API
   ////////////////////////////////////////////////////////////////////////////
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index f67f2c60c0..3151e071b1 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -23,7 +23,10 @@ import scala.language.postfixOps
 import scala.util.Random
 
 import org.apache.spark.sql.catalyst.plans.logical.OneRowRelation
+import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.sql.functions._
+import org.apache.spark.sql.json.JSONRelation
+import org.apache.spark.sql.parquet.ParquetRelation
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.test.{ExamplePointUDT, ExamplePoint, SQLTestUtils}
 
@@ -491,6 +494,23 @@ class DataFrameSuite extends QueryTest with SQLTestUtils {
     checkAnswer(df.select(df("key")), testData.select('key).collect().toSeq)
   }
 
+  test("inputFiles") {
+    val fakeRelation1 = new ParquetRelation(Array("/my/path", "/my/other/path"),
+      Some(testData.schema), None, Map.empty)(sqlContext)
+    val df1 = DataFrame(sqlContext, LogicalRelation(fakeRelation1))
+    assert(df1.inputFiles.toSet == fakeRelation1.paths.toSet)
+
+    val fakeRelation2 = new JSONRelation("/json/path", 1, Some(testData.schema), sqlContext)
+    val df2 = DataFrame(sqlContext, LogicalRelation(fakeRelation2))
+    assert(df2.inputFiles.toSet == fakeRelation2.path.toSet)
+
+    val unionDF = df1.unionAll(df2)
+    assert(unionDF.inputFiles.toSet == fakeRelation1.paths.toSet ++ fakeRelation2.path)
+
+    val filtered = df1.filter("false").unionAll(df2.intersect(df2))
+    assert(filtered.inputFiles.toSet == fakeRelation1.paths.toSet ++ fakeRelation2.path)
+  }
+
   ignore("show") {
     // This test case is intended ignored, but to make sure it compiles correctly
     testData.select($"*").show()
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 3180c05445..a8c9b4fa71 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -274,9 +274,9 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
     val metastoreSchema = StructType.fromAttributes(metastoreRelation.output)
     val mergeSchema = hive.convertMetastoreParquetWithSchemaMerging
 
-    // NOTE: Instead of passing Metastore schema directly to `ParquetRelation2`, we have to
+    // NOTE: Instead of passing Metastore schema directly to `ParquetRelation`, we have to
     // serialize the Metastore schema to JSON and pass it as a data source option because of the
-    // evil case insensitivity issue, which is reconciled within `ParquetRelation2`.
+    // evil case insensitivity issue, which is reconciled within `ParquetRelation`.
     val parquetOptions = Map(
       ParquetRelation.METASTORE_SCHEMA -> metastoreSchema.json,
       ParquetRelation.MERGE_SCHEMA -> mergeSchema.toString)
@@ -290,7 +290,7 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
         partitionSpecInMetastore: Option[PartitionSpec]): Option[LogicalRelation] = {
       cachedDataSourceTables.getIfPresent(tableIdentifier) match {
         case null => None // Cache miss
-        case logical@LogicalRelation(parquetRelation: ParquetRelation) =>
+        case logical @ LogicalRelation(parquetRelation: ParquetRelation) =>
           // If we have the same paths, same schema, and same partition spec,
           // we will use the cached Parquet Relation.
           val useCached =
-- 
GitLab