Commit e094d011 authored by hyukjinkwon, committed by Sean Owen


[SPARK-18803][TESTS] Fix JarEntry-related & path-related test failures and skip some tests by path length limitation on Windows

## What changes were proposed in this pull request?

This PR proposes to fix several tests that fail on Windows, as shown below, caused by the following problems.

### Incorrect path handling

- FileSuite
  ```
  [info] - binary file input as byte array *** FAILED *** (500 milliseconds)
  [info]   "file:/C:/projects/spark/target/tmp/spark-e7c3a3b8-0a4b-4a7f-9ebe-7c4883e48624/record-bytestream-00000.bin" did not contain "C:\projects\spark\target\tmp\spark-e7c3a3b8-0a4b-4a7f-9ebe-7c4883e48624\record-bytestream-00000.bin" (FileSuite.scala:258)
  [info]   org.scalatest.exceptions.TestFailedException:
  [info]   at org.scalatest.Assertions$class.newAssertionFailedException(Assertions.scala:500)
  ...
  ```
  ```
  [info] - Get input files via old Hadoop API *** FAILED *** (1 second, 94 milliseconds)
  [info]   Set("/C:/projects/spark/target/tmp/spark-cf5b1f8b-c5ed-43e0-8d17-546ebbfa8200/output/part-00000", "/C:/projects/spark/target/tmp/spark-cf5b1f8b-c5ed-43e0-8d17-546ebbfa8200/output/part-00001") did not equal Set("C:\projects\spark\target\tmp\spark-cf5b1f8b-c5ed-43e0-8d17-546ebbfa8200\output/part-00000", "C:\projects\spark\target\tmp\spark-cf5b1f8b-c5ed-43e0-8d17-546ebbfa8200\output/part-00001") (FileSuite.scala:535)
  [info]   org.scalatest.exceptions.TestFailedException:
  [info]   at org.scalatest.Assertions$class.newAssertionFailedException(Assertions.scala:500)
  ...
  ```

  ```
  [info] - Get input files via new Hadoop API *** FAILED *** (313 milliseconds)
  [info]   Set("/C:/projects/spark/target/tmp/spark-12bc1540-1111-4df6-9c4d-79e0e614407c/output/part-00000", "/C:/projects/spark/target/tmp/spark-12bc1540-1111-4df6-9c4d-79e0e614407c/output/part-00001") did not equal Set("C:\projects\spark\target\tmp\spark-12bc1540-1111-4df6-9c4d-79e0e614407c\output/part-00000", "C:\projects\spark\target\tmp\spark-12bc1540-1111-4df6-9c4d-79e0e614407c\output/part-00001") (FileSuite.scala:549)
  [info]   org.scalatest.exceptions.TestFailedException:
  ...
  ```

- TaskResultGetterSuite

  ```
  [info] - handling results larger than max RPC message size *** FAILED *** (1 second, 579 milliseconds)
  [info]   1 did not equal 0 Expect result to be removed from the block manager. (TaskResultGetterSuite.scala:129)
  [info]   org.scalatest.exceptions.TestFailedException:
  [info]   ...
  [info]   Cause: java.net.URISyntaxException: Illegal character in path at index 12: string:///C:\projects\spark\target\tmp\spark-93c485af-68da-440f-a907-aac7acd5fc25\repro\MyException.java
  [info]   at java.net.URI$Parser.fail(URI.java:2848)
  [info]   at java.net.URI$Parser.checkChars(URI.java:3021)
  ...
  ```
  ```
  [info] - failed task deserialized with the correct classloader (SPARK-11195) *** FAILED *** (0 milliseconds)
  [info]   java.lang.IllegalArgumentException: Illegal character in path at index 12: string:///C:\projects\spark\target\tmp\spark-93c485af-68da-440f-a907-aac7acd5fc25\repro\MyException.java
  [info]   at java.net.URI.create(URI.java:852)
  ...
  ```

- SparkSubmitSuite

  ```
  [info]   java.lang.IllegalArgumentException: Illegal character in path at index 12: string:///C:\projects\spark\target\tmp\1481210831381-0\870903339\MyLib.java
  [info]   at java.net.URI.create(URI.java:852)
  [info]   at org.apache.spark.TestUtils$.org$apache$spark$TestUtils$$createURI(TestUtils.scala:112)
  ...
  ```
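All of these failures come from building URIs or expected paths from OS-specific absolute paths (`getAbsolutePath`), which contain `\` on Windows. The diff below switches to URI-style paths (`toURI.getPath` on `java.io.File`, `toUri.getPath` on Hadoop `Path`). The following is only a minimal sketch of the difference, using a hypothetical temp directory rather than the actual test code:

```scala
import java.io.File
import java.net.URI

object PathSketch {
  def main(args: Array[String]): Unit = {
    // Hypothetical directory standing in for the suites' temp dirs.
    val srcDir = new File(System.getProperty("java.io.tmpdir"), "repro")

    // OS-specific: "C:\...\repro\MyException" on Windows, with backslashes
    // that are illegal characters inside a URI path.
    val osPath = new File(srcDir, "MyException").getAbsolutePath

    // URI-style: always "/"-separated, e.g. "/C:/.../repro/MyException",
    // so it is safe to embed in a "string:///" URI and matches the
    // URI-style paths Hadoop reports for input splits.
    val uriPath = new File(srcDir, "MyException").toURI.getPath

    // On Windows the first expression throws "Illegal character in path";
    // the second works on both platforms.
    // URI.create("string://" + osPath + ".java")
    println(URI.create("string://" + uriPath + ".java"))
  }
}
```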

### Incorrect separator for `JarEntry`

After the path fix above, `TaskResultGetterSuite` throws another exception:

```
[info] - failed task deserialized with the correct classloader (SPARK-11195) *** FAILED *** (907 milliseconds)
[info]   java.lang.ClassNotFoundException: repro.MyException
[info]   at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
...
```

This is because `Paths.get` joins the given paths with the OS-specific separator (`\` on Windows, `/` on Linux). However, a `JarEntry` name must comply with the ZIP specification, which requires `/` as the separator.

See `4.4.17 file name: (Variable)` in https://pkware.cachefly.net/webdocs/casestudies/APPNOTE.TXT
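
To make the contrast concrete, here is a small sketch (with a hypothetical prefix and file, not the `TestUtils` code itself) of why `Paths.get` produces a bad entry name on Windows while plain `/`-joined concatenation is ZIP-compliant:

```scala
import java.io.File
import java.nio.file.Paths
import java.util.jar.JarEntry

object JarEntrySketch {
  def main(args: Array[String]): Unit = {
    // Hypothetical inputs, standing in for TestUtils.createJar's arguments.
    val directoryPrefix: Option[String] = Some("repro")
    val file = new File("MyException.class")

    // Paths.get joins with the OS separator, so on Windows this is
    // "repro\MyException.class" -- a class loader cannot resolve the class
    // under that entry name, hence the ClassNotFoundException above.
    val osSpecificName = Paths.get(directoryPrefix.getOrElse(""), file.getName).toString

    // ZIP (APPNOTE 4.4.17) requires "/" as the separator, so the entry
    // name is built by plain string concatenation instead.
    val prefix = directoryPrefix.map(d => s"$d/").getOrElse("")
    val zipCompliantEntry = new JarEntry(prefix + file.getName)

    println(s"OS-specific: $osSpecificName, ZIP-compliant: ${zipCompliantEntry.getName}")
  }
}
```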

### Long path problem on Windows

Some tests in `ShuffleSuite` run via `ShuffleNettySuite` were skipped for the same reason as SPARK-18718: executors in `local-cluster` mode fail to launch on Windows because of the path length limitation.
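
The skip is done with ScalaTest's `assume`, which cancels (rather than fails) a test when the condition does not hold. A minimal sketch of the pattern (simplified; the real tests live in `ShuffleSuite` and extend `SparkFunSuite`):

```scala
package org.apache.spark

import org.apache.spark.util.Utils
import org.scalatest.FunSuite

// Sketch only: illustrates the assume(!Utils.isWindows) skip pattern.
class WindowsSkipSketch extends FunSuite {
  test("shuffle non-zero block size") {
    // Canceled on Windows because `local-cluster` executors fail to start
    // there due to the path length limitation (SPARK-18718).
    assume(!Utils.isWindows)
    // ... the rest of the test only runs on non-Windows platforms ...
  }
}
```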

## How was this patch tested?

Manually via AppVeyor.

**Before**

- `FileSuite`, `TaskResultGetterSuite`, `SparkSubmitSuite`
  https://ci.appveyor.com/project/spark-test/spark/build/164-tmp-windows-base (grep for each suite name to find its results)
- `ShuffleSuite`
  https://ci.appveyor.com/project/spark-test/spark/build/157-tmp-windows-base

**After**

- `FileSuite`
  https://ci.appveyor.com/project/spark-test/spark/build/166-FileSuite
- `TaskResultGetterSuite`
  https://ci.appveyor.com/project/spark-test/spark/build/173-TaskResultGetterSuite
- `SparkSubmitSuite`
  https://ci.appveyor.com/project/spark-test/spark/build/167-SparkSubmitSuite
- `ShuffleSuite`
  https://ci.appveyor.com/project/spark-test/spark/build/176-ShuffleSuite

Author: hyukjinkwon <gurwls223@gmail.com>

Closes #16234 from HyukjinKwon/test-errors-windows.
parent 11432483
......@@ -93,7 +93,10 @@ private[spark] object TestUtils {
val jarStream = new JarOutputStream(jarFileStream, new java.util.jar.Manifest())
for (file <- files) {
val jarEntry = new JarEntry(Paths.get(directoryPrefix.getOrElse(""), file.getName).toString)
// The `name` for the argument in `JarEntry` should use / for its separator. This is
// ZIP specification.
val prefix = directoryPrefix.map(d => s"$d/").getOrElse("")
val jarEntry = new JarEntry(prefix + file.getName)
jarStream.putNextEntry(jarEntry)
val in = new FileInputStream(file)
......
......@@ -22,6 +22,7 @@ import java.util.zip.GZIPOutputStream
import scala.io.Source
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io._
import org.apache.hadoop.io.compress.DefaultCodec
import org.apache.hadoop.mapred.{FileAlreadyExistsException, FileSplit, JobConf, TextInputFormat, TextOutputFormat}
......@@ -255,7 +256,7 @@ class FileSuite extends SparkFunSuite with LocalSparkContext {
val (infile: String, indata: PortableDataStream) = inRdd.collect.head
// Make sure the name and array match
assert(infile.contains(outFileName)) // a prefix may get added
assert(infile.contains(outFile.toURI.getPath)) // a prefix may get added
assert(indata.toArray === testOutput)
}
......@@ -532,7 +533,9 @@ class FileSuite extends SparkFunSuite with LocalSparkContext {
.mapPartitionsWithInputSplit { (split, part) =>
Iterator(split.asInstanceOf[FileSplit].getPath.toUri.getPath)
}.collect()
assert(inputPaths.toSet === Set(s"$outDir/part-00000", s"$outDir/part-00001"))
val outPathOne = new Path(outDir, "part-00000").toUri.getPath
val outPathTwo = new Path(outDir, "part-00001").toUri.getPath
assert(inputPaths.toSet === Set(outPathOne, outPathTwo))
}
test("Get input files via new Hadoop API") {
......@@ -546,7 +549,9 @@ class FileSuite extends SparkFunSuite with LocalSparkContext {
.mapPartitionsWithInputSplit { (split, part) =>
Iterator(split.asInstanceOf[NewFileSplit].getPath.toUri.getPath)
}.collect()
assert(inputPaths.toSet === Set(s"$outDir/part-00000", s"$outDir/part-00001"))
val outPathOne = new Path(outDir, "part-00000").toUri.getPath
val outPathTwo = new Path(outDir, "part-00001").toUri.getPath
assert(inputPaths.toSet === Set(outPathOne, outPathTwo))
}
test("spark.files.ignoreCorruptFiles should work both HadoopRDD and NewHadoopRDD") {
......
......@@ -29,7 +29,7 @@ import org.apache.spark.scheduler.{MapStatus, MyRDD, SparkListener, SparkListene
import org.apache.spark.serializer.KryoSerializer
import org.apache.spark.shuffle.ShuffleWriter
import org.apache.spark.storage.{ShuffleBlockId, ShuffleDataBlockId}
import org.apache.spark.util.MutablePair
import org.apache.spark.util.{MutablePair, Utils}
abstract class ShuffleSuite extends SparkFunSuite with Matchers with LocalSparkContext {
......@@ -51,7 +51,10 @@ abstract class ShuffleSuite extends SparkFunSuite with Matchers with LocalSparkC
assert(valuesFor2.toList.sorted === List(1))
}
// Some tests using `local-cluster` here are failed on Windows due to the failure of initiating
// executors by the path length limitation. See SPARK-18718.
test("shuffle non-zero block size") {
assume(!Utils.isWindows)
sc = new SparkContext("local-cluster[2,1,1024]", "test", conf)
val NUM_BLOCKS = 3
......@@ -77,6 +80,7 @@ abstract class ShuffleSuite extends SparkFunSuite with Matchers with LocalSparkC
}
test("shuffle serializer") {
assume(!Utils.isWindows)
// Use a local cluster with 2 processes to make sure there are both local and remote blocks
sc = new SparkContext("local-cluster[2,1,1024]", "test", conf)
val a = sc.parallelize(1 to 10, 2)
......@@ -93,6 +97,7 @@ abstract class ShuffleSuite extends SparkFunSuite with Matchers with LocalSparkC
}
test("zero sized blocks") {
assume(!Utils.isWindows)
// Use a local cluster with 2 processes to make sure there are both local and remote blocks
sc = new SparkContext("local-cluster[2,1,1024]", "test", conf)
......@@ -120,6 +125,7 @@ abstract class ShuffleSuite extends SparkFunSuite with Matchers with LocalSparkC
}
test("zero sized blocks without kryo") {
assume(!Utils.isWindows)
// Use a local cluster with 2 processes to make sure there are both local and remote blocks
sc = new SparkContext("local-cluster[2,1,1024]", "test", conf)
......@@ -145,6 +151,7 @@ abstract class ShuffleSuite extends SparkFunSuite with Matchers with LocalSparkC
}
test("shuffle on mutable pairs") {
assume(!Utils.isWindows)
// Use a local cluster with 2 processes to make sure there are both local and remote blocks
sc = new SparkContext("local-cluster[2,1,1024]", "test", conf)
def p[T1, T2](_1: T1, _2: T2): MutablePair[T1, T2] = MutablePair(_1, _2)
......@@ -157,6 +164,7 @@ abstract class ShuffleSuite extends SparkFunSuite with Matchers with LocalSparkC
}
test("sorting on mutable pairs") {
assume(!Utils.isWindows)
// This is not in SortingSuite because of the local cluster setup.
// Use a local cluster with 2 processes to make sure there are both local and remote blocks
sc = new SparkContext("local-cluster[2,1,1024]", "test", conf)
......@@ -172,6 +180,7 @@ abstract class ShuffleSuite extends SparkFunSuite with Matchers with LocalSparkC
}
test("cogroup using mutable pairs") {
assume(!Utils.isWindows)
// Use a local cluster with 2 processes to make sure there are both local and remote blocks
sc = new SparkContext("local-cluster[2,1,1024]", "test", conf)
def p[T1, T2](_1: T1, _2: T2): MutablePair[T1, T2] = MutablePair(_1, _2)
......@@ -199,6 +208,7 @@ abstract class ShuffleSuite extends SparkFunSuite with Matchers with LocalSparkC
}
test("subtract mutable pairs") {
assume(!Utils.isWindows)
// Use a local cluster with 2 processes to make sure there are both local and remote blocks
sc = new SparkContext("local-cluster[2,1,1024]", "test", conf)
def p[T1, T2](_1: T1, _2: T2): MutablePair[T1, T2] = MutablePair(_1, _2)
......@@ -213,6 +223,7 @@ abstract class ShuffleSuite extends SparkFunSuite with Matchers with LocalSparkC
}
test("sort with Java non serializable class - Kryo") {
assume(!Utils.isWindows)
// Use a local cluster with 2 processes to make sure there are both local and remote blocks
val myConf = conf.clone().set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
sc = new SparkContext("local-cluster[2,1,1024]", "test", myConf)
......@@ -227,6 +238,7 @@ abstract class ShuffleSuite extends SparkFunSuite with Matchers with LocalSparkC
}
test("sort with Java non serializable class - Java") {
assume(!Utils.isWindows)
// Use a local cluster with 2 processes to make sure there are both local and remote blocks
sc = new SparkContext("local-cluster[2,1,1024]", "test", conf)
val a = sc.parallelize(1 to 10, 2)
......
......@@ -142,7 +142,7 @@ private[deploy] object IvyTestUtils {
|}
""".stripMargin
val sourceFile =
new JavaSourceFromString(new File(dir, className).getAbsolutePath, contents)
new JavaSourceFromString(new File(dir, className).toURI.getPath, contents)
createCompiledClass(className, dir, sourceFile, Seq.empty)
}
......
......@@ -461,7 +461,7 @@ class SparkSubmitSuite
val tempDir = Utils.createTempDir()
val srcDir = new File(tempDir, "sparkrtest")
srcDir.mkdirs()
val excSource = new JavaSourceFromString(new File(srcDir, "DummyClass").getAbsolutePath,
val excSource = new JavaSourceFromString(new File(srcDir, "DummyClass").toURI.getPath,
"""package sparkrtest;
|
|public class DummyClass implements java.io.Serializable {
......
......@@ -171,7 +171,7 @@ class TaskResultGetterSuite extends SparkFunSuite with BeforeAndAfter with Local
val tempDir = Utils.createTempDir()
val srcDir = new File(tempDir, "repro/")
srcDir.mkdirs()
val excSource = new JavaSourceFromString(new File(srcDir, "MyException").getAbsolutePath,
val excSource = new JavaSourceFromString(new File(srcDir, "MyException").toURI.getPath,
"""package repro;
|
|public class MyException extends Exception {
......
......@@ -86,7 +86,7 @@ class BroadcastJoinSuite extends QueryTest with SQLTestUtils {
plan
}
// This tests here are failed on Windows due to the failure of initiating executors
// The tests here are failed on Windows due to the failure of initiating executors
// by the path length limitation. See SPARK-18718.
test("unsafe broadcast hash join updates peak execution memory") {
assume(!Utils.isWindows)
......