diff --git a/core/src/hadoop1/scala/spark/deploy/SparkHadoopUtil.scala b/core/src/hadoop1/scala/spark/deploy/SparkHadoopUtil.scala
index a0fb4fe25d188b02f1ac62245eaaa0de86b5b46c..f1c86de4cc96154b4254d2cfee6e07ad46602c2f 100644
--- a/core/src/hadoop1/scala/spark/deploy/SparkHadoopUtil.scala
+++ b/core/src/hadoop1/scala/spark/deploy/SparkHadoopUtil.scala
@@ -1,5 +1,6 @@
 package spark.deploy
 import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.mapred.JobConf
 
 
 /**
@@ -20,4 +21,7 @@ object SparkHadoopUtil {
 
   // Return an appropriate (subclass) of Configuration. Creating config can initializes some hadoop subsystems
   def newConfiguration(): Configuration = new Configuration()
+
+  // Add to the job conf any user credentials needed to run on a secure Hadoop cluster.
+  def addCredentials(conf: JobConf) {}
 }
diff --git a/core/src/hadoop2-yarn/scala/spark/deploy/SparkHadoopUtil.scala b/core/src/hadoop2-yarn/scala/spark/deploy/SparkHadoopUtil.scala
index ab1ab9d8a7a8e8530938f6a1ddc4c860cffb1af0..301a57fffa2ca31df37eb7ca5da49a8555e30b6c 100644
--- a/core/src/hadoop2-yarn/scala/spark/deploy/SparkHadoopUtil.scala
+++ b/core/src/hadoop2-yarn/scala/spark/deploy/SparkHadoopUtil.scala
@@ -1,6 +1,7 @@
 package spark.deploy
 
 import collection.mutable.HashMap
+import org.apache.hadoop.mapred.JobConf
 import org.apache.hadoop.security.UserGroupInformation
 import org.apache.hadoop.yarn.conf.YarnConfiguration
 import org.apache.hadoop.conf.Configuration
@@ -27,18 +28,7 @@ object SparkHadoopUtil {
   }
 
   def runAsUser(func: (Product) => Unit, args: Product, user: String) {
-
-    // println("running as user " + jobUserName)
-
-    UserGroupInformation.setConfiguration(yarnConf)
-    val appMasterUgi: UserGroupInformation = UserGroupInformation.createRemoteUser(user)
-    appMasterUgi.doAs(new PrivilegedExceptionAction[AnyRef] {
-      def run: AnyRef = {
-        func(args)
-        // no return value ...
-        null
-      }
-    })
+    func(args)
   }
 
   // Note that all params which start with SPARK are propagated all the way through, so if in yarn mode, this MUST be set to true.
@@ -60,4 +50,10 @@ object SparkHadoopUtil {
   // Return an appropriate (subclass) of Configuration. Creating config can initializes some hadoop subsystems
   // Always create a new config, dont reuse yarnConf.
   def newConfiguration(): Configuration = new YarnConfiguration(new Configuration())
+
+  // Add to the job conf any user credentials needed to run on a secure Hadoop cluster.
+  def addCredentials(conf: JobConf) {
+    val jobCreds = conf.getCredentials()
+    jobCreds.mergeAll(UserGroupInformation.getCurrentUser().getCredentials())
+  }
 }
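
For context: on the hadoop2-yarn build addCredentials merges the submitting user's Hadoop tokens and secrets into the job configuration, while the hadoop1 and hadoop2 builds compile the same object with an empty stub, so shared code can call it unconditionally. A minimal caller-side sketch (the standalone object and its name are illustrative, not part of the patch):

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.mapred.JobConf
    import spark.deploy.SparkHadoopUtil

    object AddCredentialsSketch {
      def main(args: Array[String]) {
        val jobConf = new JobConf(new Configuration())
        // No-op on hadoop1/hadoop2; on hadoop2-yarn this copies the current
        // user's delegation tokens and secrets onto the JobConf.
        SparkHadoopUtil.addCredentials(jobConf)
      }
    }
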
diff --git a/core/src/hadoop2-yarn/scala/spark/deploy/yarn/ApplicationMaster.scala b/core/src/hadoop2-yarn/scala/spark/deploy/yarn/ApplicationMaster.scala
index aa72c1e5fef1c71484ccd82331206e7748d5ddb8..f19648ec6865ff78c8c80d527f19616f6c956fef 100644
--- a/core/src/hadoop2-yarn/scala/spark/deploy/yarn/ApplicationMaster.scala
+++ b/core/src/hadoop2-yarn/scala/spark/deploy/yarn/ApplicationMaster.scala
@@ -30,23 +30,6 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
 
   def run() {
     
-    // Initialization
-    val jobUserName = Utils.getUserNameFromEnvironment()
-    logInfo("running as user " + jobUserName)
-
-    // run as user ...
-    UserGroupInformation.setConfiguration(yarnConf)
-    val appMasterUgi: UserGroupInformation = UserGroupInformation.createRemoteUser(jobUserName)
-    appMasterUgi.doAs(new PrivilegedExceptionAction[AnyRef] {
-      def run: AnyRef = {
-        runImpl()
-        return null
-      }
-    })
-  }
-
-  private def runImpl() {
-
     appAttemptId = getApplicationAttemptId()
     resourceManager = registerWithResourceManager()
     val appMasterResponse: RegisterApplicationMasterResponse = registerApplicationMaster()
diff --git a/core/src/hadoop2-yarn/scala/spark/deploy/yarn/Client.scala b/core/src/hadoop2-yarn/scala/spark/deploy/yarn/Client.scala
index 7a881e26dfffc423f7ab60d719926a9581db4dd9..514c17f241eba48297ab69a615941fd33a88744f 100644
--- a/core/src/hadoop2-yarn/scala/spark/deploy/yarn/Client.scala
+++ b/core/src/hadoop2-yarn/scala/spark/deploy/yarn/Client.scala
@@ -1,9 +1,13 @@
 package spark.deploy.yarn
 
 import java.net.{InetSocketAddress, URI}
+import java.nio.ByteBuffer
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
+import org.apache.hadoop.mapred.Master
 import org.apache.hadoop.net.NetUtils
+import org.apache.hadoop.io.DataOutputBuffer
+import org.apache.hadoop.security.UserGroupInformation
 import org.apache.hadoop.yarn.api._
 import org.apache.hadoop.yarn.api.records._
 import org.apache.hadoop.yarn.api.protocolrecords._
@@ -23,6 +27,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
   
   var rpc: YarnRPC = YarnRPC.create(conf)
   val yarnConf: YarnConfiguration = new YarnConfiguration(conf)
+  val credentials = UserGroupInformation.getCurrentUser().getCredentials()
   
   def run() {
     init(yarnConf)
@@ -40,8 +45,8 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
 
     appContext.setQueue(args.amQueue)
     appContext.setAMContainerSpec(amContainer)
-    appContext.setUser(args.amUser)
-    
+    appContext.setUser(UserGroupInformation.getCurrentUser().getShortUserName())
+
     submitApp(appContext)
     
     monitorApplication(appId)
@@ -62,14 +67,21 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
   
   def verifyClusterResources(app: GetNewApplicationResponse) = { 
     val maxMem = app.getMaximumResourceCapability().getMemory()
-    logInfo("Max mem capabililty of resources in this cluster " + maxMem)
+    logInfo("Max mem capability of a single resource in this cluster " + maxMem)
     
-    // If the cluster does not have enough memory resources, exit.
-    val requestedMem = (args.amMemory + YarnAllocationHandler.MEMORY_OVERHEAD) + args.numWorkers * args.workerMemory
-    if (requestedMem > maxMem) {
-      logError("Cluster cannot satisfy memory resource request of " + requestedMem)
+    // If we have requested more than the cluster's max for a single resource then exit.
+    if (args.workerMemory > maxMem) {
+      logError("The worker size is too large to run on this cluster: " + args.workerMemory)
+      System.exit(1)
+    }
+    val amMem = args.amMemory + YarnAllocationHandler.MEMORY_OVERHEAD
+    if (amMem > maxMem) {
+      logError("The AM size is too large to run on this cluster: " + amMem)
       System.exit(1)
     }
+
+    // We could also check that the entire cluster has enough resources, but that would require
+    // fetching all the node reports and computing the totals ourselves.
   }
   
   def createApplicationSubmissionContext(appId: ApplicationId): ApplicationSubmissionContext = {
@@ -86,6 +98,15 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
     // Upload Spark and the application JAR to the remote file system
     // Add them as local resources to the AM
     val fs = FileSystem.get(conf)
+
+    val delegTokenRenewer = Master.getMasterPrincipal(conf)
+    if (UserGroupInformation.isSecurityEnabled()) {
+      if (delegTokenRenewer == null || delegTokenRenewer.length() == 0) {
+        logError("Can't get Master Kerberos principal for use as renewer")
+        System.exit(1)
+      }
+    }
+
     Map("spark.jar" -> System.getenv("SPARK_JAR"), "app.jar" -> args.userJar, "log4j.properties" -> System.getenv("SPARK_LOG4J_CONF"))
     .foreach { case(destName, _localPath) =>
       val localPath: String = if (_localPath != null) _localPath.trim() else ""
@@ -97,6 +118,11 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
         fs.copyFromLocalFile(false, true, src, dst)
         val destStatus = fs.getFileStatus(dst)
 
+        // Get delegation tokens for anything we upload to HDFS.
+        if (UserGroupInformation.isSecurityEnabled()) {
+          fs.addDelegationTokens(delegTokenRenewer, credentials)
+        }
+
         val amJarRsrc = Records.newRecord(classOf[LocalResource]).asInstanceOf[LocalResource]
         amJarRsrc.setType(LocalResourceType.FILE)
         amJarRsrc.setVisibility(LocalResourceVisibility.APPLICATION)
@@ -106,6 +132,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
         locaResources(destName) = amJarRsrc
       }
     }
+    UserGroupInformation.getCurrentUser().addCredentials(credentials)
     return locaResources
   }
   
@@ -114,7 +141,6 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
     val log4jConfLocalRes = localResources.getOrElse("log4j.properties", null)
 
     val env = new HashMap[String, String]()
-    Apps.addToEnvironment(env, Environment.USER.name, args.amUser)
 
     // If log4j present, ensure ours overrides all others
     if (log4jConfLocalRes != null) Apps.addToEnvironment(env, Environment.CLASSPATH.name, "./")
@@ -195,7 +222,13 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
     }
 
     // Command for the ApplicationMaster
-    val commands = List[String]("java " +
+    var javaCommand = "java"
+    val javaHome = System.getenv("JAVA_HOME")
+    if (javaHome != null && !javaHome.isEmpty()) {
+      javaCommand = Environment.JAVA_HOME.$() + "/bin/java"
+    }
+
+    val commands = List[String](javaCommand + 
       " -server " +
       JAVA_OPTS +
       " spark.deploy.yarn.ApplicationMaster" +
@@ -214,7 +247,12 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
     // Memory for the ApplicationMaster
     capability.setMemory(args.amMemory + YarnAllocationHandler.MEMORY_OVERHEAD)
     amContainer.setResource(capability)
-    
+
+    // Set up security tokens for launching the ApplicationMaster container
+    val dob = new DataOutputBuffer()
+    credentials.writeTokenStorageToStream(dob)
+    amContainer.setContainerTokens(ByteBuffer.wrap(dob.getData()))
+
     return amContainer
   }
   
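
Read together, the Client changes above form a small token pipeline: obtain HDFS delegation tokens with the JobTracker/ResourceManager principal as renewer, accumulate them in the client's Credentials, and serialize those Credentials into the ApplicationMaster's launch context. A stripped-down sketch of the serialization step (the helper object and method name are illustrative):

    import java.nio.ByteBuffer
    import org.apache.hadoop.io.DataOutputBuffer
    import org.apache.hadoop.security.Credentials
    import org.apache.hadoop.yarn.api.records.ContainerLaunchContext

    object TokenShippingSketch {
      // Mirrors createContainerLaunchContext above: write every token and secret in
      // `credentials` into a buffer and attach it to the container launch context.
      def attachTokens(credentials: Credentials, ctx: ContainerLaunchContext) {
        val dob = new DataOutputBuffer()
        credentials.writeTokenStorageToStream(dob)
        ctx.setContainerTokens(ByteBuffer.wrap(dob.getData()))
      }
    }

The launched container picks these credentials up from YARN, which is part of why the explicit createRemoteUser/doAs wrapping in ApplicationMaster and runAsUser could be removed.
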
diff --git a/core/src/hadoop2-yarn/scala/spark/deploy/yarn/ClientArguments.scala b/core/src/hadoop2-yarn/scala/spark/deploy/yarn/ClientArguments.scala
index 24110558e7da8333efae3fb25a0f3d375e610ba4..07e7edea36313b4690a932cca56388723332f794 100644
--- a/core/src/hadoop2-yarn/scala/spark/deploy/yarn/ClientArguments.scala
+++ b/core/src/hadoop2-yarn/scala/spark/deploy/yarn/ClientArguments.scala
@@ -13,7 +13,6 @@ class ClientArguments(val args: Array[String]) {
   var workerMemory = 1024
   var workerCores = 1
   var numWorkers = 2
-  var amUser = System.getProperty("user.name")
   var amQueue = System.getProperty("QUEUE", "default")
   var amMemory: Int = 512
   // TODO
@@ -58,10 +57,6 @@ class ClientArguments(val args: Array[String]) {
           workerCores = value
           args = tail
 
-        case ("--user") :: value :: tail =>
-          amUser = value
-          args = tail
-
         case ("--queue") :: value :: tail =>
           amQueue = value
           args = tail
@@ -96,8 +91,7 @@ class ClientArguments(val args: Array[String]) {
       "  --worker-cores NUM   Number of cores for the workers (Default: 1). This is unsused right now.\n" +
       "  --master-memory MEM  Memory for Master (e.g. 1000M, 2G) (Default: 512 Mb)\n" +
       "  --worker-memory MEM  Memory per Worker (e.g. 1000M, 2G) (Default: 1G)\n" +
-      "  --queue QUEUE        The hadoop queue to use for allocation requests (Default: 'default')\n" +
-      "  --user USERNAME      Run the ApplicationMaster (and slaves) as a different user\n"
+      "  --queue QUEUE        The hadoop queue to use for allocation requests (Default: 'default')"
       )
     System.exit(exitCode)
   }
diff --git a/core/src/hadoop2-yarn/scala/spark/deploy/yarn/WorkerRunnable.scala b/core/src/hadoop2-yarn/scala/spark/deploy/yarn/WorkerRunnable.scala
index a2bf0af762417e2d9d3085abef1983a6caa39448..cc6f3344a1836970cfd55516c96860f93ee95463 100644
--- a/core/src/hadoop2-yarn/scala/spark/deploy/yarn/WorkerRunnable.scala
+++ b/core/src/hadoop2-yarn/scala/spark/deploy/yarn/WorkerRunnable.scala
@@ -1,9 +1,12 @@
 package spark.deploy.yarn
 
 import java.net.URI
+import java.nio.ByteBuffer
+import java.security.PrivilegedExceptionAction
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
+import org.apache.hadoop.io.DataOutputBuffer
 import org.apache.hadoop.net.NetUtils
 import org.apache.hadoop.security.UserGroupInformation
 import org.apache.hadoop.yarn.api._
@@ -11,7 +14,7 @@ import org.apache.hadoop.yarn.api.records._
 import org.apache.hadoop.yarn.api.protocolrecords._
 import org.apache.hadoop.yarn.conf.YarnConfiguration
 import org.apache.hadoop.yarn.ipc.YarnRPC
-import org.apache.hadoop.yarn.util.{Apps, ConverterUtils, Records}
+import org.apache.hadoop.yarn.util.{Apps, ConverterUtils, Records, ProtoUtils}
 import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
 
 import scala.collection.JavaConversions._
@@ -76,7 +79,19 @@ class WorkerRunnable(container: Container, conf: Configuration, masterAddress: S
 */
 
     ctx.setUser(UserGroupInformation.getCurrentUser().getShortUserName())
-    val commands = List[String]("java " +
+
+    val credentials = UserGroupInformation.getCurrentUser().getCredentials()
+    val dob = new DataOutputBuffer()
+    credentials.writeTokenStorageToStream(dob)
+    ctx.setContainerTokens(ByteBuffer.wrap(dob.getData()))
+
+    var javaCommand = "java"
+    val javaHome = System.getenv("JAVA_HOME")
+    if (javaHome != null && !javaHome.isEmpty()) {
+      javaCommand = Environment.JAVA_HOME.$() + "/bin/java"
+    }
+
+    val commands = List[String](javaCommand +
       " -server " +
       // Kill if OOM is raised - leverage yarn's failure handling to cause rescheduling.
       // Not killing the task leaves various aspects of the worker and (to some extent) the jvm in an inconsistent state.
@@ -143,8 +158,6 @@ class WorkerRunnable(container: Container, conf: Configuration, masterAddress: S
   
   def prepareEnvironment: HashMap[String, String] = {
     val env = new HashMap[String, String]()
-    // should we add this ?
-    Apps.addToEnvironment(env, Environment.USER.name, Utils.getUserNameFromEnvironment())
 
     // If log4j present, ensure ours overrides all others
     if (System.getenv("SPARK_YARN_LOG4J_PATH") != null) {
@@ -165,7 +178,23 @@ class WorkerRunnable(container: Container, conf: Configuration, masterAddress: S
     val cmHostPortStr = container.getNodeId().getHost() + ":" + container.getNodeId().getPort()
     val cmAddress = NetUtils.createSocketAddr(cmHostPortStr)
     logInfo("Connecting to ContainerManager at " + cmHostPortStr)
-    return rpc.getProxy(classOf[ContainerManager], cmAddress, conf).asInstanceOf[ContainerManager]
+
+    // Use doAs with a remote user here so we can add the container token without
+    // polluting the current user's credentials with all of the individual container tokens.
+    val user = UserGroupInformation.createRemoteUser(container.getId().toString())
+    val containerToken = container.getContainerToken()
+    if (containerToken != null) {
+      user.addToken(ProtoUtils.convertFromProtoFormat(containerToken, cmAddress))
+    }
+
+    val proxy = user
+        .doAs(new PrivilegedExceptionAction[ContainerManager] {
+          def run: ContainerManager = {
+            return rpc.getProxy(classOf[ContainerManager],
+                cmAddress, conf).asInstanceOf[ContainerManager]
+          }
+        })
+    return proxy
   }
   
 }
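
The connectToCM change above relies on a standard UserGroupInformation idiom: create a throwaway remote user, attach only the container token to it, and open the RPC proxy inside doAs so the token stays off the submitting user's credentials. A generic sketch of that idiom (the helper and its name are illustrative):

    import java.security.PrivilegedExceptionAction
    import org.apache.hadoop.security.UserGroupInformation

    object DoAsSketch {
      // Run `body` as an isolated remote UGI; any tokens added to `user` before
      // the call remain scoped to it instead of the current user's credentials.
      def runIsolated[T](name: String)(body: => T): T = {
        val user = UserGroupInformation.createRemoteUser(name)
        user.doAs(new PrivilegedExceptionAction[T] {
          def run(): T = body
        })
      }
    }

In the patch the throwaway name is container.getId().toString() and the body is the rpc.getProxy call.
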
diff --git a/core/src/hadoop2/scala/spark/deploy/SparkHadoopUtil.scala b/core/src/hadoop2/scala/spark/deploy/SparkHadoopUtil.scala
index a0fb4fe25d188b02f1ac62245eaaa0de86b5b46c..f1c86de4cc96154b4254d2cfee6e07ad46602c2f 100644
--- a/core/src/hadoop2/scala/spark/deploy/SparkHadoopUtil.scala
+++ b/core/src/hadoop2/scala/spark/deploy/SparkHadoopUtil.scala
@@ -1,5 +1,6 @@
 package spark.deploy
 import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.mapred.JobConf
 
 
 /**
@@ -20,4 +21,7 @@ object SparkHadoopUtil {
 
   // Return an appropriate (subclass) of Configuration. Creating config can initializes some hadoop subsystems
   def newConfiguration(): Configuration = new Configuration()
+
+  // Add to the job conf any user credentials needed to run on a secure Hadoop cluster.
+  def addCredentials(conf: JobConf) {}
 }
diff --git a/core/src/main/scala/spark/PairRDDFunctions.scala b/core/src/main/scala/spark/PairRDDFunctions.scala
index 7630fe7803575938a8475e40291a52f016041ed5..8b313c645f81cd546de12a80c1af1c74010afa43 100644
--- a/core/src/main/scala/spark/PairRDDFunctions.scala
+++ b/core/src/main/scala/spark/PairRDDFunctions.scala
@@ -21,6 +21,7 @@ import org.apache.hadoop.mapred.OutputFormat
 
 import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat => NewFileOutputFormat}
 import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat, RecordWriter => NewRecordWriter, Job => NewAPIHadoopJob, HadoopMapReduceUtil}
+import org.apache.hadoop.security.UserGroupInformation
 
 import spark.partial.BoundedDouble
 import spark.partial.PartialResult
diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala
index 366afb2a2a7e23a150c034d7e04fc59a20577674..228e831dff4950b147f6cfb446d9d86a34991c28 100644
--- a/core/src/main/scala/spark/SparkContext.scala
+++ b/core/src/main/scala/spark/SparkContext.scala
@@ -36,6 +36,7 @@ import org.apache.hadoop.mapred.TextInputFormat
 import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
 import org.apache.hadoop.mapreduce.{Job => NewHadoopJob}
 import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => NewFileInputFormat}
+import org.apache.hadoop.security.UserGroupInformation
 
 import org.apache.mesos.MesosNativeLibrary
 
diff --git a/core/src/main/scala/spark/rdd/HadoopRDD.scala b/core/src/main/scala/spark/rdd/HadoopRDD.scala
index cbf5512e247a46773294a428b8f2d6f9d5034876..07c103503c87bb1d82ba4df3c65b03dcd21649d4 100644
--- a/core/src/main/scala/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/spark/rdd/HadoopRDD.scala
@@ -15,6 +15,7 @@ import org.apache.hadoop.mapred.RecordReader
 import org.apache.hadoop.mapred.Reporter
 import org.apache.hadoop.util.ReflectionUtils
 
+import spark.deploy.SparkHadoopUtil
 import spark.{Dependency, Logging, Partition, RDD, SerializableWritable, SparkContext, TaskContext}
 import spark.util.NextIterator
 import org.apache.hadoop.conf.Configurable
@@ -50,6 +51,7 @@ class HadoopRDD[K, V](
   private val confBroadcast = sc.broadcast(new SerializableWritable(conf))
 
   override def getPartitions: Array[Partition] = {
+    SparkHadoopUtil.addCredentials(conf)
     val inputFormat = createInputFormat(conf)
     if (inputFormat.isInstanceOf[Configurable]) {
       inputFormat.asInstanceOf[Configurable].setConf(conf)
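
The getPartitions change matters because split computation is the first point where the driver's JobConf touches the (possibly secure) filesystem, so the credentials must be on the conf before the InputFormat lists the input paths. A small sketch of that ordering (splitsFor is a hypothetical helper, not part of HadoopRDD):

    import org.apache.hadoop.mapred.{InputSplit, JobConf}
    import spark.deploy.SparkHadoopUtil

    object SplitsSketch {
      // Merge the user's tokens into the conf first, then let the configured
      // InputFormat contact the filesystem to compute splits, as getPartitions does above.
      def splitsFor(conf: JobConf, minSplits: Int): Array[InputSplit] = {
        SparkHadoopUtil.addCredentials(conf)
        conf.getInputFormat().getSplits(conf, minSplits)
      }
    }
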
diff --git a/core/src/main/scala/spark/scheduler/InputFormatInfo.scala b/core/src/main/scala/spark/scheduler/InputFormatInfo.scala
index 287f731787f1b184d4e2781dbd28e1f4463f5866..17d0ea4f80f6401299cf153d35f81a95485f0b35 100644
--- a/core/src/main/scala/spark/scheduler/InputFormatInfo.scala
+++ b/core/src/main/scala/spark/scheduler/InputFormatInfo.scala
@@ -3,11 +3,13 @@ package spark.scheduler
 import spark.Logging
 import scala.collection.immutable.Set
 import org.apache.hadoop.mapred.{FileInputFormat, JobConf}
+import org.apache.hadoop.security.UserGroupInformation
 import org.apache.hadoop.util.ReflectionUtils
 import org.apache.hadoop.mapreduce.Job
 import org.apache.hadoop.conf.Configuration
 import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
 import scala.collection.JavaConversions._
+import spark.deploy.SparkHadoopUtil
 
 
 /**
@@ -70,6 +72,7 @@ class InputFormatInfo(val configuration: Configuration, val inputFormatClazz: Cl
   // This method does not expect failures, since validate has already passed ...
   private def prefLocsFromMapreduceInputFormat(): Set[SplitInfo] = {
     val conf = new JobConf(configuration)
+    SparkHadoopUtil.addCredentials(conf)
     FileInputFormat.setInputPaths(conf, path)
 
     val instance: org.apache.hadoop.mapreduce.InputFormat[_, _] =
@@ -89,6 +92,7 @@ class InputFormatInfo(val configuration: Configuration, val inputFormatClazz: Cl
   // This method does not expect failures, since validate has already passed ...
   private def prefLocsFromMapredInputFormat(): Set[SplitInfo] = {
     val jobConf = new JobConf(configuration)
+    SparkHadoopUtil.addCredentials(jobConf)
     FileInputFormat.setInputPaths(jobConf, path)
 
     val instance: org.apache.hadoop.mapred.InputFormat[_, _] =
diff --git a/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala b/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala
index 3ec922957d635135f21f279005405ec556c9d898..8d0a83d439996c70ba024f8460b5ca7766241d54 100644
--- a/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala
+++ b/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala
@@ -14,6 +14,7 @@ import scala.collection.mutable.ArrayBuffer
 import org.apache.hadoop.mapred.{JobConf, OutputFormat}
 import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat}
 import org.apache.hadoop.mapred.OutputFormat
+import org.apache.hadoop.security.UserGroupInformation
 import org.apache.hadoop.conf.Configuration
 
 class PairDStreamFunctions[K: ClassManifest, V: ClassManifest](self: DStream[(K,V)])