diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
index 146609ae3911a09fb5e9325a78d743edd3fccff4..7a1197830443f1dff89639c692571953c7c57601 100644
--- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
@@ -24,6 +24,7 @@ import scala.reflect.ClassTag
 
 import org.apache.hadoop.conf.{Configurable, Configuration}
 import org.apache.hadoop.io.Writable
+import org.apache.hadoop.mapred.JobConf
 import org.apache.hadoop.mapreduce._
 import org.apache.hadoop.mapreduce.lib.input.{CombineFileSplit, FileSplit}
 import org.apache.hadoop.mapreduce.task.{JobContextImpl, TaskAttemptContextImpl}
@@ -93,7 +94,13 @@ class NewHadoopRDD[K, V](
       // issues, this cloning is disabled by default.
       NewHadoopRDD.CONFIGURATION_INSTANTIATION_LOCK.synchronized {
         logDebug("Cloning Hadoop Configuration")
-        new Configuration(conf)
+        // The Configuration passed in may actually be a JobConf, which can carry credentials.
+        // To preserve those credentials we have to clone it as a new JobConf, not a plain Configuration.
+        if (conf.isInstanceOf[JobConf]) {
+          new JobConf(conf)
+        } else {
+          new Configuration(conf)
+        }
       }
     } else {
       conf
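
A minimal sketch of why the clone has to go through JobConf (an illustration, not part of the patch): Configuration's copy constructor copies only key/value entries, while the JobConf(Configuration) constructor additionally checks whether its argument is a JobConf and, if so, carries over its Credentials. The object name, the "demo" alias, and the empty Token below are hypothetical stand-ins for a real delegation token.

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.Text
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.security.token.{Token, TokenIdentifier}

object CredentialsCloneSketch {
  def main(args: Array[String]): Unit = {
    val original = new JobConf()
    // Attach a placeholder token; on a secure cluster this would be a real
    // HDFS delegation token picked up at job submission.
    original.getCredentials.addToken(new Text("demo"), new Token[TokenIdentifier]())

    // Cloning as a JobConf shares the Credentials, so the token survives.
    val cloneAsJobConf = new JobConf(original)
    println(cloneAsJobConf.getCredentials.numberOfTokens) // 1

    // Cloning as a plain Configuration copies only the key/value properties;
    // the Credentials (and with them the token) are silently dropped.
    val cloneAsConfiguration = new Configuration(original)
  }
}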