Commit bed1b081 authored by Reynold Xin

Do not create symlink for local add file. Instead, copy the file.

This prevents Spark from changing the original file's permissions, and
also allows add file to work on non-posix operating systems.
parent ef77bb73
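In essence, the change swaps Hadoop's FileUtil.symLink for a content-checked copy via Guava's Files. A minimal sketch of the new behavior follows (the helper name copyLocalFile and the plain Exception are illustrative, not the actual Utils API):

import java.io.File
import com.google.common.io.Files

// Copy instead of symlink: copying never touches the source file's
// permissions and needs no posix symlink support.
def copyLocalFile(sourceFile: File, targetFile: File) {
  if (targetFile.exists) {
    // A pre-existing target is acceptable only if its contents already match.
    if (!Files.equal(sourceFile, targetFile)) {
      throw new Exception("File " + targetFile + " exists and does not match " + sourceFile)
    }
  } else {
    Files.copy(sourceFile, targetFile)
  }
}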
@@ -4,20 +4,26 @@ import java.io._
 import java.net.{InetAddress, URL, URI, NetworkInterface, Inet4Address, ServerSocket}
 import java.util.{Locale, Random, UUID}
 import java.util.concurrent.{ConcurrentHashMap, Executors, ThreadFactory, ThreadPoolExecutor}
-import org.apache.hadoop.fs.{Path, FileSystem, FileUtil}
+import java.util.regex.Pattern
+
 import scala.collection.mutable.{ArrayBuffer, HashMap}
 import scala.collection.JavaConversions._
 import scala.io.Source
+
 import com.google.common.io.Files
 import com.google.common.util.concurrent.ThreadFactoryBuilder
+
+import org.apache.hadoop.fs.{Path, FileSystem, FileUtil}
+
 import spark.serializer.SerializerInstance
 import spark.deploy.SparkHadoopUtil
-import java.util.regex.Pattern
 
 /**
  * Various utility methods used by Spark.
  */
 private object Utils extends Logging {
   /** Serialize an object using Java serialization */
   def serialize[T](o: T): Array[Byte] = {
     val bos = new ByteArrayOutputStream()
@@ -68,7 +74,6 @@ private object Utils extends Logging {
     return buf
   }
 
-
   private val shutdownDeletePaths = new collection.mutable.HashSet[String]()
 
   // Register the path to be deleted via shutdown hook
@@ -87,19 +92,19 @@ private object Utils extends Logging {
     }
   }
 
-  // Note: if file is child of some registered path, while not equal to it, then return true; else false
-  // This is to ensure that two shutdown hooks do not try to delete each others paths - resulting in IOException
-  // and incomplete cleanup
+  // Note: if file is child of some registered path, while not equal to it, then return true;
+  // else false. This is to ensure that two shutdown hooks do not try to delete each others
+  // paths - resulting in IOException and incomplete cleanup.
   def hasRootAsShutdownDeleteDir(file: File): Boolean = {
     val absolutePath = file.getAbsolutePath()
     val retval = shutdownDeletePaths.synchronized {
-      shutdownDeletePaths.find(path => ! absolutePath.equals(path) && absolutePath.startsWith(path) ).isDefined
+      shutdownDeletePaths.find { path =>
+        !absolutePath.equals(path) && absolutePath.startsWith(path)
+      }.isDefined
     }
-    if (retval) logInfo("path = " + file + ", already present as root for deletion.")
+    if (retval) {
+      logInfo("path = " + file + ", already present as root for deletion.")
+    }
     retval
   }
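For illustration, the rule this check implements, as a standalone sketch (roots and coveredByRoot are hypothetical names, not part of Utils): a path qualifies only when it lies strictly inside some registered root, never when it equals one.

import scala.collection.mutable.HashSet

val roots = new HashSet[String]()
roots += "/tmp/spark-local-dir"

// A path is covered only if it is a proper descendant of a registered root.
def coveredByRoot(absolutePath: String): Boolean = roots.synchronized {
  roots.exists(root => absolutePath != root && absolutePath.startsWith(root))
}

coveredByRoot("/tmp/spark-local-dir/blocks")  // true: child of a registered root
coveredByRoot("/tmp/spark-local-dir")         // false: equal to the root itself

Note that a bare prefix test also matches a sibling such as "/tmp/spark-local-dir2"; the sketch inherits that quirk from the original startsWith check.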
@@ -131,7 +136,7 @@ private object Utils extends Logging {
         if (! hasRootAsShutdownDeleteDir(dir)) Utils.deleteRecursively(dir)
       }
     })
-    return dir
+    dir
   }
 
   /** Copy all data from an InputStream to an OutputStream */
@@ -174,35 +179,30 @@
         Utils.copyStream(in, out, true)
         if (targetFile.exists && !Files.equal(tempFile, targetFile)) {
           tempFile.delete()
-          throw new SparkException("File " + targetFile + " exists and does not match contents of" +
-            " " + url)
+          throw new SparkException(
+            "File " + targetFile + " exists and does not match contents of" + " " + url)
         } else {
           Files.move(tempFile, targetFile)
         }
       case "file" | null =>
-        val sourceFile = if (uri.isAbsolute) {
-          new File(uri)
-        } else {
-          new File(url)
-        }
-        if (targetFile.exists && !Files.equal(sourceFile, targetFile)) {
-          throw new SparkException("File " + targetFile + " exists and does not match contents of" +
-            " " + url)
-        } else {
-          // Remove the file if it already exists
-          targetFile.delete()
-          // Symlink the file locally.
-          if (uri.isAbsolute) {
-            // url is absolute, i.e. it starts with "file:///". Extract the source
-            // file's absolute path from the url.
-            val sourceFile = new File(uri)
-            logInfo("Symlinking " + sourceFile.getAbsolutePath + " to " + targetFile.getAbsolutePath)
-            FileUtil.symLink(sourceFile.getAbsolutePath, targetFile.getAbsolutePath)
-          } else {
-            // url is not absolute, i.e. itself is the path to the source file.
-            logInfo("Symlinking " + url + " to " + targetFile.getAbsolutePath)
-            FileUtil.symLink(url, targetFile.getAbsolutePath)
-          }
-        }
+        // In the case of a local file, copy the local file to the target directory.
+        // Note the difference between uri vs url.
+        val sourceFile = if (uri.isAbsolute) new File(uri) else new File(url)
+        if (targetFile.exists) {
+          // If the target file already exists, fail if its contents do not match the source's.
+          if (!Files.equal(sourceFile, targetFile)) {
+            throw new SparkException(
+              "File " + targetFile + " exists and does not match contents of" + " " + url)
+          } else {
+            // Do nothing if the file contents are the same, i.e. this file has been copied
+            // previously.
+            logInfo(sourceFile.getAbsolutePath + " has been previously copied to "
+              + targetFile.getAbsolutePath)
+          }
+        } else {
+          // The file does not exist in the target directory. Copy it there.
+          logInfo("Copying " + sourceFile.getAbsolutePath + " to " + targetFile.getAbsolutePath)
+          Files.copy(sourceFile, targetFile)
+        }
       case _ =>
         // Use the Hadoop filesystem library, which supports file://, hdfs://, s3://, and others
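The uri-vs-url distinction the new comment calls out, sketched with hypothetical values (not from the commit): an absolute "file:" URL has to be converted to a path through new File(uri), while a scheme-less string is already the path itself.

import java.io.File
import java.net.URI

val withScheme = new URI("file:///etc/hosts")
withScheme.isAbsolute          // true: has a scheme, so use new File(uri)
new File(withScheme).getPath   // "/etc/hosts"

val bare = new URI("data/input.txt")
bare.getScheme                 // null: matches the case "file" | null branch
new File("data/input.txt")     // the url string is itself the source path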
@@ -323,8 +323,6 @@ private object Utils extends Logging {
     InetAddress.getByName(address).getHostName
   }
 
-
-
   def localHostPort(): String = {
     val retval = System.getProperty("spark.hostPort", null)
     if (retval == null) {
@@ -382,6 +380,7 @@ private object Utils extends Logging {
   // Typically, this will be of order of number of nodes in cluster
   // If not, we should change it to LRUCache or something.
   private val hostPortParseResults = new ConcurrentHashMap[String, (String, Int)]()
+
   def parseHostPort(hostPort: String): (String, Int) = {
     {
       // Check cache first.
@@ -390,7 +389,8 @@ private object Utils extends Logging {
       }
 
       val indx: Int = hostPort.lastIndexOf(':')
-      // This is potentially broken - when dealing with ipv6 addresses for example, sigh ... but then hadoop does not support ipv6 right now.
+      // This is potentially broken - when dealing with ipv6 addresses for example, sigh ...
+      // but then hadoop does not support ipv6 right now.
       // For now, we assume that if port exists, then it is valid - not check if it is an int > 0
       if (-1 == indx) {
         val retval = (hostPort, 0)
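Putting the hunk in context, a self-contained sketch of the cached parsing (parseHostPortSketch and cache are illustrative names; the real code uses hostPortParseResults): the last ':' splits host from port, a missing ':' defaults the port to 0, and results are memoized since the key space is roughly the set of cluster nodes.

import java.util.concurrent.ConcurrentHashMap

val cache = new ConcurrentHashMap[String, (String, Int)]()

def parseHostPortSketch(hostPort: String): (String, Int) = {
  // Check cache first.
  val cached = cache.get(hostPort)
  if (cached != null) return cached

  val indx = hostPort.lastIndexOf(':')
  // As the comment above admits, this mis-splits a bare ipv6 literal like "::1".
  val retval =
    if (indx == -1) (hostPort, 0)  // no port present; default to 0
    else (hostPort.substring(0, indx), hostPort.substring(indx + 1).toInt)
  cache.putIfAbsent(hostPort, retval)
  retval
}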