diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
index 05fc569588658c733fef285540967ac419ac3455..4238ad1ad41faee4165582e2589893adcef1bd43 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -210,6 +210,7 @@ class HiveContext private[hive](
       version = IsolatedClientLoader.hiveVersion(hiveExecutionVersion),
       sparkConf = sc.conf,
       execJars = Seq(),
+      hadoopConf = sc.hadoopConfiguration,
       config = newTemporaryConfiguration(useInMemoryDerby = true),
       isolationOn = false,
       baseClassLoader = Utils.getContextOrSparkClassLoader)
@@ -239,7 +240,7 @@ class HiveContext private[hive](
 
     // We instantiate a HiveConf here to read in the hive-site.xml file and then pass the options
     // into the isolated client loader
-    val metadataConf = new HiveConf()
+    val metadataConf = new HiveConf(sc.hadoopConfiguration, classOf[HiveConf])
 
     val defaultWarehouseLocation = metadataConf.get("hive.metastore.warehouse.dir")
     logInfo("default warehouse location is " + defaultWarehouseLocation)
@@ -279,6 +280,7 @@ class HiveContext private[hive](
         version = metaVersion,
         sparkConf = sc.conf,
         execJars = jars.toSeq,
+        hadoopConf = sc.hadoopConfiguration,
         config = allConfig,
         isolationOn = true,
         barrierPrefixes = hiveMetastoreBarrierPrefixes,
@@ -291,6 +293,7 @@ class HiveContext private[hive](
         hiveMetastoreVersion = hiveMetastoreVersion,
         hadoopVersion = VersionInfo.getVersion,
         sparkConf = sc.conf,
+        hadoopConf = sc.hadoopConfiguration,
         config = allConfig,
         barrierPrefixes = hiveMetastoreBarrierPrefixes,
         sharedPrefixes = hiveMetastoreSharedPrefixes)
@@ -320,6 +323,7 @@ class HiveContext private[hive](
         version = metaVersion,
         sparkConf = sc.conf,
         execJars = jars.toSeq,
+        hadoopConf = sc.hadoopConfiguration,
         config = allConfig,
         isolationOn = true,
         barrierPrefixes = hiveMetastoreBarrierPrefixes,
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index 3040ec93f8d8b12a44747488415b3743ae902b75..a5f0bbf678d5f31075da46706986cf5eef7ff09f 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -22,6 +22,7 @@ import java.io.{File, PrintStream}
 import scala.collection.JavaConverters._
 import scala.language.reflectiveCalls
 
+import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.hive.cli.CliSessionState
 import org.apache.hadoop.hive.conf.HiveConf
@@ -62,6 +63,7 @@ import org.apache.spark.util.{CircularBuffer, Utils}
 private[hive] class HiveClientImpl(
     override val version: HiveVersion,
     sparkConf: SparkConf,
+    hadoopConf: Configuration,
     config: Map[String, String],
     initClassLoader: ClassLoader,
     val clientLoader: IsolatedClientLoader)
@@ -115,7 +117,7 @@ private[hive] class HiveClientImpl(
         // so we should keep `conf` and reuse the existing instance of `CliSessionState`.
         originalState
       } else {
-        val initialConf = new HiveConf(classOf[SessionState])
+        val initialConf = new HiveConf(hadoopConf, classOf[SessionState])
         // HiveConf is a Hadoop Configuration, which has a field of classLoader and
         // the initial value will be the current thread's context class loader
         // (i.e. initClassLoader at here).
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala
index 024f4dfeba9d8a884194a139d1ed0a221dcac682..932402a5f32f2257f1849b0909c627651e7bc688 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala
@@ -26,6 +26,7 @@ import scala.language.reflectiveCalls
 import scala.util.Try
 
 import org.apache.commons.io.{FileUtils, IOUtils}
+import org.apache.hadoop.conf.Configuration
 
 import org.apache.spark.{Logging, SparkConf}
 import org.apache.spark.deploy.SparkSubmitUtils
@@ -42,6 +43,7 @@ private[hive] object IsolatedClientLoader extends Logging {
       hiveMetastoreVersion: String,
       hadoopVersion: String,
       sparkConf: SparkConf,
+      hadoopConf: Configuration,
       config: Map[String, String] = Map.empty,
       ivyPath: Option[String] = None,
       sharedPrefixes: Seq[String] = Seq.empty,
@@ -79,6 +81,7 @@ private[hive] object IsolatedClientLoader extends Logging {
       hiveVersion(hiveMetastoreVersion),
       sparkConf,
       execJars = files,
+      hadoopConf = hadoopConf,
       config = config,
       sharesHadoopClasses = sharesHadoopClasses,
       sharedPrefixes = sharedPrefixes,
@@ -149,6 +152,7 @@ private[hive] object IsolatedClientLoader extends Logging {
 private[hive] class IsolatedClientLoader(
     val version: HiveVersion,
     val sparkConf: SparkConf,
+    val hadoopConf: Configuration,
     val execJars: Seq[URL] = Seq.empty,
     val config: Map[String, String] = Map.empty,
     val isolationOn: Boolean = true,
@@ -238,7 +242,7 @@ private[hive] class IsolatedClientLoader(
   /** The isolated client interface to Hive. */
   private[hive] def createClient(): HiveClient = {
     if (!isolationOn) {
-      return new HiveClientImpl(version, sparkConf, config, baseClassLoader, this)
+      return new HiveClientImpl(version, sparkConf, hadoopConf, config, baseClassLoader, this)
     }
     // Pre-reflective instantiation setup.
     logDebug("Initializing the logger to avoid disaster...")
@@ -249,7 +253,7 @@ private[hive] class IsolatedClientLoader(
       classLoader
         .loadClass(classOf[HiveClientImpl].getName)
         .getConstructors.head
-        .newInstance(version, sparkConf, config, classLoader, this)
+        .newInstance(version, sparkConf, hadoopConf, config, classLoader, this)
         .asInstanceOf[HiveClient]
     } catch {
       case e: InvocationTargetException =>
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveCatalogSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveCatalogSuite.scala
index 0dc4fea22db72f3dc8bed348ae9db0f0b48323c5..427f5747a010fa733dde9851029200030ee40825 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveCatalogSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveCatalogSuite.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.hive
 
+import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.util.VersionInfo
 
 import org.apache.spark.SparkConf
@@ -33,7 +34,8 @@ class HiveCatalogSuite extends CatalogTestCases {
     IsolatedClientLoader.forVersion(
       hiveMetastoreVersion = HiveContext.hiveExecutionVersion,
       hadoopVersion = VersionInfo.getVersion,
-      sparkConf = new SparkConf()).createClient()
+      sparkConf = new SparkConf(),
+      hadoopConf = new Configuration()).createClient()
   }
 
   protected override val utils: CatalogTestUtils = new CatalogTestUtils {
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
index 3d54da11ade4ccafb50a5cf2c7cf6650fa0ca41b..f218ab80a7cc3104588b6e1b53379a9e3e57d561 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.hive.client
 
 import java.io.File
 
+import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.util.VersionInfo
 
 import org.apache.spark.{Logging, SparkConf, SparkFunSuite}
@@ -63,12 +64,26 @@ class VersionsSuite extends SparkFunSuite with Logging {
       hiveMetastoreVersion = HiveContext.hiveExecutionVersion,
       hadoopVersion = VersionInfo.getVersion,
       sparkConf = sparkConf,
+      hadoopConf = new Configuration(),
       config = buildConf(),
       ivyPath = ivyPath).createClient()
     val db = new CatalogDatabase("default", "desc", "loc", Map())
     badClient.createDatabase(db, ignoreIfExists = true)
   }
 
+  test("hadoop configuration preserved") {
+    val hadoopConf = new Configuration();
+    hadoopConf.set("test", "success")
+    val client = IsolatedClientLoader.forVersion(
+      hiveMetastoreVersion = HiveContext.hiveExecutionVersion,
+      hadoopVersion = VersionInfo.getVersion,
+      sparkConf = sparkConf,
+      hadoopConf = hadoopConf,
+      config = buildConf(),
+      ivyPath = ivyPath).createClient()
+    assert("success" === client.getConf("test", null))
+  }
+
   private def getNestedMessages(e: Throwable): String = {
     var causes = ""
     var lastException = e
@@ -98,6 +113,7 @@ class VersionsSuite extends SparkFunSuite with Logging {
           hiveMetastoreVersion = "13",
           hadoopVersion = VersionInfo.getVersion,
           sparkConf = sparkConf,
+          hadoopConf = new Configuration(),
           config = buildConf(),
           ivyPath = ivyPath).createClient()
       }
@@ -118,6 +134,7 @@ class VersionsSuite extends SparkFunSuite with Logging {
           hiveMetastoreVersion = version,
           hadoopVersion = VersionInfo.getVersion,
           sparkConf = sparkConf,
+          hadoopConf = new Configuration(),
           config = buildConf(),
           ivyPath = ivyPath).createClient()
     }