diff --git a/core/src/test/scala/org/apache/spark/rdd/PipedRDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/PipedRDDSuite.scala
index 7293aa9a2584fe842fa0aaab6295a449db58b5d2..287ae6ff6e28d543accf80edc73504761625f6df 100644
--- a/core/src/test/scala/org/apache/spark/rdd/PipedRDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/PipedRDDSuite.scala
@@ -32,109 +32,104 @@ import org.apache.spark._
 import org.apache.spark.util.Utils
 
 class PipedRDDSuite extends SparkFunSuite with SharedSparkContext {
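+  // `printenv` is not available on Windows, so `cmd.exe /C set` is used there to dump environment variables.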
+  val envCommand = if (Utils.isWindows) {
+    "cmd.exe /C set"
+  } else {
+    "printenv"
+  }
 
   test("basic pipe") {
-    if (testCommandAvailable("cat")) {
-      val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)
+    assume(testCommandAvailable("cat"))
+    val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)
 
-      val piped = nums.pipe(Seq("cat"))
+    val piped = nums.pipe(Seq("cat"))
 
-      val c = piped.collect()
-      assert(c.size === 4)
-      assert(c(0) === "1")
-      assert(c(1) === "2")
-      assert(c(2) === "3")
-      assert(c(3) === "4")
-    } else {
-      assert(true)
-    }
+    val c = piped.collect()
+    assert(c.size === 4)
+    assert(c(0) === "1")
+    assert(c(1) === "2")
+    assert(c(2) === "3")
+    assert(c(3) === "4")
   }
 
   test("basic pipe with tokenization") {
-    if (testCommandAvailable("wc")) {
-      val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)
-
-      // verify that both RDD.pipe(command: String) and RDD.pipe(command: String, env) work good
-      for (piped <- Seq(nums.pipe("wc -l"), nums.pipe("wc -l", Map[String, String]()))) {
-        val c = piped.collect()
-        assert(c.size === 2)
-        assert(c(0).trim === "2")
-        assert(c(1).trim === "2")
-      }
-    } else {
-      assert(true)
+    assume(testCommandAvailable("wc"))
+    val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)
+
+    // verify that both RDD.pipe(command: String) and RDD.pipe(command: String, env) work well
+    for (piped <- Seq(nums.pipe("wc -l"), nums.pipe("wc -l", Map[String, String]()))) {
+      val c = piped.collect()
+      assert(c.size === 2)
+      assert(c(0).trim === "2")
+      assert(c(1).trim === "2")
     }
   }
 
   test("failure in iterating over pipe input") {
-    if (testCommandAvailable("cat")) {
-      val nums =
-        sc.makeRDD(Array(1, 2, 3, 4), 2)
-          .mapPartitionsWithIndex((index, iterator) => {
-            new Iterator[Int] {
-              def hasNext = true
-              def next() = {
-                throw new SparkException("Exception to simulate bad scenario")
-              }
-            }
-          })
-
-      val piped = nums.pipe(Seq("cat"))
-
-      intercept[SparkException] {
-        piped.collect()
-      }
+    assume(testCommandAvailable("cat"))
+    val nums =
+      sc.makeRDD(Array(1, 2, 3, 4), 2)
+        .mapPartitionsWithIndex((index, iterator) => {
+          new Iterator[Int] {
+            def hasNext = true
+            def next() = {
+              throw new SparkException("Exception to simulate bad scenario")
+            }
+          }
+        })
+
+    val piped = nums.pipe(Seq("cat"))
+
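+    // Collecting forces the failing input iterator to be consumed, so the simulated
+    // exception should surface as a SparkException.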
+    intercept[SparkException] {
+      piped.collect()
     }
   }
 
   test("advanced pipe") {
-    if (testCommandAvailable("cat")) {
-      val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)
-      val bl = sc.broadcast(List("0"))
-
-      val piped = nums.pipe(Seq("cat"),
+    assume(testCommandAvailable("cat"))
+    val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)
+    val bl = sc.broadcast(List("0"))
+
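+    // Before each partition's elements, print the broadcast value and a "\u0001" separator;
+    // append "_" to every element before it is piped to `cat`.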
+    val piped = nums.pipe(Seq("cat"),
+      Map[String, String](),
+      (f: String => Unit) => {
+        bl.value.foreach(f); f("\u0001")
+      },
+      (i: Int, f: String => Unit) => f(i + "_"))
+
+    val c = piped.collect()
+
+    assert(c.size === 8)
+    assert(c(0) === "0")
+    assert(c(1) === "\u0001")
+    assert(c(2) === "1_")
+    assert(c(3) === "2_")
+    assert(c(4) === "0")
+    assert(c(5) === "\u0001")
+    assert(c(6) === "3_")
+    assert(c(7) === "4_")
+
+    val nums1 = sc.makeRDD(Array("a\t1", "b\t2", "a\t3", "b\t4"), 2)
+    val d = nums1.groupBy(str => str.split("\t")(0)).
+      pipe(Seq("cat"),
         Map[String, String](),
         (f: String => Unit) => {
           bl.value.foreach(f); f("\u0001")
         },
-        (i: Int, f: String => Unit) => f(i + "_"))
-
-      val c = piped.collect()
-
-      assert(c.size === 8)
-      assert(c(0) === "0")
-      assert(c(1) === "\u0001")
-      assert(c(2) === "1_")
-      assert(c(3) === "2_")
-      assert(c(4) === "0")
-      assert(c(5) === "\u0001")
-      assert(c(6) === "3_")
-      assert(c(7) === "4_")
-
-      val nums1 = sc.makeRDD(Array("a\t1", "b\t2", "a\t3", "b\t4"), 2)
-      val d = nums1.groupBy(str => str.split("\t")(0)).
-        pipe(Seq("cat"),
-          Map[String, String](),
-          (f: String => Unit) => {
-            bl.value.foreach(f); f("\u0001")
-          },
-          (i: Tuple2[String, Iterable[String]], f: String => Unit) => {
-            for (e <- i._2) {
-              f(e + "_")
-            }
-          }).collect()
-      assert(d.size === 8)
-      assert(d(0) === "0")
-      assert(d(1) === "\u0001")
-      assert(d(2) === "b\t2_")
-      assert(d(3) === "b\t4_")
-      assert(d(4) === "0")
-      assert(d(5) === "\u0001")
-      assert(d(6) === "a\t1_")
-      assert(d(7) === "a\t3_")
-    } else {
-      assert(true)
-    }
+        (i: Tuple2[String, Iterable[String]], f: String => Unit) => {
+          for (e <- i._2) {
+            f(e + "_")
+          }
+        }).collect()
+    assert(d.size === 8)
+    assert(d(0) === "0")
+    assert(d(1) === "\u0001")
+    assert(d(2) === "b\t2_")
+    assert(d(3) === "b\t4_")
+    assert(d(4) === "0")
+    assert(d(5) === "\u0001")
+    assert(d(6) === "a\t1_")
+    assert(d(7) === "a\t3_")
   }
 
   test("pipe with empty partition") {
@@ -142,67 +137,67 @@ class PipedRDDSuite extends SparkFunSuite with SharedSparkContext {
     val piped = data.pipe("wc -c")
     assert(piped.count == 8)
     val charCounts = piped.map(_.trim.toInt).collect().toSet
-    assert(Set(0, 4, 5) == charCounts)
+    val expected = if (Utils.isWindows) {
+      // Note that the newline sequence on Windows is \r\n, which counts as two characters.
+      Set(0, 5, 6)
+    } else {
+      Set(0, 4, 5)
+    }
+    assert(expected == charCounts)
   }
 
   test("pipe with env variable") {
-    if (testCommandAvailable("printenv")) {
-      val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)
-      val piped = nums.pipe(Seq("printenv", "MY_TEST_ENV"), Map("MY_TEST_ENV" -> "LALALA"))
-      val c = piped.collect()
-      assert(c.size === 2)
-      assert(c(0) === "LALALA")
-      assert(c(1) === "LALALA")
-    } else {
-      assert(true)
-    }
+    assume(testCommandAvailable(envCommand))
+    val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)
+    val piped = nums.pipe(s"$envCommand MY_TEST_ENV", Map("MY_TEST_ENV" -> "LALALA"))
+    val c = piped.collect()
+    assert(c.length === 2)
+    // On Windows, `cmd.exe /C set` is used, which prints the variable in `varname=value` format,
+    // whereas `printenv` prints only the value. So `varname=` is stripped here to cover both.
+    assert(c(0).stripPrefix("MY_TEST_ENV=") === "LALALA")
+    assert(c(1).stripPrefix("MY_TEST_ENV=") === "LALALA")
   }
 
   test("pipe with process which cannot be launched due to bad command") {
-    if (!testCommandAvailable("some_nonexistent_command")) {
-      val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)
-      val command = Seq("some_nonexistent_command")
-      val piped = nums.pipe(command)
-      val exception = intercept[SparkException] {
-        piped.collect()
-      }
-      assert(exception.getMessage.contains(command.mkString(" ")))
+    assume(!testCommandAvailable("some_nonexistent_command"))
+    val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)
+    val command = Seq("some_nonexistent_command")
+    val piped = nums.pipe(command)
+    val exception = intercept[SparkException] {
+      piped.collect()
     }
+    assert(exception.getMessage.contains(command.mkString(" ")))
   }
 
   test("pipe with process which is launched but fails with non-zero exit status") {
-    if (testCommandAvailable("cat")) {
-      val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)
-      val command = Seq("cat", "nonexistent_file")
-      val piped = nums.pipe(command)
-      val exception = intercept[SparkException] {
-        piped.collect()
-      }
-      assert(exception.getMessage.contains(command.mkString(" ")))
+    assume(testCommandAvailable("cat"))
+    val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)
+    val command = Seq("cat", "nonexistent_file")
+    val piped = nums.pipe(command)
+    val exception = intercept[SparkException] {
+      piped.collect()
     }
+    assert(exception.getMessage.contains(command.mkString(" ")))
   }
 
   test("basic pipe with separate working directory") {
-    if (testCommandAvailable("cat")) {
-      val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)
-      val piped = nums.pipe(Seq("cat"), separateWorkingDir = true)
-      val c = piped.collect()
-      assert(c.size === 4)
-      assert(c(0) === "1")
-      assert(c(1) === "2")
-      assert(c(2) === "3")
-      assert(c(3) === "4")
-      val pipedPwd = nums.pipe(Seq("pwd"), separateWorkingDir = true)
-      val collectPwd = pipedPwd.collect()
-      assert(collectPwd(0).contains("tasks/"))
-      val pipedLs = nums.pipe(Seq("ls"), separateWorkingDir = true, bufferSize = 16384).collect()
-      // make sure symlinks were created
-      assert(pipedLs.length > 0)
-      // clean up top level tasks directory
-      Utils.deleteRecursively(new File("tasks"))
-    } else {
-      assert(true)
-    }
+    assume(testCommandAvailable("cat"))
+    val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)
+    val piped = nums.pipe(Seq("cat"), separateWorkingDir = true)
+    val c = piped.collect()
+    assert(c.size === 4)
+    assert(c(0) === "1")
+    assert(c(1) === "2")
+    assert(c(2) === "3")
+    assert(c(3) === "4")
+    val pipedPwd = nums.pipe(Seq("pwd"), separateWorkingDir = true)
+    val collectPwd = pipedPwd.collect()
+    assert(collectPwd(0).contains("tasks/"))
+    val pipedLs = nums.pipe(Seq("ls"), separateWorkingDir = true, bufferSize = 16384).collect()
+    // make sure symlinks were created
+    assert(pipedLs.length > 0)
+    // clean up top level tasks directory
+    Utils.deleteRecursively(new File("tasks"))
   }
 
   test("test pipe exports map_input_file") {
@@ -219,36 +214,35 @@ class PipedRDDSuite extends SparkFunSuite with SharedSparkContext {
   }
 
   def testExportInputFile(varName: String) {
-    if (testCommandAvailable("printenv")) {
-      val nums = new HadoopRDD(sc, new JobConf(), classOf[TextInputFormat], classOf[LongWritable],
-        classOf[Text], 2) {
-        override def getPartitions: Array[Partition] = Array(generateFakeHadoopPartition())
+    assume(testCommandAvailable(envCommand))
+    val nums = new HadoopRDD(sc, new JobConf(), classOf[TextInputFormat], classOf[LongWritable],
+      classOf[Text], 2) {
+      override def getPartitions: Array[Partition] = Array(generateFakeHadoopPartition())
 
-        override val getDependencies = List[Dependency[_]]()
+      override val getDependencies = List[Dependency[_]]()
 
-        override def compute(theSplit: Partition, context: TaskContext) = {
-          new InterruptibleIterator[(LongWritable, Text)](context, Iterator((new LongWritable(1),
-            new Text("b"))))
-        }
+      override def compute(theSplit: Partition, context: TaskContext) = {
+        new InterruptibleIterator[(LongWritable, Text)](context, Iterator((new LongWritable(1),
+          new Text("b"))))
       }
-      val hadoopPart1 = generateFakeHadoopPartition()
-      val pipedRdd =
-        new PipedRDD(
-          nums,
-          PipedRDD.tokenize("printenv " + varName),
-          Map(),
-          null,
-          null,
-          false,
-          4092,
-          Codec.defaultCharsetCodec.name)
-      val tContext = TaskContext.empty()
-      val rddIter = pipedRdd.compute(hadoopPart1, tContext)
-      val arr = rddIter.toArray
-      assert(arr(0) == "/some/path")
-    } else {
-      // printenv isn't available so just pass the test
     }
+    val hadoopPart1 = generateFakeHadoopPartition()
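+    // Build a PipedRDD directly: no extra env vars, no pipe-context or per-element print
+    // functions, no separate working directory, a 4092-byte buffer, and the default charset codec.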
+    val pipedRdd =
+      new PipedRDD(
+        nums,
+        PipedRDD.tokenize(s"$envCommand $varName"),
+        Map(),
+        null,
+        null,
+        false,
+        4092,
+        Codec.defaultCharsetCodec.name)
+    val tContext = TaskContext.empty()
+    val rddIter = pipedRdd.compute(hadoopPart1, tContext)
+    val arr = rddIter.toArray
+    // On Windows, `cmd.exe /C set` is used, which prints the variable in `varname=value` format,
+    // whereas `printenv` prints only the value. So `varname=` is stripped here to cover both.
+    assert(arr(0).stripPrefix(s"$varName=") === "/some/path")
   }
 
   def generateFakeHadoopPartition(): HadoopPartition = {