diff --git a/core/src/main/scala/org/apache/spark/TestUtils.scala b/core/src/main/scala/org/apache/spark/TestUtils.scala
index 2909191bd6f14b62e558e2e6b80030fc35bb263f..b5b201409a55d961274e030dabbba8be7d3499d1 100644
--- a/core/src/main/scala/org/apache/spark/TestUtils.scala
+++ b/core/src/main/scala/org/apache/spark/TestUtils.scala
@@ -93,7 +93,10 @@ private[spark] object TestUtils {
     val jarStream = new JarOutputStream(jarFileStream, new java.util.jar.Manifest())
 
     for (file <- files) {
-      val jarEntry = new JarEntry(Paths.get(directoryPrefix.getOrElse(""), file.getName).toString)
+      // The `name` argument of `JarEntry` must use '/' as its separator, as required by
+      // the ZIP specification.
+      val prefix = directoryPrefix.map(d => s"$d/").getOrElse("")
+      val jarEntry = new JarEntry(prefix + file.getName)
       jarStream.putNextEntry(jarEntry)
 
       val in = new FileInputStream(file)
diff --git a/core/src/test/scala/org/apache/spark/FileSuite.scala b/core/src/test/scala/org/apache/spark/FileSuite.scala
index 89f0b1cb5b56a7b84e8b573b4ed8bd01d882224d..6538507d407e0917cadf2c69cbdf68e3f3662a96 100644
--- a/core/src/test/scala/org/apache/spark/FileSuite.scala
+++ b/core/src/test/scala/org/apache/spark/FileSuite.scala
@@ -22,6 +22,7 @@ import java.util.zip.GZIPOutputStream
 
 import scala.io.Source
 
+import org.apache.hadoop.fs.Path
 import org.apache.hadoop.io._
 import org.apache.hadoop.io.compress.DefaultCodec
 import org.apache.hadoop.mapred.{FileAlreadyExistsException, FileSplit, JobConf, TextInputFormat, TextOutputFormat}
@@ -255,7 +256,7 @@ class FileSuite extends SparkFunSuite with LocalSparkContext {
     val (infile: String, indata: PortableDataStream) = inRdd.collect.head
 
     // Make sure the name and array match
-    assert(infile.contains(outFileName)) // a prefix may get added
+    assert(infile.contains(outFile.toURI.getPath)) // a prefix may get added
     assert(indata.toArray === testOutput)
   }
 
@@ -532,7 +533,9 @@ class FileSuite extends SparkFunSuite with LocalSparkContext {
       .mapPartitionsWithInputSplit { (split, part) =>
         Iterator(split.asInstanceOf[FileSplit].getPath.toUri.getPath)
       }.collect()
-    assert(inputPaths.toSet === Set(s"$outDir/part-00000", s"$outDir/part-00001"))
+    val outPathOne = new Path(outDir, "part-00000").toUri.getPath
+    val outPathTwo = new Path(outDir, "part-00001").toUri.getPath
+    assert(inputPaths.toSet === Set(outPathOne, outPathTwo))
   }
 
   test("Get input files via new Hadoop API") {
@@ -546,7 +549,9 @@ class FileSuite extends SparkFunSuite with LocalSparkContext {
      .mapPartitionsWithInputSplit { (split, part) =>
         Iterator(split.asInstanceOf[NewFileSplit].getPath.toUri.getPath)
       }.collect()
-    assert(inputPaths.toSet === Set(s"$outDir/part-00000", s"$outDir/part-00001"))
+    val outPathOne = new Path(outDir, "part-00000").toUri.getPath
+    val outPathTwo = new Path(outDir, "part-00001").toUri.getPath
+    assert(inputPaths.toSet === Set(outPathOne, outPathTwo))
   }
 
   test("spark.files.ignoreCorruptFiles should work both HadoopRDD and NewHadoopRDD") {
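A minimal, self-contained sketch (not part of the patch; the object name is made up) of why the jar entry name is now built by string concatenation rather than with `java.nio.file.Paths`: `Paths.get(...).toString` joins with the platform separator, so on Windows it produces backslashes, which are not valid separators in ZIP/JAR entry names.

```scala
import java.nio.file.Paths
import java.util.jar.JarEntry

// Hypothetical illustration only; not part of the patch.
object JarEntrySeparatorDemo {
  def main(args: Array[String]): Unit = {
    val prefix = "dir"
    val fileName = "Foo.class"

    // Platform-dependent: "dir\Foo.class" on Windows, "dir/Foo.class" elsewhere.
    val viaPaths = Paths.get(prefix, fileName).toString

    // Platform-independent: ZIP/JAR entry names always use '/'.
    val viaConcat = s"$prefix/$fileName"

    println(viaPaths)
    println(new JarEntry(viaConcat).getName)  // dir/Foo.class on every platform
  }
}
```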
diff --git a/core/src/test/scala/org/apache/spark/ShuffleSuite.scala b/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
index a854f5bb9b7ce5612174c62c3c7f557c051a6fde..dc3a28e27cd4e65000755ef095c0af975f2b213c 100644
--- a/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
@@ -29,7 +29,7 @@ import org.apache.spark.scheduler.{MapStatus, MyRDD, SparkListener, SparkListene
 import org.apache.spark.serializer.KryoSerializer
 import org.apache.spark.shuffle.ShuffleWriter
 import org.apache.spark.storage.{ShuffleBlockId, ShuffleDataBlockId}
-import org.apache.spark.util.MutablePair
+import org.apache.spark.util.{MutablePair, Utils}
 
 abstract class ShuffleSuite extends SparkFunSuite with Matchers with LocalSparkContext {
 
@@ -51,7 +51,10 @@ abstract class ShuffleSuite extends SparkFunSuite with Matchers with LocalSparkC
     assert(valuesFor2.toList.sorted === List(1))
   }
 
+  // Some tests using `local-cluster` here fail on Windows because executors cannot be
+  // launched, owing to the path length limitation. See SPARK-18718.
   test("shuffle non-zero block size") {
+    assume(!Utils.isWindows)
     sc = new SparkContext("local-cluster[2,1,1024]", "test", conf)
     val NUM_BLOCKS = 3
 
@@ -77,6 +80,7 @@ abstract class ShuffleSuite extends SparkFunSuite with Matchers with LocalSparkC
   }
 
   test("shuffle serializer") {
+    assume(!Utils.isWindows)
     // Use a local cluster with 2 processes to make sure there are both local and remote blocks
     sc = new SparkContext("local-cluster[2,1,1024]", "test", conf)
     val a = sc.parallelize(1 to 10, 2)
@@ -93,6 +97,7 @@ abstract class ShuffleSuite extends SparkFunSuite with Matchers with LocalSparkC
   }
 
   test("zero sized blocks") {
+    assume(!Utils.isWindows)
     // Use a local cluster with 2 processes to make sure there are both local and remote blocks
     sc = new SparkContext("local-cluster[2,1,1024]", "test", conf)
 
@@ -120,6 +125,7 @@ abstract class ShuffleSuite extends SparkFunSuite with Matchers with LocalSparkC
   }
 
   test("zero sized blocks without kryo") {
+    assume(!Utils.isWindows)
     // Use a local cluster with 2 processes to make sure there are both local and remote blocks
     sc = new SparkContext("local-cluster[2,1,1024]", "test", conf)
 
@@ -145,6 +151,7 @@ abstract class ShuffleSuite extends SparkFunSuite with Matchers with LocalSparkC
   }
 
   test("shuffle on mutable pairs") {
+    assume(!Utils.isWindows)
     // Use a local cluster with 2 processes to make sure there are both local and remote blocks
     sc = new SparkContext("local-cluster[2,1,1024]", "test", conf)
     def p[T1, T2](_1: T1, _2: T2): MutablePair[T1, T2] = MutablePair(_1, _2)
@@ -157,6 +164,7 @@ abstract class ShuffleSuite extends SparkFunSuite with Matchers with LocalSparkC
   }
 
   test("sorting on mutable pairs") {
+    assume(!Utils.isWindows)
     // This is not in SortingSuite because of the local cluster setup.
     // Use a local cluster with 2 processes to make sure there are both local and remote blocks
     sc = new SparkContext("local-cluster[2,1,1024]", "test", conf)
@@ -172,6 +180,7 @@ abstract class ShuffleSuite extends SparkFunSuite with Matchers with LocalSparkC
   }
 
   test("cogroup using mutable pairs") {
+    assume(!Utils.isWindows)
     // Use a local cluster with 2 processes to make sure there are both local and remote blocks
     sc = new SparkContext("local-cluster[2,1,1024]", "test", conf)
     def p[T1, T2](_1: T1, _2: T2): MutablePair[T1, T2] = MutablePair(_1, _2)
@@ -199,6 +208,7 @@ abstract class ShuffleSuite extends SparkFunSuite with Matchers with LocalSparkC
   }
 
   test("subtract mutable pairs") {
+    assume(!Utils.isWindows)
     // Use a local cluster with 2 processes to make sure there are both local and remote blocks
     sc = new SparkContext("local-cluster[2,1,1024]", "test", conf)
     def p[T1, T2](_1: T1, _2: T2): MutablePair[T1, T2] = MutablePair(_1, _2)
@@ -213,6 +223,7 @@ abstract class ShuffleSuite extends SparkFunSuite with Matchers with LocalSparkC
   }
 
   test("sort with Java non serializable class - Kryo") {
+    assume(!Utils.isWindows)
     // Use a local cluster with 2 processes to make sure there are both local and remote blocks
     val myConf = conf.clone().set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
     sc = new SparkContext("local-cluster[2,1,1024]", "test", myConf)
@@ -227,6 +238,7 @@ abstract class ShuffleSuite extends SparkFunSuite with Matchers with LocalSparkC
   }
 
   test("sort with Java non serializable class - Java") {
+    assume(!Utils.isWindows)
     // Use a local cluster with 2 processes to make sure there are both local and remote blocks
     sc = new SparkContext("local-cluster[2,1,1024]", "test", conf)
     val a = sc.parallelize(1 to 10, 2)
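For readers unfamiliar with the pattern above, a minimal standalone ScalaTest sketch (hypothetical suite name; the real suites use `org.apache.spark.util.Utils.isWindows`) showing why `assume` is used: it cancels the test rather than failing it, so the `local-cluster` tests are reported as canceled on Windows instead of breaking the build.

```scala
import org.scalatest.funsuite.AnyFunSuite

// Hypothetical suite for illustration only.
class WindowsSkipExampleSuite extends AnyFunSuite {
  private val isWindows = sys.props("os.name").toLowerCase.startsWith("windows")

  test("local-cluster style test that cannot run on Windows") {
    // assume() throws TestCanceledException when the condition is false,
    // so on Windows this test is reported as canceled, not failed.
    assume(!isWindows)
    assert(1 + 1 === 2)
  }
}
```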
diff --git a/core/src/test/scala/org/apache/spark/deploy/IvyTestUtils.scala b/core/src/test/scala/org/apache/spark/deploy/IvyTestUtils.scala
index c9b3d657c2b9d3e516ac1169c3550fabba71b871..f50cb38311db22e40c92d8d4e3a877ca7d643030 100644
--- a/core/src/test/scala/org/apache/spark/deploy/IvyTestUtils.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/IvyTestUtils.scala
@@ -142,7 +142,7 @@ private[deploy] object IvyTestUtils {
        |}
       """.stripMargin
     val sourceFile =
-      new JavaSourceFromString(new File(dir, className).getAbsolutePath, contents)
+      new JavaSourceFromString(new File(dir, className).toURI.getPath, contents)
     createCompiledClass(className, dir, sourceFile, Seq.empty)
   }
 
diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
index 626888022903bdbb0429ea061220fe662ed0d090..9417930d02405131593ec51899b5963a7951072b 100644
--- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
@@ -461,7 +461,7 @@ class SparkSubmitSuite
     val tempDir = Utils.createTempDir()
     val srcDir = new File(tempDir, "sparkrtest")
     srcDir.mkdirs()
-    val excSource = new JavaSourceFromString(new File(srcDir, "DummyClass").getAbsolutePath,
+    val excSource = new JavaSourceFromString(new File(srcDir, "DummyClass").toURI.getPath,
       """package sparkrtest;
         |
         |public class DummyClass implements java.io.Serializable {
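As background for the `getAbsolutePath` to `toURI.getPath` changes, a hedged sketch (hypothetical object name; printed values assume a Windows JVM) of the two path forms: `getAbsolutePath` uses the platform separator, whereas `toURI.getPath` always yields a forward-slash form, which is safe to embed in the URI that `JavaSourceFromString` builds from the name.

```scala
import java.io.File

// Hypothetical illustration; the printed values assume a Windows JVM.
object PathFormsDemo {
  def main(args: Array[String]): Unit = {
    val f = new File("C:\\tmp\\sparkrtest\\DummyClass")

    // Platform separator: "C:\tmp\sparkrtest\DummyClass" on Windows.
    println(f.getAbsolutePath)

    // Always '/'-separated and URI-friendly: "/C:/tmp/sparkrtest/DummyClass".
    println(f.toURI.getPath)
  }
}
```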
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala
index ee95e4ff7dbc343de50ad8811aa603cfe5c84b76..c9e682f53c105d77806de9dd92b4ca6969a7292f 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala
@@ -171,7 +171,7 @@ class TaskResultGetterSuite extends SparkFunSuite with BeforeAndAfter with Local
     val tempDir = Utils.createTempDir()
     val srcDir = new File(tempDir, "repro/")
     srcDir.mkdirs()
-    val excSource = new JavaSourceFromString(new File(srcDir, "MyException").getAbsolutePath,
+    val excSource = new JavaSourceFromString(new File(srcDir, "MyException").toURI.getPath,
       """package repro;
         |
         |public class MyException extends Exception {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/BroadcastJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/BroadcastJoinSuite.scala
index 7c4f763322e4b7b1baf21fefdeb4e4b266d391f0..07839359a019c0f720c677b76d5ff85362b4ce3b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/BroadcastJoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/BroadcastJoinSuite.scala
@@ -86,7 +86,7 @@ class BroadcastJoinSuite extends QueryTest with SQLTestUtils {
     plan
   }
 
-  // This tests here are failed on Windows due to the failure of initiating executors
+  // The tests here fail on Windows because executors cannot be launched, which is caused
   // by the path length limitation. See SPARK-18718.
   test("unsafe broadcast hash join updates peak execution memory") {
     assume(!Utils.isWindows)
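Finally, a hedged sketch (hypothetical object and directory names) of the Hadoop `Path` normalization that the `FileSuite` changes earlier in this patch rely on: building the expected value with `new Path(dir, child).toUri.getPath` yields the same `/`-separated, scheme-less form that `FileSplit.getPath.toUri.getPath` returns, so the assertion no longer depends on how `$outDir` is stringified on Windows.

```scala
import java.io.File
import org.apache.hadoop.fs.Path

// Hypothetical illustration of normalizing an expected output path the same way
// the tests normalize the input-split paths.
object HadoopPathDemo {
  def main(args: Array[String]): Unit = {
    val outDir = new File(System.getProperty("java.io.tmpdir"), "spark-test-output")

    // Path(parent, child) joins with '/', and toUri.getPath drops the scheme,
    // e.g. "/C:/Users/.../spark-test-output/part-00000" on Windows.
    val expected = new Path(outDir.toURI.toString, "part-00000").toUri.getPath
    println(expected)
  }
}
```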