diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala index 0afab522af82912407231c481e5367cf5b194c02..4fd81bc63bc1d46a9454551f3868d80dcea1f1f8 100644 --- a/core/src/main/scala/spark/SparkContext.scala +++ b/core/src/main/scala/spark/SparkContext.scala @@ -419,8 +419,9 @@ class SparkContext( } addedFiles(key) = System.currentTimeMillis - // Fetch the file locally in case the task is executed locally - val filename = new File(path.split("/").last) + // Fetch the file locally in case a job is executed locally. + // Jobs that run through LocalScheduler will already fetch the required dependencies, + // but jobs run in DAGScheduler.runLocally() will not so we must fetch the files here. Utils.fetchFile(path, new File(".")) logInfo("Added file " + path + " at " + key + " with timestamp " + addedFiles(key)) @@ -437,11 +438,10 @@ class SparkContext( } /** - * Clear the job's list of files added by `addFile` so that they do not get donwloaded to + * Clear the job's list of files added by `addFile` so that they do not get downloaded to * any new nodes. */ def clearFiles() { - addedFiles.keySet.map(_.split("/").last).foreach { k => new File(k).delete() } addedFiles.clear() } @@ -465,7 +465,6 @@ class SparkContext( * any new nodes. */ def clearJars() { - addedJars.keySet.map(_.split("/").last).foreach { k => new File(k).delete() } addedJars.clear() } diff --git a/core/src/main/scala/spark/Utils.scala b/core/src/main/scala/spark/Utils.scala index 6d64b32174f05f68ea769c39ab691fd4c156a59a..c10b415a93ab819065457bf617bfb28ffca3a2d3 100644 --- a/core/src/main/scala/spark/Utils.scala +++ b/core/src/main/scala/spark/Utils.scala @@ -9,6 +9,7 @@ import org.apache.hadoop.fs.{Path, FileSystem, FileUtil} import scala.collection.mutable.ArrayBuffer import scala.collection.JavaConversions._ import scala.io.Source +import com.google.common.io.Files /** * Various utility methods used by Spark. @@ -130,28 +131,47 @@ private object Utils extends Logging { */ def fetchFile(url: String, targetDir: File) { val filename = url.split("/").last + val tempDir = System.getProperty("spark.local.dir", System.getProperty("java.io.tmpdir")) + val tempFile = File.createTempFile("fetchFileTemp", null, new File(tempDir)) val targetFile = new File(targetDir, filename) val uri = new URI(url) uri.getScheme match { case "http" | "https" | "ftp" => - logInfo("Fetching " + url + " to " + targetFile) + logInfo("Fetching " + url + " to " + tempFile) val in = new URL(url).openStream() - val out = new FileOutputStream(targetFile) + val out = new FileOutputStream(tempFile) Utils.copyStream(in, out, true) + if (targetFile.exists && !Files.equal(tempFile, targetFile)) { + logWarning("File " + targetFile + " exists and does not match contents of " + url + + "; using existing version") + tempFile.delete() + } else { + Files.move(tempFile, targetFile) + } case "file" | null => - // Remove the file if it already exists - targetFile.delete() - // Symlink the file locally. - if (uri.isAbsolute) { - // url is absolute, i.e. it starts with "file:///". Extract the source - // file's absolute path from the url. - val sourceFile = new File(uri) - logInfo("Symlinking " + sourceFile.getAbsolutePath + " to " + targetFile.getAbsolutePath) - FileUtil.symLink(sourceFile.getAbsolutePath, targetFile.getAbsolutePath) + val sourceFile = if (uri.isAbsolute) { + new File(uri) + } else { + new File(url) + } + if (targetFile.exists && !Files.equal(sourceFile, targetFile)) { + logWarning("File " + targetFile + " exists and does not match contents of " + url + + "; using existing version") } else { - // url is not absolute, i.e. itself is the path to the source file. - logInfo("Symlinking " + url + " to " + targetFile.getAbsolutePath) - FileUtil.symLink(url, targetFile.getAbsolutePath) + // Remove the file if it already exists + targetFile.delete() + // Symlink the file locally. + if (uri.isAbsolute) { + // url is absolute, i.e. it starts with "file:///". Extract the source + // file's absolute path from the url. + val sourceFile = new File(uri) + logInfo("Symlinking " + sourceFile.getAbsolutePath + " to " + targetFile.getAbsolutePath) + FileUtil.symLink(sourceFile.getAbsolutePath, targetFile.getAbsolutePath) + } else { + // url is not absolute, i.e. itself is the path to the source file. + logInfo("Symlinking " + url + " to " + targetFile.getAbsolutePath) + FileUtil.symLink(url, targetFile.getAbsolutePath) + } } case _ => // Use the Hadoop filesystem library, which supports file://, hdfs://, s3://, and others @@ -159,8 +179,15 @@ private object Utils extends Logging { val conf = new Configuration() val fs = FileSystem.get(uri, conf) val in = fs.open(new Path(uri)) - val out = new FileOutputStream(targetFile) + val out = new FileOutputStream(tempFile) Utils.copyStream(in, out, true) + if (targetFile.exists && !Files.equal(tempFile, targetFile)) { + logWarning("File " + targetFile + " exists and does not match contents of " + url + + "; using existing version") + tempFile.delete() + } else { + Files.move(tempFile, targetFile) + } } // Decompress the file if it's a .tar or .tar.gz if (filename.endsWith(".tar.gz") || filename.endsWith(".tgz")) {