diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index 5968db84cd608ccdd80f2bb63a4d034f8fd4f322..9c99a800cc050b7bdd9f2a8880288e7bb293e0e1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -351,13 +351,12 @@ case class DataSource(
         }
 
         HadoopFsRelation(
-          sparkSession,
           fileCatalog,
           partitionSchema = fileCatalog.partitionSpec().partitionColumns,
           dataSchema = dataSchema,
           bucketSpec = None,
           format,
-          options)
+          options)(sparkSession)
 
       // This is a non-streaming file based datasource.
       case (format: FileFormat, _) =>
@@ -409,13 +408,12 @@ case class DataSource(
         }
 
         HadoopFsRelation(
-          sparkSession,
           fileCatalog,
           partitionSchema = fileCatalog.partitionSpec().partitionColumns,
           dataSchema = dataSchema.asNullable,
           bucketSpec = bucketSpec,
           format,
-          caseInsensitiveOptions)
+          caseInsensitiveOptions)(sparkSession)
 
       case _ =>
         throw new AnalysisException(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala
index e03a2323c7493a8177a3f8a5be0bbd7c0bde2dd0..7e40c359840624d66964f09e1f29d37545c5ea51 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala
@@ -134,13 +134,13 @@ abstract class OutputWriter {
  * @param options Configuration used when reading / writing data.
  */
 case class HadoopFsRelation(
-    sparkSession: SparkSession,
     location: FileCatalog,
     partitionSchema: StructType,
     dataSchema: StructType,
     bucketSpec: Option[BucketSpec],
     fileFormat: FileFormat,
-    options: Map[String, String]) extends BaseRelation with FileRelation {
+    options: Map[String, String])(val sparkSession: SparkSession)
+  extends BaseRelation with FileRelation {
 
   override def sqlContext: SQLContext = sparkSession.sqlContext
 
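Note on the change above: moving sparkSession into a second parameter list (declared as a val so it remains accessible on the relation) excludes it from the equals, hashCode, and toString that Scala derives for a case class, since those methods are generated from the first parameter list only; presumably the intent is that two HadoopFsRelation instances compare equal regardless of which session produced them. A minimal, self-contained sketch of this language behavior, using a hypothetical Relation class that stands in for HadoopFsRelation:

    // Hypothetical stand-in for HadoopFsRelation. Only the first parameter
    // list participates in the derived equals/hashCode/toString; the second
    // list's `session` is kept reachable by declaring it as a val.
    case class Relation(schema: String, options: Map[String, String])(val session: AnyRef)

    object CurriedCaseClassDemo extends App {
      val a = Relation("a int", Map.empty)(new Object)
      val b = Relation("a int", Map.empty)(new Object)
      println(a == b)                 // true: the differing sessions are ignored
      println(a.session eq b.session) // false: each value still carries its own session
    }
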
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
index 09fd7501803572890636843f30c77ed60e8c207e..45411fa0656cdde4be76ca478c9477ba0272e557 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
@@ -508,7 +508,8 @@ class FileSourceStrategySuite extends QueryTest with SharedSQLContext with Predi
       val bucketed = df.queryExecution.analyzed transform {
         case l @ LogicalRelation(r: HadoopFsRelation, _, _) =>
           l.copy(relation =
-            r.copy(bucketSpec = Some(BucketSpec(numBuckets = buckets, "c1" :: Nil, Nil))))
+            r.copy(bucketSpec =
+              Some(BucketSpec(numBuckets = buckets, "c1" :: Nil, Nil)))(r.sparkSession))
       }
       Dataset.ofRows(spark, bucketed)
     } else {
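Note on the test change above: for a case class with two parameter lists, the generated copy method is curried as well, and default arguments exist only for the first list, so the trailing list must be re-supplied explicitly. That is why the test now writes r.copy(bucketSpec = ...)(r.sparkSession). A sketch, reusing the hypothetical Relation from the previous note:

    val base = Relation("a int", Map.empty)(new Object)
    // Defaults cover only the first parameter list; the second list must be
    // passed explicitly, mirroring r.copy(...)(r.sparkSession) above.
    val rebucketed = base.copy(options = Map("k" -> "v"))(base.session)
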
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index d31a8d643ad88c5db7e4a5a340ac7bd4c96da8d3..c48d4ed6088b5f719a2cbe593fd534211b994de7 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -249,13 +249,12 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
         }
 
         val relation = HadoopFsRelation(
-          sparkSession = sparkSession,
           location = fileCatalog,
           partitionSchema = partitionSchema,
           dataSchema = inferredSchema,
           bucketSpec = bucketSpec,
           fileFormat = defaultSource,
-          options = options)
+          options = options)(sparkSession = sparkSession)
 
         val created = LogicalRelation(relation, catalogTable = Some(metastoreRelation.catalogTable))
         cachedDataSourceTables.put(tableIdentifier, created)
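
Note on the HiveMetastoreCatalog change: the call site keeps its named-argument style, which still works because Scala accepts named arguments in any parameter list, including the trailing one, hence (sparkSession = sparkSession). With the hypothetical Relation from the first note:

    // Named arguments are legal in the second parameter list as well.
    val named = Relation(schema = "a int", options = Map.empty)(session = new Object)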