From 4a9913d66a61ac9ef9cab0e08f6151dc2624fd11 Mon Sep 17 00:00:00 2001
From: Gavin Li <lyo.gavin@gmail.com>
Date: Sun, 2 Jun 2013 23:21:09 +0000
Subject: [PATCH] add ut for pipe enhancement

---
 core/src/test/scala/spark/PipedRDDSuite.scala | 31 +++++++++++++++++++
 1 file changed, 31 insertions(+)

diff --git a/core/src/test/scala/spark/PipedRDDSuite.scala b/core/src/test/scala/spark/PipedRDDSuite.scala
index a6344edf8f..ee55952a94 100644
--- a/core/src/test/scala/spark/PipedRDDSuite.scala
+++ b/core/src/test/scala/spark/PipedRDDSuite.scala
@@ -19,6 +19,37 @@ class PipedRDDSuite extends FunSuite with LocalSparkContext {
     assert(c(3) === "4")
   }
 
+  test("advanced pipe") {
+    sc = new SparkContext("local", "test")
+    val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)
+
+    val piped = nums.pipe(Seq("cat"), (i:Int, f: String=> Unit) => f(i + "_"), Array("0"))
+
+    val c = piped.collect()
+
+    assert(c.size === 8)
+    assert(c(0) === "0")
+    assert(c(1) === "\u0001")
+    assert(c(2) === "1_")
+    assert(c(3) === "2_")
+    assert(c(4) === "0")
+    assert(c(5) === "\u0001")
+    assert(c(6) === "3_")
+    assert(c(7) === "4_")
+
+    val nums1 = sc.makeRDD(Array("a\t1", "b\t2", "a\t3", "b\t4"), 2)
+    val d = nums1.groupBy(str=>str.split("\t")(0)).pipe(Seq("cat"), (i:Tuple2[String, Seq[String]], f: String=> Unit) => {for (e <- i._2){ f(e + "_")}}, Array("0")).collect()
+    assert(d.size === 8)
+    assert(d(0) === "0")
+    assert(d(1) === "\u0001")
+    assert(d(2) === "b\t2_")
+    assert(d(3) === "b\t4_")
+    assert(d(4) === "0")
+    assert(d(5) === "\u0001")
+    assert(d(6) === "a\t1_")
+    assert(d(7) === "a\t3_")
+  }
+
   test("pipe with env variable") {
     sc = new SparkContext("local", "test")
     val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)
-- 
GitLab