Commit ae1f54aa authored by Sean Owen, committed by Reynold Xin

[SPARK-12500][CORE] Fix Tachyon deprecations; pull Tachyon dependency into one class

Fix Tachyon deprecations; pull Tachyon dependency into `TachyonBlockManager` only

CC calvinjia as I probably need a double-check that the usage of the new API is correct.

Author: Sean Owen <sowen@cloudera.com>

Closes #10449 from srowen/SPARK-12500.
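
For reviewers unfamiliar with the new Tachyon client API, here is a condensed sketch of the migration, assembled from the calls this patch actually makes (the master URL shown is just the TachyonBlockManager default; this is an illustration, not code from the patch):

// Old, deprecated client API (removed by this patch):
//   val client: TachyonFS = TachyonFS.get(new TachyonURI(master), new TachyonConf())
//   val os = file.getOutStream(WriteType.TRY_CACHE)
//   client.delete(new TachyonURI(dir.getPath()), true)  // boolean = recursive

import tachyon.Constants
import tachyon.client.ClientContext
import tachyon.client.file.TachyonFileSystem.TachyonFileSystemFactory
import tachyon.client.file.options.DeleteOptions
import tachyon.conf.TachyonConf

// New client: configure the master address once, then obtain the filesystem client.
val tachyonConf = new TachyonConf()
tachyonConf.set(Constants.MASTER_ADDRESS, "tachyon://localhost:19998")
ClientContext.reset(tachyonConf)
val client = TachyonFileSystemFactory.get  // a TachyonFileSystem, replacing TachyonFS

// I/O now goes through the client rather than the TachyonFile handle:
//   val os = client.getOutStream(new TachyonURI(client.getInfo(file).getPath))
//   val is = client.getInStream(file)
// and recursive deletes are expressed via DeleteOptions rather than a boolean flag:
//   client.delete(dir, new DeleteOptions.Builder(ClientContext.getConf).setRecursive(true).build())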
parent 43b2a639
core/src/main/scala/org/apache/spark/storage/TachyonBlockManager.scala
@@ -26,13 +26,17 @@ import scala.util.control.NonFatal
 import com.google.common.io.ByteStreams
-import tachyon.client.{ReadType, WriteType, TachyonFS, TachyonFile}
-import tachyon.TachyonURI
+import tachyon.{Constants, TachyonURI}
+import tachyon.client.ClientContext
+import tachyon.client.file.{TachyonFile, TachyonFileSystem}
+import tachyon.client.file.TachyonFileSystem.TachyonFileSystemFactory
+import tachyon.client.file.options.DeleteOptions
+import tachyon.conf.TachyonConf
+import tachyon.exception.{FileAlreadyExistsException, FileDoesNotExistException}

 import org.apache.spark.Logging
 import org.apache.spark.executor.ExecutorExitCode
-import org.apache.spark.util.{ShutdownHookManager, Utils}
+import org.apache.spark.util.Utils

 /**
@@ -44,15 +48,15 @@ private[spark] class TachyonBlockManager() extends ExternalBlockManager with Log
   var rootDirs: String = _
   var master: String = _
-  var client: tachyon.client.TachyonFS = _
+  var client: TachyonFileSystem = _
   private var subDirsPerTachyonDir: Int = _

   // Create one Tachyon directory for each path mentioned in spark.tachyonStore.folderName;
   // then, inside this directory, create multiple subdirectories that we will hash files into,
   // in order to avoid having really large inodes at the top level in Tachyon.
   private var tachyonDirs: Array[TachyonFile] = _
-  private var subDirs: Array[Array[tachyon.client.TachyonFile]] = _
+  private var subDirs: Array[Array[TachyonFile]] = _
+  private val shutdownDeleteTachyonPaths = new scala.collection.mutable.HashSet[String]()

   override def init(blockManager: BlockManager, executorId: String): Unit = {
     super.init(blockManager, executorId)
@@ -62,7 +66,10 @@ private[spark] class TachyonBlockManager() extends ExternalBlockManager with Log
     rootDirs = s"$storeDir/$appFolderName/$executorId"
     master = blockManager.conf.get(ExternalBlockStore.MASTER_URL, "tachyon://localhost:19998")
     client = if (master != null && master != "") {
-      TachyonFS.get(new TachyonURI(master), new TachyonConf())
+      val tachyonConf = new TachyonConf()
+      tachyonConf.set(Constants.MASTER_ADDRESS, master)
+      ClientContext.reset(tachyonConf)
+      TachyonFileSystemFactory.get
     } else {
       null
     }
@@ -80,7 +87,7 @@ private[spark] class TachyonBlockManager() extends ExternalBlockManager with Log
     // in order to avoid having really large inodes at the top level in Tachyon.
     tachyonDirs = createTachyonDirs()
     subDirs = Array.fill(tachyonDirs.length)(new Array[TachyonFile](subDirsPerTachyonDir))
-    tachyonDirs.foreach(tachyonDir => ShutdownHookManager.registerShutdownDeleteDir(tachyonDir))
+    tachyonDirs.foreach(registerShutdownDeleteDir)
   }

   override def toString: String = {"ExternalBlockStore-Tachyon"}
@@ -89,6 +96,7 @@ private[spark] class TachyonBlockManager() extends ExternalBlockManager with Log
     val file = getFile(blockId)
     if (fileExists(file)) {
       removeFile(file)
+      true
     } else {
       false
     }
@@ -101,7 +109,7 @@ private[spark] class TachyonBlockManager() extends ExternalBlockManager with Log
   override def putBytes(blockId: BlockId, bytes: ByteBuffer): Unit = {
     val file = getFile(blockId)
-    val os = file.getOutStream(WriteType.TRY_CACHE)
+    val os = client.getOutStream(new TachyonURI(client.getInfo(file).getPath))
     try {
       Utils.writeByteBuffer(bytes, os)
     } catch {
@@ -115,7 +123,7 @@ private[spark] class TachyonBlockManager() extends ExternalBlockManager with Log
   override def putValues(blockId: BlockId, values: Iterator[_]): Unit = {
     val file = getFile(blockId)
-    val os = file.getOutStream(WriteType.TRY_CACHE)
+    val os = client.getOutStream(new TachyonURI(client.getInfo(file).getPath))
     try {
       blockManager.dataSerializeStream(blockId, os, values)
     } catch {
@@ -129,12 +137,17 @@ private[spark] class TachyonBlockManager() extends ExternalBlockManager with Log
   override def getBytes(blockId: BlockId): Option[ByteBuffer] = {
     val file = getFile(blockId)
-    if (file == null || file.getLocationHosts.size == 0) {
+    if (file == null) {
       return None
     }
-    val is = file.getInStream(ReadType.CACHE)
+    val is = try {
+      client.getInStream(file)
+    } catch {
+      case _: FileDoesNotExistException =>
+        return None
+    }
     try {
-      val size = file.length
+      val size = client.getInfo(file).length
       val bs = new Array[Byte](size.asInstanceOf[Int])
       ByteStreams.readFully(is, bs)
       Some(ByteBuffer.wrap(bs))
@@ -149,25 +162,37 @@ private[spark] class TachyonBlockManager() extends ExternalBlockManager with Log
   override def getValues(blockId: BlockId): Option[Iterator[_]] = {
     val file = getFile(blockId)
-    if (file == null || file.getLocationHosts().size() == 0) {
+    if (file == null) {
       return None
     }
-    val is = file.getInStream(ReadType.CACHE)
-    Option(is).map { is =>
-      blockManager.dataDeserializeStream(blockId, is)
+    val is = try {
+      client.getInStream(file)
+    } catch {
+      case _: FileDoesNotExistException =>
+        return None
+    }
+    try {
+      Some(blockManager.dataDeserializeStream(blockId, is))
+    } finally {
+      is.close()
     }
   }

   override def getSize(blockId: BlockId): Long = {
-    getFile(blockId.name).length
+    client.getInfo(getFile(blockId.name)).length
   }

-  def removeFile(file: TachyonFile): Boolean = {
-    client.delete(new TachyonURI(file.getPath()), false)
+  def removeFile(file: TachyonFile): Unit = {
+    client.delete(file)
   }

   def fileExists(file: TachyonFile): Boolean = {
-    client.exist(new TachyonURI(file.getPath()))
+    try {
+      client.getInfo(file)
+      true
+    } catch {
+      case _: FileDoesNotExistException => false
+    }
   }

   def getFile(filename: String): TachyonFile = {
@@ -186,18 +211,18 @@ private[spark] class TachyonBlockManager() extends ExternalBlockManager with Log
         } else {
           val path = new TachyonURI(s"${tachyonDirs(dirId)}/${"%02x".format(subDirId)}")
           client.mkdir(path)
-          val newDir = client.getFile(path)
+          val newDir = client.loadMetadata(path)
           subDirs(dirId)(subDirId) = newDir
           newDir
         }
       }
     }
     val filePath = new TachyonURI(s"$subDir/$filename")
-    if(!client.exist(filePath)) {
-      client.createFile(filePath)
+    try {
+      client.create(filePath)
+    } catch {
+      case _: FileAlreadyExistsException => client.loadMetadata(filePath)
     }
-    val file = client.getFile(filePath)
-    file
   }

   def getFile(blockId: BlockId): TachyonFile = getFile(blockId.name)
@@ -217,9 +242,11 @@ private[spark] class TachyonBlockManager() extends ExternalBlockManager with Log
        try {
          tachyonDirId = "%s-%04x".format(dateFormat.format(new Date), rand.nextInt(65536))
          val path = new TachyonURI(s"$rootDir/spark-tachyon-$tachyonDirId")
-          if (!client.exist(path)) {
+          try {
            foundLocalDir = client.mkdir(path)
-            tachyonDir = client.getFile(path)
+            tachyonDir = client.loadMetadata(path)
+          } catch {
+            case _: FileAlreadyExistsException => // continue
          }
        } catch {
          case NonFatal(e) =>
@@ -240,14 +267,60 @@ private[spark] class TachyonBlockManager() extends ExternalBlockManager with Log
      logDebug("Shutdown hook called")
      tachyonDirs.foreach { tachyonDir =>
        try {
-          if (!ShutdownHookManager.hasRootAsShutdownDeleteDir(tachyonDir)) {
-            Utils.deleteRecursively(tachyonDir, client)
+          if (!hasRootAsShutdownDeleteDir(tachyonDir)) {
+            deleteRecursively(tachyonDir, client)
          }
        } catch {
          case NonFatal(e) =>
            logError("Exception while deleting tachyon spark dir: " + tachyonDir, e)
        }
      }
      client.close()
    }
+
+  /**
+   * Delete a file or directory and its contents recursively.
+   */
+  private def deleteRecursively(dir: TachyonFile, client: TachyonFileSystem) {
+    client.delete(dir, new DeleteOptions.Builder(ClientContext.getConf).setRecursive(true).build())
+  }
+
+  // Register the tachyon path to be deleted via shutdown hook
+  private def registerShutdownDeleteDir(file: TachyonFile) {
+    val absolutePath = client.getInfo(file).getPath
+    shutdownDeleteTachyonPaths.synchronized {
+      shutdownDeleteTachyonPaths += absolutePath
+    }
+  }
+
+  // Remove the tachyon path to be deleted via shutdown hook
+  private def removeShutdownDeleteDir(file: TachyonFile) {
+    val absolutePath = client.getInfo(file).getPath
+    shutdownDeleteTachyonPaths.synchronized {
+      shutdownDeleteTachyonPaths -= absolutePath
+    }
+  }
+
+  // Is the path already registered to be deleted via a shutdown hook?
+  private def hasShutdownDeleteTachyonDir(file: TachyonFile): Boolean = {
+    val absolutePath = client.getInfo(file).getPath
+    shutdownDeleteTachyonPaths.synchronized {
+      shutdownDeleteTachyonPaths.contains(absolutePath)
+    }
+  }
+
+  // Note: if the file is a child of some registered path, but not equal to it, return true;
+  // otherwise false. This is to ensure that two shutdown hooks do not try to delete each other's
+  // paths, resulting in an Exception and incomplete cleanup.
+  private def hasRootAsShutdownDeleteDir(file: TachyonFile): Boolean = {
+    val absolutePath = client.getInfo(file).getPath
+    val hasRoot = shutdownDeleteTachyonPaths.synchronized {
+      shutdownDeleteTachyonPaths.exists(
+        path => !absolutePath.equals(path) && absolutePath.startsWith(path))
+    }
+    if (hasRoot) {
+      logInfo(s"path = $absolutePath, already present as root for deletion.")
+    }
+    hasRoot
+  }
 }
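
The getFile hunk above begins mid-method; for orientation, the class comment's two-level layout works by hashing each file name to a directory and then a sub-directory. A minimal sketch of that mapping (a paraphrase, not a quote of the full source: Utils.nonNegativeHash is Spark's hashing helper, and the exact split shown is an assumption consistent with the dirId/subDirId usage visible in the hunk):

// Hash the file name to pick one of the Tachyon dirs, then one of the
// subDirsPerTachyonDir sub-directories inside it, keeping top-level inodes small.
val hash = Utils.nonNegativeHash(filename)
val dirId = hash % tachyonDirs.length
val subDirId = (hash / tachyonDirs.length) % subDirsPerTachyonDir
// The sub-directory path then matches the hunk above:
//   new TachyonURI(s"${tachyonDirs(dirId)}/${"%02x".format(subDirId)}")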
core/src/main/scala/org/apache/spark/util/ShutdownHookManager.scala
@@ -21,7 +21,6 @@ import java.io.File
 import java.util.PriorityQueue

 import scala.util.{Failure, Success, Try}
-import tachyon.client.TachyonFile

 import org.apache.hadoop.fs.FileSystem

 import org.apache.spark.Logging
@@ -52,7 +51,6 @@ private[spark] object ShutdownHookManager extends Logging {
   }

   private val shutdownDeletePaths = new scala.collection.mutable.HashSet[String]()
-  private val shutdownDeleteTachyonPaths = new scala.collection.mutable.HashSet[String]()

   // Add a shutdown hook to delete the temp dirs when the JVM exits
   addShutdownHook(TEMP_DIR_SHUTDOWN_PRIORITY) { () =>
@@ -77,14 +75,6 @@ private[spark] object ShutdownHookManager extends Logging {
     }
   }

-  // Register the tachyon path to be deleted via shutdown hook
-  def registerShutdownDeleteDir(tachyonfile: TachyonFile) {
-    val absolutePath = tachyonfile.getPath()
-    shutdownDeleteTachyonPaths.synchronized {
-      shutdownDeleteTachyonPaths += absolutePath
-    }
-  }
-
   // Remove the path to be deleted via shutdown hook
   def removeShutdownDeleteDir(file: File) {
     val absolutePath = file.getAbsolutePath()
@@ -93,14 +83,6 @@ private[spark] object ShutdownHookManager extends Logging {
     }
   }

-  // Remove the tachyon path to be deleted via shutdown hook
-  def removeShutdownDeleteDir(tachyonfile: TachyonFile) {
-    val absolutePath = tachyonfile.getPath()
-    shutdownDeleteTachyonPaths.synchronized {
-      shutdownDeleteTachyonPaths.remove(absolutePath)
-    }
-  }
-
   // Is the path already registered to be deleted via a shutdown hook?
   def hasShutdownDeleteDir(file: File): Boolean = {
     val absolutePath = file.getAbsolutePath()
@@ -109,14 +91,6 @@ private[spark] object ShutdownHookManager extends Logging {
     }
   }

-  // Is the path already registered to be deleted via a shutdown hook?
-  def hasShutdownDeleteTachyonDir(file: TachyonFile): Boolean = {
-    val absolutePath = file.getPath()
-    shutdownDeleteTachyonPaths.synchronized {
-      shutdownDeleteTachyonPaths.contains(absolutePath)
-    }
-  }
-
   // Note: if the file is a child of some registered path, but not equal to it, return true;
   // otherwise false. This is to ensure that two shutdown hooks do not try to delete each other's
   // paths, resulting in an IOException and incomplete cleanup.
@@ -133,22 +107,6 @@ private[spark] object ShutdownHookManager extends Logging {
     retval
   }

-  // Note: if the file is a child of some registered path, but not equal to it, return true;
-  // otherwise false. This is to ensure that two shutdown hooks do not try to delete each other's
-  // paths, resulting in an Exception and incomplete cleanup.
-  def hasRootAsShutdownDeleteDir(file: TachyonFile): Boolean = {
-    val absolutePath = file.getPath()
-    val retval = shutdownDeleteTachyonPaths.synchronized {
-      shutdownDeleteTachyonPaths.exists { path =>
-        !absolutePath.equals(path) && absolutePath.startsWith(path)
-      }
-    }
-    if (retval) {
-      logInfo("path = " + file + ", already present as root for deletion.")
-    }
-    retval
-  }
-
   /**
    * Detect whether this thread might be executing a shutdown hook. Will always return true if
    * the current thread is running a shutdown hook but may spuriously return true otherwise (e.g.
core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -44,8 +44,6 @@ import org.apache.log4j.PropertyConfigurator
 import org.eclipse.jetty.util.MultiException
 import org.json4s._
 import org.slf4j.Logger
-import tachyon.TachyonURI
-import tachyon.client.{TachyonFS, TachyonFile}

 import org.apache.spark._
 import org.apache.spark.deploy.SparkHadoopUtil
@@ -946,15 +944,6 @@ private[spark] object Utils extends Logging {
     }
   }

-  /**
-   * Delete a file or directory and its contents recursively.
-   */
-  def deleteRecursively(dir: TachyonFile, client: TachyonFS) {
-    if (!client.delete(new TachyonURI(dir.getPath()), true)) {
-      throw new IOException("Failed to delete the tachyon dir: " + dir)
-    }
-  }
-
   /**
    * Check to see if file is a symbolic link.
    */