diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 4644088f19f4bcc448bd8e5a69660a3eea42fbb9..d3dc1d09cb7b49bfa1460160c86ce455da02eec4 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -624,7 +624,9 @@ private[spark] object Utils extends Logging {
       case _ =>
         val fs = getHadoopFileSystem(uri, hadoopConf)
         val path = new Path(uri)
-        fetchHcfsFile(path, new File(targetDir, path.getName), fs, conf, hadoopConf, fileOverwrite)
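+        // Let fetchHcfsFile resolve the destination itself, keeping the requested filename.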
+        fetchHcfsFile(path, targetDir, fs, conf, hadoopConf, fileOverwrite,
+                      filename = Some(filename))
     }
   }
 
@@ -639,19 +641,25 @@ private[spark] object Utils extends Logging {
       fs: FileSystem,
       conf: SparkConf,
       hadoopConf: Configuration,
-      fileOverwrite: Boolean): Unit = {
-    if (!targetDir.mkdir()) {
+      fileOverwrite: Boolean,
+      filename: Option[String] = None): Unit = {
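+    // Reuse the target directory if it already exists; only fail if it cannot be created.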
+    if (!targetDir.exists() && !targetDir.mkdir()) {
       throw new IOException(s"Failed to create directory ${targetDir.getPath}")
     }
-    fs.listStatus(path).foreach { fileStatus =>
-      val innerPath = fileStatus.getPath
-      if (fileStatus.isDir) {
-        fetchHcfsFile(innerPath, new File(targetDir, innerPath.getName), fs, conf, hadoopConf,
-          fileOverwrite)
-      } else {
-        val in = fs.open(innerPath)
-        val targetFile = new File(targetDir, innerPath.getName)
-        downloadFile(innerPath.toString, in, targetFile, fileOverwrite)
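+    // The destination name is the explicit filename when given, else the source's own name.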
+    val dest = new File(targetDir, filename.getOrElse(path.getName))
+    if (fs.isFile(path)) {
+      val in = fs.open(path)
+      try {
+        downloadFile(path.toString, in, dest, fileOverwrite)
+      } finally {
+        in.close()
+      }
+    } else {
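+      // Recurse into the directory; children keep their own names (filename stays None).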
+      fs.listStatus(path).foreach { fileStatus =>
+        fetchHcfsFile(fileStatus.getPath, dest, fs, conf, hadoopConf, fileOverwrite)
       }
     }
   }
diff --git a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
index fe2b64425115742f1532e730b629dc78f125e384..fd77753c0d3629c16b74d36d7a410ce65027ed3b 100644
--- a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
@@ -208,18 +208,18 @@ class UtilsSuite extends FunSuite with ResetSystemProperties {
     child1.setLastModified(System.currentTimeMillis() - (1000 * 30))
 
     // although child1 is old, child2 is still new so return true
-    assert(Utils.doesDirectoryContainAnyNewFiles(parent, 5)) 
+    assert(Utils.doesDirectoryContainAnyNewFiles(parent, 5))
 
     child2.setLastModified(System.currentTimeMillis - (1000 * 30))
-    assert(Utils.doesDirectoryContainAnyNewFiles(parent, 5)) 
+    assert(Utils.doesDirectoryContainAnyNewFiles(parent, 5))
 
     parent.setLastModified(System.currentTimeMillis - (1000 * 30))
     // although parent and its immediate children are new, child3 is still old
     // we expect a full recursive search for new files.
-    assert(Utils.doesDirectoryContainAnyNewFiles(parent, 5)) 
+    assert(Utils.doesDirectoryContainAnyNewFiles(parent, 5))
 
     child3.setLastModified(System.currentTimeMillis - (1000 * 30))
-    assert(!Utils.doesDirectoryContainAnyNewFiles(parent, 5)) 
+    assert(!Utils.doesDirectoryContainAnyNewFiles(parent, 5))
   }
 
   test("resolveURI") {
@@ -339,21 +339,21 @@ class UtilsSuite extends FunSuite with ResetSystemProperties {
     assert(!tempDir1.exists())
 
     val tempDir2 = Utils.createTempDir()
-    val tempFile1 = new File(tempDir2, "foo.txt")
-    Files.touch(tempFile1)
-    assert(tempFile1.exists())
-    Utils.deleteRecursively(tempFile1)
-    assert(!tempFile1.exists())
+    val sourceFile1 = new File(tempDir2, "foo.txt")
+    Files.touch(sourceFile1)
+    assert(sourceFile1.exists())
+    Utils.deleteRecursively(sourceFile1)
+    assert(!sourceFile1.exists())
 
     val tempDir3 = new File(tempDir2, "subdir")
     assert(tempDir3.mkdir())
-    val tempFile2 = new File(tempDir3, "bar.txt")
-    Files.touch(tempFile2)
-    assert(tempFile2.exists())
+    val sourceFile2 = new File(tempDir3, "bar.txt")
+    Files.touch(sourceFile2)
+    assert(sourceFile2.exists())
     Utils.deleteRecursively(tempDir2)
     assert(!tempDir2.exists())
     assert(!tempDir3.exists())
-    assert(!tempFile2.exists())
+    assert(!sourceFile2.exists())
   }
 
   test("loading properties from file") {
@@ -386,30 +386,41 @@ class UtilsSuite extends FunSuite with ResetSystemProperties {
   }
 
   test("fetch hcfs dir") {
-    val tempDir = Utils.createTempDir()
-    val innerTempDir = Utils.createTempDir(tempDir.getPath)
-    val tempFile = File.createTempFile("someprefix", "somesuffix", innerTempDir)
-    val targetDir = new File("target-dir")
-    Files.write("some text", tempFile, UTF_8)
-
-    try {
-      val path = new Path("file://" + tempDir.getAbsolutePath)
-      val conf = new Configuration()
-      val fs = Utils.getHadoopFileSystem(path.toString, conf)
-      Utils.fetchHcfsFile(path, targetDir, fs, new SparkConf(), conf, false)
-      assert(targetDir.exists())
-      assert(targetDir.isDirectory())
-      val newInnerDir = new File(targetDir, innerTempDir.getName)
-      println("inner temp dir: " + innerTempDir.getName)
-      targetDir.listFiles().map(_.getName).foreach(println)
-      assert(newInnerDir.exists())
-      assert(newInnerDir.isDirectory())
-      val newInnerFile = new File(newInnerDir, tempFile.getName)
-      assert(newInnerFile.exists())
-      assert(newInnerFile.isFile())
-    } finally {
-      Utils.deleteRecursively(tempDir)
-      Utils.deleteRecursively(targetDir)
-    }
+    val sourceDir = Utils.createTempDir()
+    val innerSourceDir = Utils.createTempDir(root = sourceDir.getPath)
+    val sourceFile = File.createTempFile("someprefix", "somesuffix", innerSourceDir)
+    val targetDir = new File(Utils.createTempDir(), "target-dir")
+    Files.write("some text", sourceFile, UTF_8)
+
+    val path = new Path("file://" + sourceDir.getAbsolutePath)
+    val conf = new Configuration()
+    val fs = Utils.getHadoopFileSystem(path.toString, conf)
+
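+    // Fetching a directory should create targetDir and copy the source tree beneath it.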
+    assert(!targetDir.isDirectory())
+    Utils.fetchHcfsFile(path, targetDir, fs, new SparkConf(), conf, false)
+    assert(targetDir.isDirectory())
+
+    // Copy again to make sure it doesn't error if the dir already exists.
+    Utils.fetchHcfsFile(path, targetDir, fs, new SparkConf(), conf, false)
+
+    val destDir = new File(targetDir, sourceDir.getName)
+    assert(destDir.isDirectory())
+
+    val destInnerDir = new File(destDir, innerSourceDir.getName)
+    assert(destInnerDir.isDirectory())
+
+    val destInnerFile = new File(destInnerDir, sourceFile.getName)
+    assert(destInnerFile.isFile())
+
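+    // Fetching a single file with an explicit filename should place it at targetDir/filename.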
+    val filePath = new Path("file://" + sourceFile.getAbsolutePath)
+    val testFileDir = new File(Utils.createTempDir(), "test-filename")
+    val testFileName = "testFName"
+    val testFilefs = Utils.getHadoopFileSystem(filePath.toString, conf)
+    Utils.fetchHcfsFile(filePath, testFileDir, testFilefs, new SparkConf(),
+                        conf, false, Some(testFileName))
+    val newFileName = new File(testFileDir, testFileName)
+    assert(newFileName.isFile())
   }
 }