Commit c9a4c36d authored by Burak Yavuz, committed by Shivaram Venkataraman

[SPARK-8313] R Spark packages support

shivaram cafreeman Could you please help me test this out? Exposing and running `rPackageBuilder` from inside the shell works, but for some reason I can't get it to work during Spark Submit; it just keeps relaunching Spark Submit.

For testing, you can use the R branch of [sbt-spark-package](https://github.com/databricks/sbt-spark-package): call `spPackage`, then pass the resulting jar using `--jars`.
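
For example, a minimal end-to-end check might look like the following (the artifact name, Scala version, and script name are hypothetical; `spPackage` writes the jar under `target/`):

    # Build the Spark Package jar with sbt-spark-package
    sbt spPackage
    # Submit an R script with the package jar attached
    $SPARK_HOME/bin/spark-submit \
      --jars target/scala-2.10/my-r-package_2.10-0.1.jar \
      test-script.R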

Author: Burak Yavuz <brkyvz@gmail.com>

Closes #7139 from brkyvz/r-submit and squashes the following commits:

0de384f [Burak Yavuz] remove unused imports 2
d253708 [Burak Yavuz] removed unused imports
6603d0d [Burak Yavuz] addressed comments
4258ffe [Burak Yavuz] merged master
ddfcc06 [Burak Yavuz] added zipping test
3a1be7d [Burak Yavuz] don't zip
77995df [Burak Yavuz] fix URI
ac45527 [Burak Yavuz] added zipping of all libs
e6bf7b0 [Burak Yavuz] add println ignores
1bc5554 [Burak Yavuz] add assumes for tests
9778e03 [Burak Yavuz] addressed comments
b42b300 [Burak Yavuz] merged master
ffd134e [Burak Yavuz] Merge branch 'master' of github.com:apache/spark into r-submit
d867756 [Burak Yavuz] add apache header
eff5ba1 [Burak Yavuz] ready for review
8838edb [Burak Yavuz] Merge branch 'master' of github.com:apache/spark into r-submit
e5b5a06 [Burak Yavuz] added doc
bb751ce [Burak Yavuz] fix null bug
0226768 [Burak Yavuz] fixed issues
8810beb [Burak Yavuz] R packages support
parent a7fe48f6
@@ -42,8 +42,4 @@ Rscript -e ' if("devtools" %in% rownames(installed.packages())) { library(devtoo
# Install SparkR to $LIB_DIR
R CMD INSTALL --library=$LIB_DIR $FWDIR/pkg/
-# Zip the SparkR package so that it can be distributed to worker nodes on YARN
-cd $LIB_DIR
-jar cfM "$LIB_DIR/sparkr.zip" SparkR
popd > /dev/null
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
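# Sanity check for R code bundled in a Spark Package jar: myfunc() delegates to
# the packaged Java class (see IvyTestUtils.createRFiles), whose static myFunc
# returns its argument plus one, so the script exits with status 1 on mismatch.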
library(SparkR)
library(sparkPackageTest)
sc <- sparkR.init()
run1 <- myfunc(5L)
run2 <- myfunc(-4L)
sparkR.stop()
if(run1 != 6) quit(save = "no", status = 1)
if(run2 != -3) quit(save = "no", status = 1)
@@ -19,6 +19,8 @@ package org.apache.spark.api.r
import java.io.File
import scala.collection.JavaConversions._
import org.apache.spark.{SparkEnv, SparkException}
private[spark] object RUtils {
@@ -26,7 +28,7 @@ private[spark] object RUtils {
* Get the SparkR package path in the local spark distribution.
*/
def localSparkRPackagePath: Option[String] = {
val sparkHome = sys.env.get("SPARK_HOME")
val sparkHome = sys.env.get("SPARK_HOME").orElse(sys.props.get("spark.test.home"))
sparkHome.map(
Seq(_, "R", "lib").mkString(File.separator)
)
@@ -46,8 +48,8 @@ private[spark] object RUtils {
(sparkConf.get("spark.master"), sparkConf.get("spark.submit.deployMode"))
}
-val isYarnCluster = master.contains("yarn") && deployMode == "cluster"
-val isYarnClient = master.contains("yarn") && deployMode == "client"
+val isYarnCluster = master != null && master.contains("yarn") && deployMode == "cluster"
+val isYarnClient = master != null && master.contains("yarn") && deployMode == "client"
// In YARN mode, the SparkR package is distributed as an archive symbolically
// linked to the "sparkr" file in the current directory. Note that this does not apply
@@ -62,4 +64,10 @@
}
}
}
/** Check if R is installed before running tests that use R commands. */
def isRInstalled: Boolean = {
try {
val builder = new ProcessBuilder(Seq("R", "--version"))
builder.start().waitFor() == 0
} catch {
case e: Exception => false // start() throws an IOException when R is not on the PATH
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy
import java.io._
import java.util.jar.JarFile
import java.util.logging.Level
import java.util.zip.{ZipEntry, ZipOutputStream}
import scala.collection.JavaConversions._
import com.google.common.io.{ByteStreams, Files}
import org.apache.spark.{SparkException, Logging}
import org.apache.spark.api.r.RUtils
import org.apache.spark.util.{RedirectThread, Utils}
private[deploy] object RPackageUtils extends Logging {
/** The key in the MANIFEST.MF that marks a jar as containing R code. */
private final val hasRPackage = "Spark-HasRPackage"
/** Base of the shell command used to install R packages. */
private final val baseInstallCmd = Seq("R", "CMD", "INSTALL", "-l")
/** R source code should exist under R/pkg in a jar. */
private final val RJarEntries = "R/pkg"
/** Documentation of the required R source layout inside the jar. */
private[deploy] final val RJarDoc =
s"""In order for Spark to build R packages that are parts of Spark Packages, there are a few
|requirements. The R source code must be shipped in a jar, with additional Java/Scala
|classes. The jar must be in the following format:
| 1- The Manifest (META-INF/MANIFEST.mf) must contain the key-value: $hasRPackage: true
| 2- The standard R package layout must be preserved under R/pkg/ inside the jar. More
| information on the standard R package layout can be found in:
| http://cran.r-project.org/doc/contrib/Leisch-CreatingPackages.pdf
| An example layout is given below. After running `jar tf $$JAR_FILE | sort`:
|
|META-INF/MANIFEST.MF
|R/
|R/pkg/
|R/pkg/DESCRIPTION
|R/pkg/NAMESPACE
|R/pkg/R/
|R/pkg/R/myRcode.R
|org/
|org/apache/
|...
""".stripMargin.trim
/** Internal method for logging. We log to a printStream in tests, for debugging purposes. */
private def print(
msg: String,
printStream: PrintStream,
level: Level = Level.FINE,
e: Throwable = null): Unit = {
if (printStream != null) {
// scalastyle:off println
printStream.println(msg)
// scalastyle:on println
if (e != null) {
e.printStackTrace(printStream)
}
} else {
level match {
case Level.INFO => logInfo(msg)
case Level.WARNING => logWarning(msg)
case Level.SEVERE => logError(msg, e)
case _ => logDebug(msg)
}
}
}
/**
* Checks the jar's manifest for the flag indicating that R source code is bundled with it.
* Exposed for testing.
*/
private[deploy] def checkManifestForR(jar: JarFile): Boolean = {
val manifest = jar.getManifest.getMainAttributes
manifest.getValue(hasRPackage) != null && manifest.getValue(hasRPackage).trim == "true"
}
/**
* Runs the standard R package installation code to build the R package from source.
* Multiple runs don't cause problems.
*/
private def rPackageBuilder(dir: File, printStream: PrintStream, verbose: Boolean): Boolean = {
// This code should always run on the driver.
val pathToSparkR = RUtils.localSparkRPackagePath.getOrElse(
throw new SparkException("SPARK_HOME not set. Can't locate SparkR package."))
val pathToPkg = Seq(dir, "R", "pkg").mkString(File.separator)
val installCmd = baseInstallCmd ++ Seq(pathToSparkR, pathToPkg)
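// e.g. Seq("R", "CMD", "INSTALL", "-l", "<SPARK_HOME>/R/lib", "<extracted-dir>/R/pkg")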
if (verbose) {
print(s"Building R package with the command: $installCmd", printStream)
}
try {
val builder = new ProcessBuilder(installCmd)
builder.redirectErrorStream(true)
val env = builder.environment()
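// Run the install in a cleared environment so nothing is inherited from the
// environment of the submitting shell.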
env.clear()
val process = builder.start()
new RedirectThread(process.getInputStream, printStream, "redirect R packaging").start()
process.waitFor() == 0
} catch {
case e: Throwable =>
print("Failed to build R package.", printStream, Level.SEVERE, e)
false
}
}
/**
* Extracts the files under /R in the jar to a temporary directory for building.
*/
private def extractRFolder(jar: JarFile, printStream: PrintStream, verbose: Boolean): File = {
val tempDir = Utils.createTempDir(null)
val jarEntries = jar.entries()
while (jarEntries.hasMoreElements) {
val entry = jarEntries.nextElement()
val entryRIndex = entry.getName.indexOf(RJarEntries)
if (entryRIndex > -1) {
val entryPath = entry.getName.substring(entryRIndex)
if (entry.isDirectory) {
val dir = new File(tempDir, entryPath)
if (verbose) {
print(s"Creating directory: $dir", printStream)
}
dir.mkdirs
} else {
val inStream = jar.getInputStream(entry)
val outPath = new File(tempDir, entryPath)
Files.createParentDirs(outPath)
val outStream = new FileOutputStream(outPath)
if (verbose) {
print(s"Extracting $entry to $outPath", printStream)
}
Utils.copyStream(inStream, outStream, closeStreams = true)
}
}
}
tempDir
}
/**
* Checks every jar in the comma-separated list for bundled R source code
* (via the manifest flag) and, where found, extracts and builds the R package.
*/
private[deploy] def checkAndBuildRPackage(
jars: String,
printStream: PrintStream = null,
verbose: Boolean = false): Unit = {
jars.split(",").foreach { jarPath =>
val file = new File(Utils.resolveURI(jarPath))
if (file.exists()) {
val jar = new JarFile(file)
if (checkManifestForR(jar)) {
print(s"$file contains R source code. Now installing package.", printStream, Level.INFO)
val rSource = extractRFolder(jar, printStream, verbose)
try {
if (!rPackageBuilder(rSource, printStream, verbose)) {
print(s"ERROR: Failed to build R package in $file.", printStream)
print(RJarDoc, printStream)
}
} finally {
Utils.deleteRecursively(rSource) // clean up; File.delete() fails on non-empty directories
}
} else {
if (verbose) {
print(s"$file doesn't contain R source code, skipping...", printStream)
}
}
} else {
print(s"WARN: $file resolved as dependency, but not found.", printStream, Level.WARNING)
}
}
}
private def listFilesRecursively(dir: File, excludePatterns: Seq[String]): Set[File] = {
if (!dir.exists()) {
Set.empty[File]
} else {
if (dir.isDirectory) {
val subDir = dir.listFiles(new FilenameFilter {
override def accept(dir: File, name: String): Boolean = {
!excludePatterns.exists(name.contains) // exclude files matching any given pattern
}
})
subDir.flatMap(listFilesRecursively(_, excludePatterns)).toSet
} else {
Set(dir)
}
}
}
/** Zips all the libraries found with SparkR in the R/lib directory for distribution with YARN. */
private[deploy] def zipRLibraries(dir: File, name: String): File = {
val filesToBundle = listFilesRecursively(dir, Seq(".zip"))
// create a zip file from scratch, do not append to existing file.
val zipFile = new File(dir, name)
zipFile.delete()
val zipOutputStream = new ZipOutputStream(new FileOutputStream(zipFile, false))
try {
filesToBundle.foreach { file =>
// get the relative paths for proper naming in the zip file
// use substring instead of replaceFirst, whose pattern argument is interpreted as a regex
val relPath = file.getAbsolutePath.substring(dir.getAbsolutePath.length)
val fis = new FileInputStream(file)
val zipEntry = new ZipEntry(relPath)
zipOutputStream.putNextEntry(zipEntry)
ByteStreams.copy(fis, zipOutputStream)
zipOutputStream.closeEntry()
fis.close()
}
} finally {
zipOutputStream.close()
}
zipFile
}
}
@@ -292,6 +292,12 @@ object SparkSubmit {
}
}
// install any R packages that may have been passed through --jars or --packages.
// Spark Packages may contain R source code inside the jar.
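// For example (hypothetical coordinates, mirroring the SparkSubmitSuite test below):
// spark-submit --packages my.great.lib:mylib:0.1 --repositories <repo-url> script.R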
if (args.isR && !StringUtils.isBlank(args.jars)) {
RPackageUtils.checkAndBuildRPackage(args.jars, printStream, args.verbose)
}
// Require all python files to be local, so we can add them to the PYTHONPATH
// In YARN cluster mode, python files are distributed as regular files, which can be non-local
if (args.isPython && !isYarnCluster) {
@@ -361,7 +367,8 @@ object SparkSubmit {
if (rPackagePath.isEmpty) {
printErrorAndExit("SPARK_HOME does not exist for R application in YARN mode.")
}
-val rPackageFile = new File(rPackagePath.get, SPARKR_PACKAGE_ARCHIVE)
+val rPackageFile =
+RPackageUtils.zipRLibraries(new File(rPackagePath.get), SPARKR_PACKAGE_ARCHIVE)
if (!rPackageFile.exists()) {
printErrorAndExit(s"$SPARKR_PACKAGE_ARCHIVE does not exist for R application in YARN mode.")
}
@@ -987,11 +994,9 @@ private[spark] object SparkSubmitUtils {
addExclusionRules(ivySettings, ivyConfName, md)
// add all supplied maven artifacts as dependencies
addDependenciesToIvy(md, artifacts, ivyConfName)
exclusions.foreach { e =>
md.addExcludeRule(createExclusion(e + ":*", ivySettings, ivyConfName))
}
// resolve dependencies
val rr: ResolveReport = ivy.resolve(md, resolveOptions)
if (rr.hasError) {
......
@@ -611,5 +611,4 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
System.setErr(currentErr)
}
}
}
@@ -19,6 +19,10 @@ package org.apache.spark.deploy
import java.io.{File, FileInputStream, FileOutputStream}
import java.util.jar.{JarEntry, JarOutputStream}
import java.util.jar.Attributes.Name
import java.util.jar.Manifest
import scala.collection.mutable.ArrayBuffer
import com.google.common.io.{Files, ByteStreams}
@@ -35,7 +39,7 @@ private[deploy] object IvyTestUtils {
* Create the path for the jar and pom from the maven coordinate. Extension should be `jar`
* or `pom`.
*/
-private def pathFromCoordinate(
+private[deploy] def pathFromCoordinate(
artifact: MavenCoordinate,
prefix: File,
ext: String,
@@ -52,7 +56,7 @@
}
/** Returns the artifact naming based on standard ivy or maven format. */
-private def artifactName(
+private[deploy] def artifactName(
artifact: MavenCoordinate,
useIvyLayout: Boolean,
ext: String = ".jar"): String = {
@@ -73,7 +77,7 @@
}
/** Write the contents to a file to the supplied directory. */
-private def writeFile(dir: File, fileName: String, contents: String): File = {
+private[deploy] def writeFile(dir: File, fileName: String, contents: String): File = {
val outputFile = new File(dir, fileName)
val outputStream = new FileOutputStream(outputFile)
outputStream.write(contents.toCharArray.map(_.toByte))
@@ -90,6 +94,42 @@
writeFile(dir, "mylib.py", contents)
}
/** Create an example R package that calls the given Java class. */
private def createRFiles(
dir: File,
className: String,
packageName: String): Seq[(String, File)] = {
val rFilesDir = new File(dir, "R" + File.separator + "pkg")
Files.createParentDirs(new File(rFilesDir, "R" + File.separator + "mylib.R"))
val contents =
s"""myfunc <- function(x) {
| SparkR:::callJStatic("$packageName.$className", "myFunc", x)
|}
""".stripMargin
val source = writeFile(new File(rFilesDir, "R"), "mylib.R", contents)
val description =
"""Package: sparkPackageTest
|Type: Package
|Title: Test for building an R package
|Version: 0.1
|Date: 2015-07-08
|Author: Burak Yavuz
|Imports: methods, SparkR
|Depends: R (>= 3.1), methods, SparkR
|Suggests: testthat
|Description: Test for building an R package within a jar
|License: Apache License (== 2.0)
|Collate: 'mylib.R'
""".stripMargin
val descFile = writeFile(rFilesDir, "DESCRIPTION", description)
val namespace =
"""import(SparkR)
|export("myfunc")
""".stripMargin
val nameFile = writeFile(rFilesDir, "NAMESPACE", namespace)
Seq(("R/pkg/R/mylib.R", source), ("R/pkg/DESCRIPTION", descFile), ("R/pkg/NAMESPACE", nameFile))
}
/** Create a simple testable class. */
private def createJavaClass(dir: File, className: String, packageName: String): File = {
val contents =
@@ -97,17 +137,14 @@
|
|import java.lang.Integer;
|
-|class $className implements java.io.Serializable {
-|
-| public $className() {}
-|
-| public Integer myFunc(Integer x) {
+|public class $className implements java.io.Serializable {
+| public static Integer myFunc(Integer x) {
| return x + 1;
| }
|}
""".stripMargin
val sourceFile =
new JavaSourceFromString(new File(dir, className + ".java").getAbsolutePath, contents)
new JavaSourceFromString(new File(dir, className).getAbsolutePath, contents)
createCompiledClass(className, dir, sourceFile, Seq.empty)
}
@@ -199,14 +236,25 @@
}
/** Create the jar for the given maven coordinate, using the supplied files. */
-private def packJar(
+private[deploy] def packJar(
dir: File,
artifact: MavenCoordinate,
files: Seq[(String, File)],
-useIvyLayout: Boolean): File = {
+useIvyLayout: Boolean,
+withR: Boolean,
+withManifest: Option[Manifest] = None): File = {
val jarFile = new File(dir, artifactName(artifact, useIvyLayout))
val jarFileStream = new FileOutputStream(jarFile)
-val jarStream = new JarOutputStream(jarFileStream, new java.util.jar.Manifest())
+val manifest = withManifest.getOrElse {
+val mani = new Manifest()
+if (withR) {
+val attr = mani.getMainAttributes
+attr.put(Name.MANIFEST_VERSION, "1.0")
+attr.put(new Name("Spark-HasRPackage"), "true")
+}
+mani
+}
+val jarStream = new JarOutputStream(jarFileStream, manifest)
for (file <- files) {
val jarEntry = new JarEntry(file._1)
@@ -239,7 +287,8 @@
dependencies: Option[Seq[MavenCoordinate]] = None,
tempDir: Option[File] = None,
useIvyLayout: Boolean = false,
-withPython: Boolean = false): File = {
+withPython: Boolean = false,
+withR: Boolean = false): File = {
// Where the root of the repository exists, and what Ivy will search in
val tempPath = tempDir.getOrElse(Files.createTempDir())
// Create directory if it doesn't exist
@@ -255,14 +304,16 @@
val javaClass = createJavaClass(root, className, artifact.groupId)
// A tuple of files representation in the jar, and the file
val javaFile = (artifact.groupId.replace(".", "/") + "/" + javaClass.getName, javaClass)
-val allFiles =
-if (withPython) {
-val pythonFile = createPythonFile(root)
-Seq(javaFile, (pythonFile.getName, pythonFile))
-} else {
-Seq(javaFile)
-}
-val jarFile = packJar(jarPath, artifact, allFiles, useIvyLayout)
+val allFiles = ArrayBuffer[(String, File)](javaFile)
+if (withPython) {
+val pythonFile = createPythonFile(root)
+allFiles.append((pythonFile.getName, pythonFile))
+}
+if (withR) {
+val rFiles = createRFiles(root, className, artifact.groupId)
+allFiles.append(rFiles: _*)
+}
+val jarFile = packJar(jarPath, artifact, allFiles, useIvyLayout, withR)
assert(jarFile.exists(), "Problem creating Jar file")
val descriptor = createDescriptor(tempPath, artifact, dependencies, useIvyLayout)
assert(descriptor.exists(), "Problem creating Pom file")
@@ -286,9 +337,10 @@
dependencies: Option[String],
rootDir: Option[File],
useIvyLayout: Boolean = false,
-withPython: Boolean = false): File = {
+withPython: Boolean = false,
+withR: Boolean = false): File = {
val deps = dependencies.map(SparkSubmitUtils.extractMavenCoordinates)
-val mainRepo = createLocalRepository(artifact, deps, rootDir, useIvyLayout, withPython)
+val mainRepo = createLocalRepository(artifact, deps, rootDir, useIvyLayout, withPython, withR)
deps.foreach { seq => seq.foreach { dep =>
createLocalRepository(dep, None, Some(mainRepo), useIvyLayout, withPython = false)
}}
@@ -311,11 +363,12 @@
rootDir: Option[File],
useIvyLayout: Boolean = false,
withPython: Boolean = false,
withR: Boolean = false,
ivySettings: IvySettings = new IvySettings)(f: String => Unit): Unit = {
val deps = dependencies.map(SparkSubmitUtils.extractMavenCoordinates)
purgeLocalIvyCache(artifact, deps, ivySettings)
val repo = createLocalRepositoryForTests(artifact, dependencies, rootDir, useIvyLayout,
-withPython)
+withPython, withR)
try {
f(repo.toURI.toString)
} finally {
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy
import java.io.{PrintStream, OutputStream, File}
import java.net.URI
import java.util.jar.Attributes.Name
import java.util.jar.{JarFile, Manifest}
import java.util.zip.{ZipEntry, ZipFile}
import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
import scala.collection.JavaConversions._
import scala.collection.mutable.ArrayBuffer
import com.google.common.io.Files
import org.apache.commons.io.FileUtils
import org.apache.spark.SparkFunSuite
import org.apache.spark.api.r.RUtils
import org.apache.spark.deploy.SparkSubmitUtils.MavenCoordinate
class RPackageUtilsSuite extends SparkFunSuite with BeforeAndAfterEach with BeforeAndAfterAll {
private val main = MavenCoordinate("a", "b", "c")
private val dep1 = MavenCoordinate("a", "dep1", "c")
private val dep2 = MavenCoordinate("a", "dep2", "d")
private def getJarPath(coord: MavenCoordinate, repo: File): File = {
new File(IvyTestUtils.pathFromCoordinate(coord, repo, "jar", useIvyLayout = false),
IvyTestUtils.artifactName(coord, useIvyLayout = false, ".jar"))
}
private val lineBuffer = ArrayBuffer[String]()
private val noOpOutputStream = new OutputStream {
def write(b: Int) = {}
}
/** Simple PrintStream that captures printed lines into a buffer */
private class BufferPrintStream extends PrintStream(noOpOutputStream) {
// scalastyle:off println
override def println(line: String) {
// scalastyle:on println
lineBuffer += line
}
}
override def beforeAll(): Unit = {
System.setProperty("spark.testing", "true")
}
override def beforeEach(): Unit = {
lineBuffer.clear()
}
test("pick which jars to unpack using the manifest") {
val deps = Seq(dep1, dep2).mkString(",")
IvyTestUtils.withRepository(main, Some(deps), None, withR = true) { repo =>
val jars = Seq(main, dep1, dep2).map(c => new JarFile(getJarPath(c, new File(new URI(repo)))))
assert(RPackageUtils.checkManifestForR(jars(0)), "should have R code")
assert(!RPackageUtils.checkManifestForR(jars(1)), "should not have R code")
assert(!RPackageUtils.checkManifestForR(jars(2)), "should not have R code")
}
}
test("build an R package from a jar end to end") {
assume(RUtils.isRInstalled, "R isn't installed on this machine.")
val deps = Seq(dep1, dep2).mkString(",")
IvyTestUtils.withRepository(main, Some(deps), None, withR = true) { repo =>
val jars = Seq(main, dep1, dep2).map { c =>
getJarPath(c, new File(new URI(repo)))
}.mkString(",")
RPackageUtils.checkAndBuildRPackage(jars, new BufferPrintStream, verbose = true)
val firstJar = jars.substring(0, jars.indexOf(","))
val output = lineBuffer.mkString("\n")
assert(output.contains("Building R package"))
assert(output.contains("Extracting"))
assert(output.contains(s"$firstJar contains R source code. Now installing package."))
assert(output.contains("doesn't contain R source code, skipping..."))
}
}
test("jars that don't exist are skipped and print warning") {
assume(RUtils.isRInstalled, "R isn't installed on this machine.")
val deps = Seq(dep1, dep2).mkString(",")
IvyTestUtils.withRepository(main, Some(deps), None, withR = true) { repo =>
val jars = Seq(main, dep1, dep2).map { c =>
getJarPath(c, new File(new URI(repo))) + "dummy"
}.mkString(",")
RPackageUtils.checkAndBuildRPackage(jars, new BufferPrintStream, verbose = true)
val individualJars = jars.split(",")
val output = lineBuffer.mkString("\n")
individualJars.foreach { jarFile =>
assert(output.contains(s"$jarFile"))
}
}
}
test("faulty R package shows documentation") {
assume(RUtils.isRInstalled, "R isn't installed on this machine.")
IvyTestUtils.withRepository(main, None, None) { repo =>
val manifest = new Manifest
val attr = manifest.getMainAttributes
attr.put(Name.MANIFEST_VERSION, "1.0")
attr.put(new Name("Spark-HasRPackage"), "true")
val jar = IvyTestUtils.packJar(new File(new URI(repo)), dep1, Nil,
useIvyLayout = false, withR = false, Some(manifest))
RPackageUtils.checkAndBuildRPackage(jar.getAbsolutePath, new BufferPrintStream,
verbose = true)
val output = lineBuffer.mkString("\n")
assert(output.contains(RPackageUtils.RJarDoc))
}
}
test("SparkR zipping works properly") {
val tempDir = Files.createTempDir()
try {
IvyTestUtils.writeFile(tempDir, "test.R", "abc")
val fakeSparkRDir = new File(tempDir, "SparkR")
assert(fakeSparkRDir.mkdirs())
IvyTestUtils.writeFile(fakeSparkRDir, "abc.R", "abc")
IvyTestUtils.writeFile(fakeSparkRDir, "DESCRIPTION", "abc")
IvyTestUtils.writeFile(tempDir, "package.zip", "abc") // fake zip file :)
val fakePackageDir = new File(tempDir, "packageTest")
assert(fakePackageDir.mkdirs())
IvyTestUtils.writeFile(fakePackageDir, "def.R", "abc")
IvyTestUtils.writeFile(fakePackageDir, "DESCRIPTION", "abc")
val finalZip = RPackageUtils.zipRLibraries(tempDir, "sparkr.zip")
assert(finalZip.exists())
val entries = new ZipFile(finalZip).entries().toSeq.map(_.getName)
assert(entries.contains("/test.R"))
assert(entries.contains("/SparkR/abc.R"))
assert(entries.contains("/SparkR/DESCRIPTION"))
assert(!entries.contains("/package.zip"))
assert(entries.contains("/packageTest/def.R"))
assert(entries.contains("/packageTest/DESCRIPTION"))
} finally {
FileUtils.deleteDirectory(tempDir)
}
}
}
@@ -362,6 +362,30 @@ class SparkSubmitSuite
}
}
test("correctly builds R packages included in a jar with --packages") {
// TODO(SPARK-9603): Building a package to $SPARK_HOME/R/lib is unavailable on Jenkins.
// It's hard to write the test in SparkR (because we can't create the repository dynamically)
/*
assume(RUtils.isRInstalled, "R isn't installed on this machine.")
val main = MavenCoordinate("my.great.lib", "mylib", "0.1")
val sparkHome = sys.props.getOrElse("spark.test.home", fail("spark.test.home is not set!"))
val rScriptDir =
Seq(sparkHome, "R", "pkg", "inst", "tests", "packageInAJarTest.R").mkString(File.separator)
assert(new File(rScriptDir).exists)
IvyTestUtils.withRepository(main, None, None, withR = true) { repo =>
val args = Seq(
"--name", "testApp",
"--master", "local-cluster[2,1,1024]",
"--packages", main.toString,
"--repositories", repo,
"--verbose",
"--conf", "spark.ui.enabled=false",
rScriptDir)
runSparkSubmit(args)
}
*/
}
test("resolves command line argument paths correctly") {
val jars = "/jar1,/jar2" // --jars
val files = "hdfs:/file1,file2" // --files
......