Commit 169b9d73 authored by hyukjinkwon, committed by Sean Owen

[SPARK-18830][TESTS] Fix tests in PipedRDDSuite to pass on Windows

## What changes were proposed in this pull request?

This PR proposes to fix the tests in `PipedRDDSuite` that fail on Windows, as below:

```
[info] - pipe with empty partition *** FAILED *** (672 milliseconds)
[info]   Set(0, 4, 5) did not equal Set(0, 5, 6) (PipedRDDSuite.scala:145)
[info]   org.scalatest.exceptions.TestFailedException:
...
```

In this case, `wc -c` counts the characters on both Windows and Linux, but the newline sequence on Windows is `\r\n`, which is two characters. So, each count ends up one higher on Windows.
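
As a rough illustration (not part of the patch), a minimal Scala sketch of where the extra byte per line comes from, using `"foo"` as an example element:

```scala
// Minimal sketch (not from the patch): each element is written to the piped process
// followed by the platform line separator, and `wc -c` counts the resulting bytes.
object NewlineCountSketch extends App {
  val element = "foo"
  val linuxBytes = (element + "\n").length      // 4 bytes on Linux
  val windowsBytes = (element + "\r\n").length  // 5 bytes on Windows: one more per line
  println(s"Linux: $linuxBytes, Windows: $windowsBytes")
}
```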

```
[info] - test pipe exports map_input_file *** FAILED *** (62 milliseconds)
[info]   java.lang.IllegalStateException: Subprocess exited with status 1. Command ran: printenv map_input_file
[info]   at org.apache.spark.rdd.PipedRDD$$anon$1.hasNext(PipedRDD.scala:178)
...
```

```
[info] - test pipe exports mapreduce_map_input_file *** FAILED *** (172 milliseconds)
[info]   java.lang.IllegalStateException: Subprocess exited with status 1. Command ran: printenv mapreduce_map_input_file
[info]   at org.apache.spark.rdd.PipedRDD$$anon$1.hasNext(PipedRDD.scala:178)
...
```

The `printenv` command prints environment variables; however, when environment variables are set on the `ProcessBuilder` with lower-cased keys, `printenv` on Windows ignores them and does not print them, even though they are actually set and accessible (this was tested [here](https://ci.appveyor.com/project/spark-test/spark/build/208-PipedRDDSuite) for upper-cased keys with this [diff](https://github.com/apache/spark/compare/master...spark-test:74d39da) and [here](https://ci.appveyor.com/project/spark-test/spark/build/203-PipedRDDSuite) for lower-cased keys with this [diff](https://github.com/apache/spark/compare/master...spark-test:fde5e37f28032c15a8d8693ba033a8a779a26317)). It seems to be a bug in `printenv`.
(BTW, note that environment variables on Windows are case-insensitive.)
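
For reference, a rough reproduction sketch (not part of the patch) using `scala.sys.process`; the key `map_input_file` and the value `/some/path` are only illustrative:

```scala
import scala.sys.process._

// Run `printenv map_input_file` with a lower-cased key added to the child environment.
// On Linux this prints "/some/path" and exits 0; in the Windows environment described
// above, `printenv` exits with status 1 even though the variable is set, while an
// upper-cased key is printed as expected.
object PrintenvLowerCaseSketch extends App {
  val exitCode = Process(
    Seq("printenv", "map_input_file"),
    None,
    "map_input_file" -> "/some/path").!
  println(s"printenv exited with status $exitCode")
}
```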

This `printenv` is (I believe) a third-party tool on Windows that resembles `printenv` on Linux (it is installed in the AppVeyor environment / Windows Server 2012 R2). The command does not exist, at least, on Windows 7 and 10 (manually tested).

On Windows, `cmd.exe /C set [varname]` can be used officially for this purpose, so the tests are fixed to use it when checking whether an environment variable is set.
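
Concretely, a small sketch of the output difference the updated tests have to normalise; `envCommand` mirrors the helper added in the diff below, and `stripVarName` is only an illustrative helper:

```scala
import org.apache.spark.util.Utils

object EnvCommandSketch extends App {
  // Pick the platform's command for printing environment variables, as the patch does.
  val envCommand = if (Utils.isWindows) "cmd.exe /C set" else "printenv"

  // `cmd.exe /C set MY_TEST_ENV` prints "MY_TEST_ENV=LALALA", whereas `printenv MY_TEST_ENV`
  // prints just "LALALA"; stripping the "varname=" prefix lets one assertion cover both.
  def stripVarName(line: String, varName: String): String = line.stripPrefix(s"$varName=")

  assert(stripVarName("MY_TEST_ENV=LALALA", "MY_TEST_ENV") == "LALALA") // Windows-style output
  assert(stripVarName("LALALA", "MY_TEST_ENV") == "LALALA")             // printenv-style output
}
```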

## How was this patch tested?

Manually tested via AppVeyor.

**Before**
https://ci.appveyor.com/project/spark-test/spark/build/194-PipedRDDSuite

**After**
https://ci.appveyor.com/project/spark-test/spark/build/226-PipedRDDSuite

Author: hyukjinkwon <gurwls223@gmail.com>

Closes #16254 from HyukjinKwon/pipe-errors.
parent c6b8eb71
Changes to `PipedRDDSuite.scala`:

```diff
@@ -32,109 +32,104 @@ import org.apache.spark._
 import org.apache.spark.util.Utils
 
 class PipedRDDSuite extends SparkFunSuite with SharedSparkContext {
+  val envCommand = if (Utils.isWindows) {
+    "cmd.exe /C set"
+  } else {
+    "printenv"
+  }
 
   test("basic pipe") {
-    if (testCommandAvailable("cat")) {
-      val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)
+    assume(testCommandAvailable("cat"))
+    val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)
 
-      val piped = nums.pipe(Seq("cat"))
+    val piped = nums.pipe(Seq("cat"))
 
-      val c = piped.collect()
-      assert(c.size === 4)
-      assert(c(0) === "1")
-      assert(c(1) === "2")
-      assert(c(2) === "3")
-      assert(c(3) === "4")
-    } else {
-      assert(true)
-    }
+    val c = piped.collect()
+    assert(c.size === 4)
+    assert(c(0) === "1")
+    assert(c(1) === "2")
+    assert(c(2) === "3")
+    assert(c(3) === "4")
   }
 
   test("basic pipe with tokenization") {
-    if (testCommandAvailable("wc")) {
-      val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)
+    assume(testCommandAvailable("wc"))
+    val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)
 
-      // verify that both RDD.pipe(command: String) and RDD.pipe(command: String, env) work good
-      for (piped <- Seq(nums.pipe("wc -l"), nums.pipe("wc -l", Map[String, String]()))) {
-        val c = piped.collect()
-        assert(c.size === 2)
-        assert(c(0).trim === "2")
-        assert(c(1).trim === "2")
-      }
-    } else {
-      assert(true)
+    // verify that both RDD.pipe(command: String) and RDD.pipe(command: String, env) work good
+    for (piped <- Seq(nums.pipe("wc -l"), nums.pipe("wc -l", Map[String, String]()))) {
+      val c = piped.collect()
+      assert(c.size === 2)
+      assert(c(0).trim === "2")
+      assert(c(1).trim === "2")
     }
   }
 
   test("failure in iterating over pipe input") {
-    if (testCommandAvailable("cat")) {
-      val nums =
-        sc.makeRDD(Array(1, 2, 3, 4), 2)
-          .mapPartitionsWithIndex((index, iterator) => {
-            new Iterator[Int] {
-              def hasNext = true
-              def next() = {
-                throw new SparkException("Exception to simulate bad scenario")
-              }
-            }
-          })
+    assume(testCommandAvailable("cat"))
+    val nums =
+      sc.makeRDD(Array(1, 2, 3, 4), 2)
+        .mapPartitionsWithIndex((index, iterator) => {
+          new Iterator[Int] {
+            def hasNext = true
+            def next() = {
+              throw new SparkException("Exception to simulate bad scenario")
+            }
+          }
+        })
 
-      val piped = nums.pipe(Seq("cat"))
+    val piped = nums.pipe(Seq("cat"))
 
-      intercept[SparkException] {
-        piped.collect()
-      }
+    intercept[SparkException] {
+      piped.collect()
     }
   }
 
   test("advanced pipe") {
-    if (testCommandAvailable("cat")) {
-      val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)
-      val bl = sc.broadcast(List("0"))
+    assume(testCommandAvailable("cat"))
+    val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)
+    val bl = sc.broadcast(List("0"))
 
-      val piped = nums.pipe(Seq("cat"),
-        Map[String, String](),
-        (f: String => Unit) => {
-          bl.value.foreach(f); f("\u0001")
-        },
-        (i: Int, f: String => Unit) => f(i + "_"))
+    val piped = nums.pipe(Seq("cat"),
+      Map[String, String](),
+      (f: String => Unit) => {
+        bl.value.foreach(f); f("\u0001")
+      },
+      (i: Int, f: String => Unit) => f(i + "_"))
 
-      val c = piped.collect()
+    val c = piped.collect()
 
-      assert(c.size === 8)
-      assert(c(0) === "0")
-      assert(c(1) === "\u0001")
-      assert(c(2) === "1_")
-      assert(c(3) === "2_")
-      assert(c(4) === "0")
-      assert(c(5) === "\u0001")
-      assert(c(6) === "3_")
-      assert(c(7) === "4_")
+    assert(c.size === 8)
+    assert(c(0) === "0")
+    assert(c(1) === "\u0001")
+    assert(c(2) === "1_")
+    assert(c(3) === "2_")
+    assert(c(4) === "0")
+    assert(c(5) === "\u0001")
+    assert(c(6) === "3_")
+    assert(c(7) === "4_")
 
-      val nums1 = sc.makeRDD(Array("a\t1", "b\t2", "a\t3", "b\t4"), 2)
-      val d = nums1.groupBy(str => str.split("\t")(0)).
-        pipe(Seq("cat"),
-          Map[String, String](),
-          (f: String => Unit) => {
-            bl.value.foreach(f); f("\u0001")
-          },
-          (i: Tuple2[String, Iterable[String]], f: String => Unit) => {
-            for (e <- i._2) {
-              f(e + "_")
-            }
-          }).collect()
-      assert(d.size === 8)
-      assert(d(0) === "0")
-      assert(d(1) === "\u0001")
-      assert(d(2) === "b\t2_")
-      assert(d(3) === "b\t4_")
-      assert(d(4) === "0")
-      assert(d(5) === "\u0001")
-      assert(d(6) === "a\t1_")
-      assert(d(7) === "a\t3_")
-    } else {
-      assert(true)
-    }
+    val nums1 = sc.makeRDD(Array("a\t1", "b\t2", "a\t3", "b\t4"), 2)
+    val d = nums1.groupBy(str => str.split("\t")(0)).
+      pipe(Seq("cat"),
+        Map[String, String](),
+        (f: String => Unit) => {
+          bl.value.foreach(f); f("\u0001")
+        },
+        (i: Tuple2[String, Iterable[String]], f: String => Unit) => {
+          for (e <- i._2) {
+            f(e + "_")
+          }
+        }).collect()
+    assert(d.size === 8)
+    assert(d(0) === "0")
+    assert(d(1) === "\u0001")
+    assert(d(2) === "b\t2_")
+    assert(d(3) === "b\t4_")
+    assert(d(4) === "0")
+    assert(d(5) === "\u0001")
+    assert(d(6) === "a\t1_")
+    assert(d(7) === "a\t3_")
   }
 
   test("pipe with empty partition") {
@@ -142,67 +137,67 @@ class PipedRDDSuite extends SparkFunSuite with SharedSparkContext {
     val piped = data.pipe("wc -c")
     assert(piped.count == 8)
     val charCounts = piped.map(_.trim.toInt).collect().toSet
-    assert(Set(0, 4, 5) == charCounts)
+    val expected = if (Utils.isWindows) {
+      // Note that newline character on Windows is \r\n which are two.
+      Set(0, 5, 6)
+    } else {
+      Set(0, 4, 5)
+    }
+    assert(expected == charCounts)
   }
 
   test("pipe with env variable") {
-    if (testCommandAvailable("printenv")) {
-      val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)
-      val piped = nums.pipe(Seq("printenv", "MY_TEST_ENV"), Map("MY_TEST_ENV" -> "LALALA"))
-      val c = piped.collect()
-      assert(c.size === 2)
-      assert(c(0) === "LALALA")
-      assert(c(1) === "LALALA")
-    } else {
-      assert(true)
-    }
+    assume(testCommandAvailable(envCommand))
+    val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)
+    val piped = nums.pipe(s"$envCommand MY_TEST_ENV", Map("MY_TEST_ENV" -> "LALALA"))
+    val c = piped.collect()
+    assert(c.length === 2)
+    // On Windows, `cmd.exe /C set` is used which prints out it as `varname=value` format
+    // whereas `printenv` usually prints out `value`. So, `varname=` is stripped here for both.
+    assert(c(0).stripPrefix("MY_TEST_ENV=") === "LALALA")
+    assert(c(1).stripPrefix("MY_TEST_ENV=") === "LALALA")
  }
 
   test("pipe with process which cannot be launched due to bad command") {
-    if (!testCommandAvailable("some_nonexistent_command")) {
-      val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)
-      val command = Seq("some_nonexistent_command")
-      val piped = nums.pipe(command)
-      val exception = intercept[SparkException] {
-        piped.collect()
-      }
-      assert(exception.getMessage.contains(command.mkString(" ")))
-    }
+    assume(!testCommandAvailable("some_nonexistent_command"))
+    val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)
+    val command = Seq("some_nonexistent_command")
+    val piped = nums.pipe(command)
+    val exception = intercept[SparkException] {
+      piped.collect()
+    }
+    assert(exception.getMessage.contains(command.mkString(" ")))
   }
 
   test("pipe with process which is launched but fails with non-zero exit status") {
-    if (testCommandAvailable("cat")) {
-      val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)
-      val command = Seq("cat", "nonexistent_file")
-      val piped = nums.pipe(command)
-      val exception = intercept[SparkException] {
-        piped.collect()
-      }
-      assert(exception.getMessage.contains(command.mkString(" ")))
-    }
+    assume(testCommandAvailable("cat"))
+    val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)
+    val command = Seq("cat", "nonexistent_file")
+    val piped = nums.pipe(command)
+    val exception = intercept[SparkException] {
+      piped.collect()
+    }
+    assert(exception.getMessage.contains(command.mkString(" ")))
  }
 
   test("basic pipe with separate working directory") {
-    if (testCommandAvailable("cat")) {
-      val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)
-      val piped = nums.pipe(Seq("cat"), separateWorkingDir = true)
-      val c = piped.collect()
-      assert(c.size === 4)
-      assert(c(0) === "1")
-      assert(c(1) === "2")
-      assert(c(2) === "3")
-      assert(c(3) === "4")
-      val pipedPwd = nums.pipe(Seq("pwd"), separateWorkingDir = true)
-      val collectPwd = pipedPwd.collect()
-      assert(collectPwd(0).contains("tasks/"))
-      val pipedLs = nums.pipe(Seq("ls"), separateWorkingDir = true, bufferSize = 16384).collect()
-      // make sure symlinks were created
-      assert(pipedLs.length > 0)
-      // clean up top level tasks directory
-      Utils.deleteRecursively(new File("tasks"))
-    } else {
-      assert(true)
-    }
+    assume(testCommandAvailable("cat"))
+    val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)
+    val piped = nums.pipe(Seq("cat"), separateWorkingDir = true)
+    val c = piped.collect()
+    assert(c.size === 4)
+    assert(c(0) === "1")
+    assert(c(1) === "2")
+    assert(c(2) === "3")
+    assert(c(3) === "4")
+    val pipedPwd = nums.pipe(Seq("pwd"), separateWorkingDir = true)
+    val collectPwd = pipedPwd.collect()
+    assert(collectPwd(0).contains("tasks/"))
+    val pipedLs = nums.pipe(Seq("ls"), separateWorkingDir = true, bufferSize = 16384).collect()
+    // make sure symlinks were created
+    assert(pipedLs.length > 0)
+    // clean up top level tasks directory
+    Utils.deleteRecursively(new File("tasks"))
  }
 
   test("test pipe exports map_input_file") {
@@ -219,36 +214,35 @@ class PipedRDDSuite extends SparkFunSuite with SharedSparkContext {
   }
 
   def testExportInputFile(varName: String) {
-    if (testCommandAvailable("printenv")) {
-      val nums = new HadoopRDD(sc, new JobConf(), classOf[TextInputFormat], classOf[LongWritable],
-        classOf[Text], 2) {
-        override def getPartitions: Array[Partition] = Array(generateFakeHadoopPartition())
+    assume(testCommandAvailable(envCommand))
+    val nums = new HadoopRDD(sc, new JobConf(), classOf[TextInputFormat], classOf[LongWritable],
+      classOf[Text], 2) {
+      override def getPartitions: Array[Partition] = Array(generateFakeHadoopPartition())
 
-        override val getDependencies = List[Dependency[_]]()
+      override val getDependencies = List[Dependency[_]]()
 
-        override def compute(theSplit: Partition, context: TaskContext) = {
-          new InterruptibleIterator[(LongWritable, Text)](context, Iterator((new LongWritable(1),
-            new Text("b"))))
-        }
-      }
-      val hadoopPart1 = generateFakeHadoopPartition()
-      val pipedRdd =
-        new PipedRDD(
-          nums,
-          PipedRDD.tokenize("printenv " + varName),
-          Map(),
-          null,
-          null,
-          false,
-          4092,
-          Codec.defaultCharsetCodec.name)
-      val tContext = TaskContext.empty()
-      val rddIter = pipedRdd.compute(hadoopPart1, tContext)
-      val arr = rddIter.toArray
-      assert(arr(0) == "/some/path")
-    } else {
-      // printenv isn't available so just pass the test
-    }
+      override def compute(theSplit: Partition, context: TaskContext) = {
+        new InterruptibleIterator[(LongWritable, Text)](context, Iterator((new LongWritable(1),
+          new Text("b"))))
+      }
+    }
+    val hadoopPart1 = generateFakeHadoopPartition()
+    val pipedRdd =
+      new PipedRDD(
+        nums,
+        PipedRDD.tokenize(s"$envCommand $varName"),
+        Map(),
+        null,
+        null,
+        false,
+        4092,
+        Codec.defaultCharsetCodec.name)
+    val tContext = TaskContext.empty()
+    val rddIter = pipedRdd.compute(hadoopPart1, tContext)
+    val arr = rddIter.toArray
+    // On Windows, `cmd.exe /C set` is used which prints out it as `varname=value` format
+    // whereas `printenv` usually prints out `value`. So, `varname=` is stripped here for both.
+    assert(arr(0).stripPrefix(s"$varName=") === "/some/path")
   }
 
   def generateFakeHadoopPartition(): HadoopPartition = {
```