diff --git a/core/src/main/scala/spark/Utils.scala b/core/src/main/scala/spark/Utils.scala
index 84626df553a38de903b9968ead04e1e886a28c7d..ec15326014e8d1e8e7ab058c4c2c1118bfd26c79 100644
--- a/core/src/main/scala/spark/Utils.scala
+++ b/core/src/main/scala/spark/Utils.scala
@@ -4,20 +4,26 @@ import java.io._
 import java.net.{InetAddress, URL, URI, NetworkInterface, Inet4Address, ServerSocket}
 import java.util.{Locale, Random, UUID}
 import java.util.concurrent.{ConcurrentHashMap, Executors, ThreadFactory, ThreadPoolExecutor}
-import org.apache.hadoop.fs.{Path, FileSystem, FileUtil}
+import java.util.regex.Pattern
+
 import scala.collection.mutable.{ArrayBuffer, HashMap}
 import scala.collection.JavaConversions._
 import scala.io.Source
+
 import com.google.common.io.Files
 import com.google.common.util.concurrent.ThreadFactoryBuilder
+
+import org.apache.hadoop.fs.{Path, FileSystem, FileUtil}
+
 import spark.serializer.SerializerInstance
 import spark.deploy.SparkHadoopUtil
-import java.util.regex.Pattern
+
 
 /**
  * Various utility methods used by Spark.
  */
 private object Utils extends Logging {
+
   /** Serialize an object using Java serialization */
   def serialize[T](o: T): Array[Byte] = {
     val bos = new ByteArrayOutputStream()
@@ -68,7 +74,6 @@ private object Utils extends Logging {
     return buf
   }
 
-
   private val shutdownDeletePaths = new collection.mutable.HashSet[String]()
 
   // Register the path to be deleted via shutdown hook
@@ -87,19 +92,19 @@ private object Utils extends Logging {
     }
   }
 
-  // Note: if file is child of some registered path, while not equal to it, then return true; else false
-  // This is to ensure that two shutdown hooks do not try to delete each others paths - resulting in IOException
-  // and incomplete cleanup
+  // Note: if file is child of some registered path, while not equal to it, then return true;
+  // else false. This is to ensure that two shutdown hooks do not try to delete each others
+  // paths - resulting in IOException and incomplete cleanup.
   def hasRootAsShutdownDeleteDir(file: File): Boolean = {
-
     val absolutePath = file.getAbsolutePath()
-
     val retval = shutdownDeletePaths.synchronized {
-      shutdownDeletePaths.find(path => ! absolutePath.equals(path) && absolutePath.startsWith(path) ).isDefined
+      shutdownDeletePaths.find { path =>
+        !absolutePath.equals(path) && absolutePath.startsWith(path)
+      }.isDefined
+    }
+    if (retval) {
+      logInfo("path = " + file + ", already present as root for deletion.")
     }
-
-    if (retval) logInfo("path = " + file + ", already present as root for deletion.")
-
     retval
   }
 
@@ -131,7 +136,7 @@ private object Utils extends Logging {
         if (! hasRootAsShutdownDeleteDir(dir)) Utils.deleteRecursively(dir)
       }
     })
-    return dir
+    dir
   }
 
   /** Copy all data from an InputStream to an OutputStream */
@@ -174,35 +179,30 @@ private object Utils extends Logging {
         Utils.copyStream(in, out, true)
         if (targetFile.exists && !Files.equal(tempFile, targetFile)) {
           tempFile.delete()
-          throw new SparkException("File " + targetFile + " exists and does not match contents of" +
-            " " + url)
+          throw new SparkException(
+            "File " + targetFile + " exists and does not match contents of" + " " + url)
         } else {
           Files.move(tempFile, targetFile)
         }
       case "file" | null =>
-        val sourceFile = if (uri.isAbsolute) {
-          new File(uri)
-        } else {
-          new File(url)
-        }
-        if (targetFile.exists && !Files.equal(sourceFile, targetFile)) {
-          throw new SparkException("File " + targetFile + " exists and does not match contents of" +
-            " " + url)
-        } else {
-          // Remove the file if it already exists
-          targetFile.delete()
-          // Symlink the file locally.
-          if (uri.isAbsolute) {
-            // url is absolute, i.e. it starts with "file:///". Extract the source
-            // file's absolute path from the url.
-            val sourceFile = new File(uri)
-            logInfo("Symlinking " + sourceFile.getAbsolutePath + " to " + targetFile.getAbsolutePath)
-            FileUtil.symLink(sourceFile.getAbsolutePath, targetFile.getAbsolutePath)
+        // In the case of a local file, copy the local file to the target directory.
+        // Note the difference between uri vs url.
+        val sourceFile = if (uri.isAbsolute) new File(uri) else new File(url)
+        if (targetFile.exists) {
+          // If the target file already exists, warn the user if
+          if (!Files.equal(sourceFile, targetFile)) {
+            throw new SparkException(
+              "File " + targetFile + " exists and does not match contents of" + " " + url)
           } else {
-            // url is not absolute, i.e. itself is the path to the source file.
-            logInfo("Symlinking " + url + " to " + targetFile.getAbsolutePath)
-            FileUtil.symLink(url, targetFile.getAbsolutePath)
+            // Do nothing if the file contents are the same, i.e. this file has been copied
+            // previously.
+            logInfo(sourceFile.getAbsolutePath + " has been previously copied to "
+              + targetFile.getAbsolutePath)
           }
+        } else {
+          // The file does not exist in the target directory. Copy it there.
+          logInfo("Copying " + sourceFile.getAbsolutePath + " to " + targetFile.getAbsolutePath)
+          Files.copy(sourceFile, targetFile)
         }
       case _ =>
         // Use the Hadoop filesystem library, which supports file://, hdfs://, s3://, and others
@@ -323,8 +323,6 @@ private object Utils extends Logging {
     InetAddress.getByName(address).getHostName
   }
 
-
-
   def localHostPort(): String = {
     val retval = System.getProperty("spark.hostPort", null)
     if (retval == null) {
@@ -382,6 +380,7 @@ private object Utils extends Logging {
   // Typically, this will be of order of number of nodes in cluster
   // If not, we should change it to LRUCache or something.
   private val hostPortParseResults = new ConcurrentHashMap[String, (String, Int)]()
+
   def parseHostPort(hostPort: String): (String,  Int) = {
     {
       // Check cache first.
@@ -390,7 +389,8 @@ private object Utils extends Logging {
     }
 
     val indx: Int = hostPort.lastIndexOf(':')
-    // This is potentially broken - when dealing with ipv6 addresses for example, sigh ... but then hadoop does not support ipv6 right now.
+    // This is potentially broken - when dealing with ipv6 addresses for example, sigh ...
+    // but then hadoop does not support ipv6 right now.
     // For now, we assume that if port exists, then it is valid - not check if it is an int > 0
     if (-1 == indx) {
       val retval = (hostPort, 0)