diff --git a/core/pom.xml b/core/pom.xml index 868834dd505ef877c15343a6f44c9b59afabb301..6cd1965ec37c20494027c5e1851d1854a4d0a67a 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -275,7 +275,7 @@ <dependency> <groupId>org.tachyonproject</groupId> <artifactId>tachyon-client</artifactId> - <version>0.6.1</version> + <version>0.5.0</version> <exclusions> <exclusion> <groupId>org.apache.hadoop</groupId> diff --git a/core/src/main/scala/org/apache/spark/storage/TachyonBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/TachyonBlockManager.scala index 2ab6a8f3ec1d4e1eaf7734062729afa6ef72b5f6..af873034215a99a25ab9aada540d73abe4aad4dc 100644 --- a/core/src/main/scala/org/apache/spark/storage/TachyonBlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/TachyonBlockManager.scala @@ -20,8 +20,8 @@ package org.apache.spark.storage import java.text.SimpleDateFormat import java.util.{Date, Random} -import tachyon.TachyonURI -import tachyon.client.{TachyonFile, TachyonFS} +import tachyon.client.TachyonFS +import tachyon.client.TachyonFile import org.apache.spark.Logging import org.apache.spark.executor.ExecutorExitCode @@ -40,7 +40,7 @@ private[spark] class TachyonBlockManager( val master: String) extends Logging { - val client = if (master != null && master != "") TachyonFS.get(new TachyonURI(master)) else null + val client = if (master != null && master != "") TachyonFS.get(master) else null if (client == null) { logError("Failed to connect to the Tachyon as the master address is not configured") @@ -60,11 +60,11 @@ private[spark] class TachyonBlockManager( addShutdownHook() def removeFile(file: TachyonFile): Boolean = { - client.delete(new TachyonURI(file.getPath()), false) + client.delete(file.getPath(), false) } def fileExists(file: TachyonFile): Boolean = { - client.exist(new TachyonURI(file.getPath())) + client.exist(file.getPath()) } def getFile(filename: String): TachyonFile = { @@ -81,7 +81,7 @@ private[spark] class TachyonBlockManager( if (old != null) { old } else { - val path = new TachyonURI(s"${tachyonDirs(dirId)}/${"%02x".format(subDirId)}") + val path = tachyonDirs(dirId) + "/" + "%02x".format(subDirId) client.mkdir(path) val newDir = client.getFile(path) subDirs(dirId)(subDirId) = newDir @@ -89,7 +89,7 @@ private[spark] class TachyonBlockManager( } } } - val filePath = new TachyonURI(s"$subDir/$filename") + val filePath = subDir + "/" + filename if(!client.exist(filePath)) { client.createFile(filePath) } @@ -101,7 +101,7 @@ private[spark] class TachyonBlockManager( // TODO: Some of the logic here could be consolidated/de-duplicated with that in the DiskStore. private def createTachyonDirs(): Array[TachyonFile] = { - logDebug(s"Creating tachyon directories at root dirs '$rootDirs'") + logDebug("Creating tachyon directories at root dirs '" + rootDirs + "'") val dateFormat = new SimpleDateFormat("yyyyMMddHHmmss") rootDirs.split(",").map { rootDir => var foundLocalDir = false @@ -113,21 +113,22 @@ private[spark] class TachyonBlockManager( tries += 1 try { tachyonDirId = "%s-%04x".format(dateFormat.format(new Date), rand.nextInt(65536)) - val path = new TachyonURI(s"$rootDir/spark-tachyon-$tachyonDirId") + val path = rootDir + "/" + "spark-tachyon-" + tachyonDirId if (!client.exist(path)) { foundLocalDir = client.mkdir(path) tachyonDir = client.getFile(path) } } catch { case e: Exception => - logWarning(s"Attempt $tries to create tachyon dir $tachyonDir failed", e) + logWarning("Attempt " + tries + " to create tachyon dir " + tachyonDir + " failed", e) } } if (!foundLocalDir) { - logError(s"Failed $MAX_DIR_CREATION_ATTEMPTS attempts to create tachyon dir in $rootDir") + logError("Failed " + MAX_DIR_CREATION_ATTEMPTS + " attempts to create tachyon dir in " + + rootDir) System.exit(ExecutorExitCode.TACHYON_STORE_FAILED_TO_CREATE_DIR) } - logInfo(s"Created tachyon directory at $tachyonDir") + logInfo("Created tachyon directory at " + tachyonDir) tachyonDir } } @@ -144,7 +145,7 @@ private[spark] class TachyonBlockManager( } } catch { case e: Exception => - logError(s"Exception while deleting tachyon spark dir: $tachyonDir", e) + logError("Exception while deleting tachyon spark dir: " + tachyonDir, e) } } client.close() diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index 91d833295e37655eb8270b6ff0869991dbade636..fa56bb09e4e5cd25a71b94969f77aad3d3953a0b 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -42,8 +42,6 @@ import org.apache.hadoop.security.UserGroupInformation import org.apache.log4j.PropertyConfigurator import org.eclipse.jetty.util.MultiException import org.json4s._ - -import tachyon.TachyonURI import tachyon.client.{TachyonFS, TachyonFile} import org.apache.spark._ @@ -972,7 +970,7 @@ private[spark] object Utils extends Logging { * Delete a file or directory and its contents recursively. */ def deleteRecursively(dir: TachyonFile, client: TachyonFS) { - if (!client.delete(new TachyonURI(dir.getPath()), true)) { + if (!client.delete(dir.getPath(), true)) { throw new IOException("Failed to delete the tachyon dir: " + dir) } } diff --git a/make-distribution.sh b/make-distribution.sh index 8162fe94c1af05e683798ce050a5985c81fd9031..9ed1abfe8c598ac94df575aeda4db2ae168329b8 100755 --- a/make-distribution.sh +++ b/make-distribution.sh @@ -32,7 +32,7 @@ SPARK_HOME="$(cd "`dirname "$0"`"; pwd)" DISTDIR="$SPARK_HOME/dist" SPARK_TACHYON=false -TACHYON_VERSION="0.6.1" +TACHYON_VERSION="0.5.0" TACHYON_TGZ="tachyon-${TACHYON_VERSION}-bin.tar.gz" TACHYON_URL="https://github.com/amplab/tachyon/releases/download/v${TACHYON_VERSION}/${TACHYON_TGZ}"