Skip to content
Snippets Groups Projects
Commit 0bd76e87 authored by Liwei Lin's avatar Liwei Lin Committed by Reynold Xin
Browse files

[SPARK-16620][CORE] Add back the tokenization process in `RDD.pipe(command: String)`

## What changes were proposed in this pull request?

Currently `RDD.pipe(command: String)`:
- works only when the command is specified without any options, such as `RDD.pipe("wc")`
- does NOT work when the command is specified with some options, such as `RDD.pipe("wc -l")`

This is a regression from Spark 1.6.

This patch adds back the tokenization process in `RDD.pipe(command: String)` to fix this regression.

## How was this patch tested?
Added a test which:
- would pass in `1.6`
- _[prior to this patch]_ would fail in `master`
- _[after this patch]_ would pass in `master`

Author: Liwei Lin <lwlin7@gmail.com>

Closes #14256 from lw-lin/rdd-pipe.
parent 67089149
No related branches found
No related tags found
No related merge requests found
......@@ -699,14 +699,18 @@ abstract class RDD[T: ClassTag](
* Return an RDD created by piping elements to a forked external process.
*/
def pipe(command: String): RDD[String] = withScope {
pipe(command)
// Similar to Runtime.exec(), if we are given a single string, split it into words
// using a standard StringTokenizer (i.e. by spaces)
pipe(PipedRDD.tokenize(command))
}
/**
* Return an RDD created by piping elements to a forked external process.
*/
def pipe(command: String, env: Map[String, String]): RDD[String] = withScope {
pipe(command, env)
// Similar to Runtime.exec(), if we are given a single string, split it into words
// using a standard StringTokenizer (i.e. by spaces)
pipe(PipedRDD.tokenize(command), env)
}
/**
......
......@@ -51,6 +51,22 @@ class PipedRDDSuite extends SparkFunSuite with SharedSparkContext {
}
}
test("basic pipe with tokenization") {
if (testCommandAvailable("wc")) {
val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)
// verify that both RDD.pipe(command: String) and RDD.pipe(command: String, env) work good
for (piped <- Seq(nums.pipe("wc -l"), nums.pipe("wc -l", Map[String, String]()))) {
val c = piped.collect()
assert(c.size === 2)
assert(c(0).trim === "2")
assert(c(1).trim === "2")
}
} else {
assert(true)
}
}
test("failure in iterating over pipe input") {
if (testCommandAvailable("cat")) {
val nums =
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment