Skip to content
Snippets Groups Projects
Commit 4a9913d6 authored by Gavin Li's avatar Gavin Li
Browse files

add ut for pipe enhancement

parent 9f84315c
No related branches found
No related tags found
No related merge requests found
...@@ -19,6 +19,37 @@ class PipedRDDSuite extends FunSuite with LocalSparkContext { ...@@ -19,6 +19,37 @@ class PipedRDDSuite extends FunSuite with LocalSparkContext {
assert(c(3) === "4") assert(c(3) === "4")
} }
test("advanced pipe") {
sc = new SparkContext("local", "test")
val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)
val piped = nums.pipe(Seq("cat"), (i:Int, f: String=> Unit) => f(i + "_"), Array("0"))
val c = piped.collect()
assert(c.size === 8)
assert(c(0) === "0")
assert(c(1) === "\u0001")
assert(c(2) === "1_")
assert(c(3) === "2_")
assert(c(4) === "0")
assert(c(5) === "\u0001")
assert(c(6) === "3_")
assert(c(7) === "4_")
val nums1 = sc.makeRDD(Array("a\t1", "b\t2", "a\t3", "b\t4"), 2)
val d = nums1.groupBy(str=>str.split("\t")(0)).pipe(Seq("cat"), (i:Tuple2[String, Seq[String]], f: String=> Unit) => {for (e <- i._2){ f(e + "_")}}, Array("0")).collect()
assert(d.size === 8)
assert(d(0) === "0")
assert(d(1) === "\u0001")
assert(d(2) === "b\t2_")
assert(d(3) === "b\t4_")
assert(d(4) === "0")
assert(d(5) === "\u0001")
assert(d(6) === "a\t1_")
assert(d(7) === "a\t3_")
}
test("pipe with env variable") { test("pipe with env variable") {
sc = new SparkContext("local", "test") sc = new SparkContext("local", "test")
val nums = sc.makeRDD(Array(1, 2, 3, 4), 2) val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment