diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 228076f01c8418fdbedc279496a3a068029f1786..6a16a31654630fca16311a5245be99cf6f868439 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -804,6 +804,8 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
       vClass: Class[V],
       conf: Configuration = hadoopConfiguration): RDD[(K, V)] = {
     assertNotStopped()
+    // Constructing a NewHadoopJob automatically adds security credentials to conf,
+    // so we don't need to explicitly add them ourselves.
     val job = new NewHadoopJob(conf)
     NewFileInputFormat.addInputPath(job, new Path(path))
     val updatedConf = job.getConfiguration
@@ -826,7 +828,10 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
       kClass: Class[K],
       vClass: Class[V]): RDD[(K, V)] = {
     assertNotStopped()
-    new NewHadoopRDD(this, fClass, kClass, vClass, conf)
+    // Add necessary security credentials to the JobConf. Required to access secure HDFS.
+    val jconf = new JobConf(conf)
+    SparkHadoopUtil.get.addCredentials(jconf)
+    new NewHadoopRDD(this, fClass, kClass, vClass, jconf)
   }
 
   /** Get an RDD for a Hadoop SequenceFile with given key and value types.
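
For context, here is a minimal usage sketch (not part of the patch) of how the fixed `newAPIHadoopRDD` overload might be exercised against a Kerberos-secured HDFS cluster. The object name, input path, and config key are illustrative assumptions; broadly, `SparkHadoopUtil.get.addCredentials` copies the calling user's Hadoop delegation tokens into the `JobConf` so that the tasks reading the input splits can authenticate to the NameNode.

```scala
// Hypothetical usage sketch, assuming a Kerberized Hadoop 2.x cluster; the app
// name, HDFS path, and config key below are placeholders, not part of the patch.
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.spark.{SparkConf, SparkContext}

object SecureHdfsReadSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("secure-hdfs-read"))

    // Point the new-API FileInputFormat at a secure HDFS directory.
    val conf = new Configuration(sc.hadoopConfiguration)
    conf.set("mapreduce.input.fileinputformat.inputdir", "hdfs://namenode:8020/data/events")

    // With the patch applied, this overload wraps conf in a JobConf and calls
    // SparkHadoopUtil.get.addCredentials on it before building the RDD, so the
    // tasks that read the splits carry the delegation tokens they need.
    val rdd = sc.newAPIHadoopRDD(
      conf,
      classOf[TextInputFormat],
      classOf[LongWritable],
      classOf[Text])

    println(s"records: ${rdd.count()}")
    sc.stop()
  }
}
```

The first hunk relies on the same mechanism implicitly: constructing a `NewHadoopJob` from `conf` pulls the credentials in as a side effect, which is why no explicit `addCredentials` call is needed there.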