From 64e826f91eabb1a22d3d163d71fbb7b6d2185f25 Mon Sep 17 00:00:00 2001
From: Yadong Qi <qiyadong2010@gmail.com>
Date: Tue, 6 Sep 2016 10:57:21 +0800
Subject: [PATCH] [SPARK-17358][SQL] Cached table(parquet/orc) should be shard
 between beelines

## What changes were proposed in this pull request?
Cached table(parquet/orc) couldn't be shard between beelines, because the `sameResult` method used by `CacheManager` always return false(`sparkSession` are different) when compare two `HadoopFsRelation` in different beelines. So we make `sparkSession` a curry parameter.

## How was this patch tested?
Beeline1
```
1: jdbc:hive2://localhost:10000> CACHE TABLE src_pqt;
+---------+--+
| Result  |
+---------+--+
+---------+--+
No rows selected (5.143 seconds)
1: jdbc:hive2://localhost:10000> EXPLAIN SELECT * FROM src_pqt;
+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--+
|                                                                                                                                                                                                            plan                                                                                                                                                                                                            |
+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--+
| == Physical Plan ==
InMemoryTableScan [key#49, value#50]
   +- InMemoryRelation [key#49, value#50], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas), `src_pqt`
         +- *FileScan parquet default.src_pqt[key#0,value#1] Batched: true, Format: ParquetFormat, InputPaths: hdfs://199.0.0.1:9000/qiyadong/src_pqt, PartitionFilters: [], PushedFilters: [], ReadSchema: struct<key:int,value:string>  |
+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--+
```

Beeline2
```
0: jdbc:hive2://localhost:10000> EXPLAIN SELECT * FROM src_pqt;
+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--+
|                                                                                                                                                                                                            plan                                                                                                                                                                                                            |
+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--+
| == Physical Plan ==
InMemoryTableScan [key#68, value#69]
   +- InMemoryRelation [key#68, value#69], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas), `src_pqt`
         +- *FileScan parquet default.src_pqt[key#0,value#1] Batched: true, Format: ParquetFormat, InputPaths: hdfs://199.0.0.1:9000/qiyadong/src_pqt, PartitionFilters: [], PushedFilters: [], ReadSchema: struct<key:int,value:string>  |
+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--+
```

Author: Yadong Qi <qiyadong2010@gmail.com>

Closes #14913 from watermen/SPARK-17358.
---
 .../apache/spark/sql/execution/datasources/DataSource.scala | 6 ++----
 .../sql/execution/datasources/fileSourceInterfaces.scala    | 4 ++--
 .../sql/execution/datasources/FileSourceStrategySuite.scala | 3 ++-
 .../org/apache/spark/sql/hive/HiveMetastoreCatalog.scala    | 3 +--
 4 files changed, 7 insertions(+), 9 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index 5968db84cd..9c99a800cc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -351,13 +351,12 @@ case class DataSource(
         }
 
         HadoopFsRelation(
-          sparkSession,
           fileCatalog,
           partitionSchema = fileCatalog.partitionSpec().partitionColumns,
           dataSchema = dataSchema,
           bucketSpec = None,
           format,
-          options)
+          options)(sparkSession)
 
       // This is a non-streaming file based datasource.
       case (format: FileFormat, _) =>
@@ -409,13 +408,12 @@ case class DataSource(
         }
 
         HadoopFsRelation(
-          sparkSession,
           fileCatalog,
           partitionSchema = fileCatalog.partitionSpec().partitionColumns,
           dataSchema = dataSchema.asNullable,
           bucketSpec = bucketSpec,
           format,
-          caseInsensitiveOptions)
+          caseInsensitiveOptions)(sparkSession)
 
       case _ =>
         throw new AnalysisException(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala
index e03a2323c7..7e40c35984 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala
@@ -134,13 +134,13 @@ abstract class OutputWriter {
  * @param options Configuration used when reading / writing data.
  */
 case class HadoopFsRelation(
-    sparkSession: SparkSession,
     location: FileCatalog,
     partitionSchema: StructType,
     dataSchema: StructType,
     bucketSpec: Option[BucketSpec],
     fileFormat: FileFormat,
-    options: Map[String, String]) extends BaseRelation with FileRelation {
+    options: Map[String, String])(val sparkSession: SparkSession)
+  extends BaseRelation with FileRelation {
 
   override def sqlContext: SQLContext = sparkSession.sqlContext
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
index 09fd750180..45411fa065 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
@@ -508,7 +508,8 @@ class FileSourceStrategySuite extends QueryTest with SharedSQLContext with Predi
       val bucketed = df.queryExecution.analyzed transform {
         case l @ LogicalRelation(r: HadoopFsRelation, _, _) =>
           l.copy(relation =
-            r.copy(bucketSpec = Some(BucketSpec(numBuckets = buckets, "c1" :: Nil, Nil))))
+            r.copy(bucketSpec =
+              Some(BucketSpec(numBuckets = buckets, "c1" :: Nil, Nil)))(r.sparkSession))
       }
       Dataset.ofRows(spark, bucketed)
     } else {
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index d31a8d643a..c48d4ed608 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -249,13 +249,12 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
         }
 
         val relation = HadoopFsRelation(
-          sparkSession = sparkSession,
           location = fileCatalog,
           partitionSchema = partitionSchema,
           dataSchema = inferredSchema,
           bucketSpec = bucketSpec,
           fileFormat = defaultSource,
-          options = options)
+          options = options)(sparkSession = sparkSession)
 
         val created = LogicalRelation(relation, catalogTable = Some(metastoreRelation.catalogTable))
         cachedDataSourceTables.put(tableIdentifier, created)
-- 
GitLab