diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
index 9633f9e15baffcb9e0c465b63bd356f4438ae05c..f071df75816e35a694ed434058897c13d798d2e4 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
@@ -107,7 +107,6 @@ private[hive] class HiveSessionState(sparkSession: SparkSession)
     new SparkPlanner(sparkSession.sparkContext, conf, experimentalMethods.extraStrategies)
       with HiveStrategies {
       override val sparkSession: SparkSession = self.sparkSession
-      override val hiveconf: HiveConf = self.hiveconf
 
       override def strategies: Seq[Strategy] = {
         experimentalMethods.extraStrategies ++ Seq(
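
A note on the hunk above: with the `hiveconf` override gone from the planner mixin, anything that still needs Hadoop settings derives a fresh copy from the session, as the later hunks in this patch do. A minimal sketch, assuming the internal `sessionState.newHadoopConf()` API that the patch itself calls:

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.spark.sql.SparkSession

// Minimal sketch, not part of the patch: derive a Hadoop Configuration from the
// session instead of threading a shared HiveConf through the planner mixin.
// Note that sessionState is internal (private[sql]) API in this code base.
def freshHadoopConf(sparkSession: SparkSession): Configuration =
  sparkSession.sessionState.newHadoopConf()
```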
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index 7d1f87f3909a260d069a1dbcbda931999235b988..71b180e55b58c71bc46eed72959a2f0b9d752516 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -17,16 +17,12 @@
 
 package org.apache.spark.sql.hive
 
-import org.apache.hadoop.hive.conf.HiveConf
-
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.planning._
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution._
-import org.apache.spark.sql.execution.command._
-import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.hive.execution._
 
 private[hive] trait HiveStrategies {
@@ -34,13 +30,12 @@ private[hive] trait HiveStrategies {
   self: SparkPlanner =>
 
   val sparkSession: SparkSession
-  val hiveconf: HiveConf
 
   object Scripts extends Strategy {
     def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
       case logical.ScriptTransformation(input, script, output, child, ioschema) =>
         val hiveIoSchema = HiveScriptIOSchema(ioschema)
-        ScriptTransformation(input, script, output, planLater(child), hiveIoSchema)(hiveconf) :: Nil
+        ScriptTransformation(input, script, output, planLater(child), hiveIoSchema) :: Nil
       case _ => Nil
     }
   }
@@ -78,7 +73,7 @@ private[hive] trait HiveStrategies {
           projectList,
           otherPredicates,
           identity[Seq[Expression]],
-          HiveTableScanExec(_, relation, pruningPredicates)(sparkSession, hiveconf)) :: Nil
+          HiveTableScanExec(_, relation, pruningPredicates)(sparkSession)) :: Nil
       case _ =>
         Nil
     }
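
The call `HiveTableScanExec(_, relation, pruningPredicates)(sparkSession)` above uses Catalyst's convention of a second, curried parameter list for non-plan arguments. A hedged sketch of why (the `ExampleScanExec` operator below is hypothetical, not part of the patch): only the first list participates in case-class equality, hashing, and `copy`, so transient session state stays out of plan comparison.

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.execution.LeafExecNode

// Hypothetical operator, assumption-laden sketch: the second parameter list
// keeps the non-serializable session out of case-class equality and copy().
case class ExampleScanExec(
    requestedAttributes: Seq[Attribute])(
    @transient private val sparkSession: SparkSession)
  extends LeafExecNode {

  override def output: Seq[Attribute] = requestedAttributes

  // Trivial body for illustration only.
  protected override def doExecute(): RDD[InternalRow] =
    sparkSession.sparkContext.emptyRDD[InternalRow]
}
```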
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
index af0317f7a164ccf2dd1c18b15329e617ad7b9cd0..df6abc258b8f8357c5183663d76d014f1d7aa6b3 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
@@ -17,17 +17,14 @@
 
 package org.apache.spark.sql.hive
 
-import java.util
-
+import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{Path, PathFilter}
-import org.apache.hadoop.hive.conf.HiveConf
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants._
 import org.apache.hadoop.hive.ql.exec.Utilities
 import org.apache.hadoop.hive.ql.metadata.{Partition => HivePartition, Table => HiveTable}
 import org.apache.hadoop.hive.ql.plan.TableDesc
 import org.apache.hadoop.hive.serde2.Deserializer
-import org.apache.hadoop.hive.serde2.objectinspector.{ObjectInspectorConverters,
-  StructObjectInspector}
+import org.apache.hadoop.hive.serde2.objectinspector.{ObjectInspectorConverters, StructObjectInspector}
 import org.apache.hadoop.hive.serde2.objectinspector.primitive._
 import org.apache.hadoop.io.Writable
 import org.apache.hadoop.mapred.{FileInputFormat, InputFormat, JobConf}
@@ -62,7 +59,7 @@ class HadoopTableReader(
     @transient private val attributes: Seq[Attribute],
     @transient private val relation: MetastoreRelation,
     @transient private val sparkSession: SparkSession,
-    hiveconf: HiveConf)
+    hadoopConf: Configuration)
   extends TableReader with Logging {
 
   // Hadoop honors "mapred.map.tasks" as hint, but will ignore when mapred.job.tracker is "local".
@@ -72,12 +69,15 @@ class HadoopTableReader(
   private val _minSplitsPerRDD = if (sparkSession.sparkContext.isLocal) {
-    0 // will splitted based on block by default.
+    0 // will be split based on blocks by default.
   } else {
-    math.max(hiveconf.getInt("mapred.map.tasks", 1), sparkSession.sparkContext.defaultMinPartitions)
+    math.max(hadoopConf.getInt("mapred.map.tasks", 1),
+      sparkSession.sparkContext.defaultMinPartitions)
   }
 
-  SparkHadoopUtil.get.appendS3AndSparkHadoopConfigurations(sparkSession.sparkContext.conf, hiveconf)
-  private val _broadcastedHiveConf =
-    sparkSession.sparkContext.broadcast(new SerializableConfiguration(hiveconf))
+  SparkHadoopUtil.get.appendS3AndSparkHadoopConfigurations(
+    sparkSession.sparkContext.conf, hadoopConf)
+
+  private val _broadcastedHadoopConf =
+    sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
 
   override def makeRDDForTable(hiveTable: HiveTable): RDD[InternalRow] =
     makeRDDForTable(
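
The broadcast rename above keeps the same mechanism: Hadoop's `Configuration` is not `java.io.Serializable`, so Spark wraps it before shipping it to executors once per table reader. A hedged sketch of the pattern (note `SerializableConfiguration` is a `private[spark]` helper, shown here only to illustrate the mechanism):

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.spark.SparkContext
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.util.SerializableConfiguration

// Hedged sketch of the broadcast pattern used by HadoopTableReader above.
def broadcastHadoopConf(
    sc: SparkContext,
    conf: Configuration): Broadcast[SerializableConfiguration] =
  sc.broadcast(new SerializableConfiguration(conf))

// On the executor side a task recovers a live Configuration with:
//   val conf: Configuration = broadcasted.value.value
```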
@@ -105,7 +105,7 @@ class HadoopTableReader(
     // Create local references to member variables, so that the entire `this` object won't be
     // serialized in the closure below.
     val tableDesc = relation.tableDesc
-    val broadcastedHiveConf = _broadcastedHiveConf
+    val broadcastedHiveConf = _broadcastedHadoopConf
 
     val tablePath = hiveTable.getPath
     val inputPathStr = applyFilterIfNeeded(tablePath, filterOpt)
@@ -162,7 +162,7 @@ class HadoopTableReader(
           case (partition, partDeserializer) =>
             def updateExistPathSetByPathPattern(pathPatternStr: String) {
               val pathPattern = new Path(pathPatternStr)
-              val fs = pathPattern.getFileSystem(hiveconf)
+              val fs = pathPattern.getFileSystem(hadoopConf)
               val matches = fs.globStatus(pathPattern)
               matches.foreach(fileStatus => existPathSet += fileStatus.getPath.toString)
             }
@@ -209,7 +209,7 @@ class HadoopTableReader(
 
       // Create local references so that the outer object isn't serialized.
       val tableDesc = relation.tableDesc
-      val broadcastedHiveConf = _broadcastedHiveConf
+      val broadcastedHiveConf = _broadcastedHadoopConf
       val localDeserializer = partDeserializer
       val mutableRow = new SpecificMutableRow(attributes.map(_.dataType))
 
@@ -259,7 +259,7 @@ class HadoopTableReader(
   private def applyFilterIfNeeded(path: Path, filterOpt: Option[PathFilter]): String = {
     filterOpt match {
       case Some(filter) =>
-        val fs = path.getFileSystem(hiveconf)
+        val fs = path.getFileSystem(hadoopConf)
         val filteredFiles = fs.listStatus(path, filter).map(_.getPath.toString)
         filteredFiles.mkString(",")
       case None => path.toString
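
The filesystem lookups in the hunks above change type but not behavior: `HiveConf` extends `Configuration`, so Hadoop's filesystem APIs accept either. A hedged sketch under that assumption (the path and underscore-prefix filter are illustrative, not from the patch):

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path, PathFilter}

// Hedged sketch of why the hiveconf -> hadoopConf substitution is mechanical.
object ListVisibleFiles {
  def apply(conf: Configuration): Seq[String] = {
    val path = new Path("/tmp/example-table")
    val fs: FileSystem = path.getFileSystem(conf)
    val filter = new PathFilter {
      override def accept(p: Path): Boolean = !p.getName.startsWith("_")
    }
    fs.listStatus(path, filter).map(_.getPath.toString).toSeq
  }
}
```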
@@ -279,7 +279,7 @@ class HadoopTableReader(
 
     val rdd = new HadoopRDD(
       sparkSession.sparkContext,
-      _broadcastedHiveConf.asInstanceOf[Broadcast[SerializableConfiguration]],
+      _broadcastedHadoopConf.asInstanceOf[Broadcast[SerializableConfiguration]],
       Some(initializeJobConfFunc),
       inputFormatClass,
       classOf[Writable],
@@ -302,7 +302,7 @@ private[hive] object HiveTableUtil {
     val storageHandler =
       org.apache.hadoop.hive.ql.metadata.HiveUtils.getStorageHandler(jobConf, property)
     if (storageHandler != null) {
-      val jobProperties = new util.LinkedHashMap[String, String]
+      val jobProperties = new java.util.LinkedHashMap[String, String]
       if (input) {
         storageHandler.configureInputJobProperties(tableDesc, jobProperties)
       } else {
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala
index cc5bbf59dbccc09511d00cb3914c00dc13da6ae3..007c3384e5701533dff7f66d42287a31b6b057be 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.hive.execution
 
 import scala.collection.JavaConverters._
 
-import org.apache.hadoop.hive.conf.HiveConf
+import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.hive.ql.metadata.{Partition => HivePartition}
 import org.apache.hadoop.hive.serde.serdeConstants
 import org.apache.hadoop.hive.serde2.objectinspector._
@@ -48,8 +48,7 @@ case class HiveTableScanExec(
     requestedAttributes: Seq[Attribute],
     relation: MetastoreRelation,
     partitionPruningPred: Seq[Expression])(
-    @transient private val sparkSession: SparkSession,
-    @transient private val hiveconf: HiveConf)
+    @transient private val sparkSession: SparkSession)
   extends LeafExecNode {
 
   require(partitionPruningPred.isEmpty || relation.hiveQlTable.isPartitioned,
@@ -77,20 +76,20 @@ case class HiveTableScanExec(
-  // Create a local copy of hiveconf,so that scan specific modifications should not impact
-  // other queries
+  // Obtain a fresh Hadoop configuration so that scan-specific modifications do not
+  // impact other queries.
   @transient
-  private[this] val hiveExtraConf = new HiveConf(hiveconf)
+  private[this] val hadoopConf = sparkSession.sessionState.newHadoopConf()
 
-  // append columns ids and names before broadcast
+  // Append column IDs and names before broadcast.
-  addColumnMetadataToConf(hiveExtraConf)
+  addColumnMetadataToConf(hadoopConf)
 
   @transient
   private[this] val hadoopReader =
-    new HadoopTableReader(attributes, relation, sparkSession, hiveExtraConf)
+    new HadoopTableReader(attributes, relation, sparkSession, hadoopConf)
 
   private[this] def castFromString(value: String, dataType: DataType) = {
     Cast(Literal(value), dataType).eval(null)
   }
 
-  private def addColumnMetadataToConf(hiveConf: HiveConf) {
+  private def addColumnMetadataToConf(hiveConf: Configuration) {
     // Specifies needed column IDs for those non-partitioning columns.
     val neededColumnIDs = attributes.flatMap(relation.columnOrdinals.get).map(o => o: Integer)
 
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
index f27337eb36a64cefdb040fe1670896bcb73c154d..f6e6a75c3ee58bfe1001567ae21df52ce5d26a8f 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
@@ -26,7 +26,6 @@ import scala.collection.JavaConverters._
 import scala.util.control.NonFatal
 
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.hive.conf.HiveConf
 import org.apache.hadoop.hive.ql.exec.{RecordReader, RecordWriter}
 import org.apache.hadoop.hive.serde.serdeConstants
 import org.apache.hadoop.hive.serde2.AbstractSerDe
@@ -58,17 +57,14 @@ case class ScriptTransformation(
     script: String,
     output: Seq[Attribute],
     child: SparkPlan,
-    ioschema: HiveScriptIOSchema)(@transient private val hiveconf: HiveConf)
+    ioschema: HiveScriptIOSchema)
   extends UnaryExecNode {
 
-  override protected def otherCopyArgs: Seq[HiveConf] = hiveconf :: Nil
-
   override def producedAttributes: AttributeSet = outputSet -- inputSet
 
-  private val serializedHiveConf = new SerializableConfiguration(hiveconf)
-
   protected override def doExecute(): RDD[InternalRow] = {
-    def processIterator(inputIterator: Iterator[InternalRow]): Iterator[InternalRow] = {
+    def processIterator(inputIterator: Iterator[InternalRow], hadoopConf: Configuration)
+      : Iterator[InternalRow] = {
       val cmd = List("/bin/bash", "-c", script)
       val builder = new ProcessBuilder(cmd.asJava)
 
@@ -76,7 +72,6 @@ case class ScriptTransformation(
       val inputStream = proc.getInputStream
       val outputStream = proc.getOutputStream
       val errorStream = proc.getErrorStream
-      val localHiveConf = serializedHiveConf.value
 
       // In order to avoid deadlocks, we need to consume the error output of the child process.
       // To avoid issues caused by large error output, we use a circular buffer to limit the amount
@@ -107,7 +102,7 @@ case class ScriptTransformation(
         proc,
         stderrBuffer,
         TaskContext.get(),
-        localHiveConf
+        hadoopConf
       )
 
       // This nullability is a performance optimization in order to avoid an Option.foreach() call
@@ -122,7 +117,7 @@ case class ScriptTransformation(
         val scriptOutputStream = new DataInputStream(inputStream)
 
         @Nullable val scriptOutputReader =
-          ioschema.recordReader(scriptOutputStream, localHiveConf).orNull
+          ioschema.recordReader(scriptOutputStream, hadoopConf).orNull
 
         var scriptOutputWritable: Writable = null
         val reusedWritableObject: Writable = if (null != outputSerde) {
@@ -214,10 +209,13 @@ case class ScriptTransformation(
       outputIterator
     }
 
+    val broadcastedHadoopConf =
+      new SerializableConfiguration(sqlContext.sessionState.newHadoopConf())
+
     child.execute().mapPartitions { iter =>
       if (iter.hasNext) {
         val proj = UnsafeProjection.create(schema)
-        processIterator(iter).map(proj)
+        processIterator(iter, broadcastedHadoopConf.value).map(proj)
       } else {
         // If the input iterator has no rows then do not launch the external script.
         Iterator.empty
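
One design note on the hunk above: despite the name `broadcastedHadoopConf`, the wrapper is built once on the driver and serialized into the `mapPartitions` closure; it is not an actual `Broadcast` variable. A hedged sketch of that capture pattern (again using the `private[spark]` `SerializableConfiguration` purely for illustration):

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.spark.SparkContext
import org.apache.spark.util.SerializableConfiguration

// Hedged sketch: the wrapped conf rides along in the closure and is
// deserialized once per task on the executor via .value.
def runWithConf(sc: SparkContext, conf: Configuration): Array[String] = {
  val wrapped = new SerializableConfiguration(conf)
  sc.parallelize(1 to 4).mapPartitions { iter =>
    val taskConf: Configuration = wrapped.value // rebuilt per task
    iter.map(i => s"$i -> ${taskConf.get("mapreduce.job.name", "unset")}")
  }.collect()
}
```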
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ScriptTransformationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ScriptTransformationSuite.scala
index 1a15fb741a4babe93974d70bece6dd5a6cc1da85..19e8025d6b7c940f6a830aba62112c1d5f9d536e 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ScriptTransformationSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ScriptTransformationSuite.scala
@@ -58,7 +58,7 @@ class ScriptTransformationSuite extends SparkPlanTest with TestHiveSingleton {
         output = Seq(AttributeReference("a", StringType)()),
         child = child,
         ioschema = noSerdeIOSchema
-      )(hiveContext.sessionState.hiveconf),
+      ),
       rowsDf.collect())
   }
 
@@ -72,7 +72,7 @@ class ScriptTransformationSuite extends SparkPlanTest with TestHiveSingleton {
         output = Seq(AttributeReference("a", StringType)()),
         child = child,
         ioschema = serdeIOSchema
-      )(hiveContext.sessionState.hiveconf),
+      ),
       rowsDf.collect())
   }
 
@@ -87,7 +87,7 @@ class ScriptTransformationSuite extends SparkPlanTest with TestHiveSingleton {
           output = Seq(AttributeReference("a", StringType)()),
           child = ExceptionInjectingOperator(child),
           ioschema = noSerdeIOSchema
-        )(hiveContext.sessionState.hiveconf),
+        ),
         rowsDf.collect())
     }
     assert(e.getMessage().contains("intentional exception"))
@@ -104,7 +104,7 @@ class ScriptTransformationSuite extends SparkPlanTest with TestHiveSingleton {
           output = Seq(AttributeReference("a", StringType)()),
           child = ExceptionInjectingOperator(child),
           ioschema = serdeIOSchema
-        )(hiveContext.sessionState.hiveconf),
+        ),
         rowsDf.collect())
     }
     assert(e.getMessage().contains("intentional exception"))