Skip to content
Snippets Groups Projects
Commit 08ec10de authored by Prashant Sharma's avatar Prashant Sharma
Browse files

Removed a repeated test and changed tests to not use uncommons jar

parent 436f3d28
No related branches found
No related tags found
No related merge requests found
......@@ -17,15 +17,46 @@
package org.apache.spark
import java.io._
import java.util.jar.{JarEntry, JarOutputStream}
import SparkContext._
import com.google.common.io.Files
import org.scalatest.FunSuite
import java.io.{File, PrintWriter, FileReader, BufferedReader}
import SparkContext._
class FileServerSuite extends FunSuite with LocalSparkContext {

  // File handed to SparkContext.addFile — presumably initialized in
  // beforeEach, which is not fully visible in this chunk; TODO confirm.
  @transient var tmpFile: File = _
  // Absolute path of the jar assembled in beforeAll and shipped via addJar.
  // This commit changed the field's type from File to String; the stale
  // pre-commit declaration left over from the diff is dropped here so the
  // class declares the field exactly once.
  @transient var testJarFile: String = _
override def beforeAll() {
  super.beforeAll()
  // Build a temp directory containing a marker text file, then package that
  // file into a small jar. The "adding JARS" tests prove that addJar worked
  // by looking the marker resource up on the executor's context classloader.
  // Uses the JDK's createTempDirectory (fully qualified) instead of Guava's
  // Files.createTempDir so setup has no third-party dependency.
  val tmpdir = new File(java.nio.file.Files.createTempDirectory("test").toFile, "test")
  tmpdir.mkdir()
  val tmpJarEntry = new File(tmpdir, "FileServerSuite2.txt")
  val pw = new PrintWriter(tmpJarEntry)
  pw.println("test String in the file named FileServerSuite2.txt")
  pw.close()

  val tmpFile2 = new File(tmpdir, "test.jar")
  val stream = new FileOutputStream(tmpFile2)
  val jar = new JarOutputStream(stream, new java.util.jar.Manifest())
  val jarAdd = new JarEntry(tmpJarEntry.getName)
  jarAdd.setTime(tmpJarEntry.lastModified)
  jar.putNextEntry(jarAdd)

  // BUG FIX: the original copy loop was `while (nRead <= 0)`, which
  // (a) exits after the first successful read, truncating any entry larger
  // than the 10 KiB buffer, and (b) on an empty file would read -1 and then
  // call jar.write(buffer, 0, -1), throwing IndexOutOfBoundsException.
  // Standard stream copy: read until the stream reports EOF (-1).
  val buffer = new Array[Byte](10240)
  val in = new FileInputStream(tmpJarEntry)
  var nRead = in.read(buffer)
  while (nRead != -1) {
    jar.write(buffer, 0, nRead)
    nRead = in.read(buffer)
  }
  in.close()
  jar.close()
  stream.close()

  // The tests below hand this path to SparkContext.addJar.
  testJarFile = tmpFile2.getAbsolutePath
}
override def beforeEach() {
super.beforeEach()
......@@ -75,20 +106,15 @@ class FileServerSuite extends FunSuite with LocalSparkContext {
assert(result.toSet === Set((1,200), (2,300), (3,500)))
}
// Reconstructed post-commit version: the diff interleaved the removed
// `ignore`d uncommons-maths variant with the new test; keeping both would
// not compile. Re-enabled as `test` per this commit.
test ("Dynamically adding JARS locally") {
  sc = new SparkContext("local[4]", "test")
  // Ship the jar built in beforeAll to the executors.
  sc.addJar(testJarFile)
  val testData = Array((1, 1))
  // If the jar reached the task's classloader the marker resource resolves;
  // otherwise fail the job so the test surfaces the problem.
  sc.parallelize(testData).foreach { (x) =>
    if (Thread.currentThread.getContextClassLoader.getResource("FileServerSuite2.txt") == null) {
      throw new SparkException("jar not added")
    }
  }
}
test("Distributing files on a standalone cluster") {
......@@ -105,35 +131,15 @@ class FileServerSuite extends FunSuite with LocalSparkContext {
assert(result.toSet === Set((1,200), (2,300), (3,500)))
}
// Reconstructed post-commit version: the diff interleaved the removed
// `ignore`d uncommons-maths variant with the new test; keeping both would
// not compile. Re-enabled as `test` per this commit.
test ("Dynamically adding JARS on a standalone cluster") {
  sc = new SparkContext("local-cluster[1,1,512]", "test")
  // Ship the jar built in beforeAll to the standalone-cluster executors.
  sc.addJar(testJarFile)
  val testData = Array((1,1))
  // If the jar reached the task's classloader the marker resource resolves;
  // otherwise fail the job so the test surfaces the problem.
  sc.parallelize(testData).foreach { (x) =>
    if (Thread.currentThread.getContextClassLoader.getResource("FileServerSuite2.txt") == null) {
      throw new SparkException("jar not added")
    }
  }
}
// NOTE(review): the `ignore`d test "Dynamically adding JARS on a standalone
// cluster using local: URL" (which depended on uncommons-maths-1.2.2.jar)
// stood here; this commit removes it — the commit message calls it a
// repeated test — so only the class's closing brace remains.
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment