diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala
index 7cb9bfebebc68cd73af3153c8a72f0ba9497108a..223dcdc19d877a67c08f181b3fd9c5ecb9ecddf3 100644
--- a/core/src/main/scala/spark/RDD.scala
+++ b/core/src/main/scala/spark/RDD.scala
@@ -14,6 +14,7 @@ import org.apache.hadoop.mapred.TextOutputFormat
 
 import it.unimi.dsi.fastutil.objects.{Object2LongOpenHashMap => OLMap}
 
+import spark.broadcast.Broadcast
 import spark.Partitioner._
 import spark.partial.BoundedDouble
 import spark.partial.CountEvaluator
@@ -354,13 +355,36 @@ abstract class RDD[T: ClassManifest](
   /**
    * Return an RDD created by piping elements to a forked external process.
    */
-  def pipe(command: Seq[String]): RDD[String] = new PipedRDD(this, command)
+  def pipe(command: String, env: Map[String, String]): RDD[String] = 
+    new PipedRDD(this, command, env)
+
 
   /**
    * Return an RDD created by piping elements to a forked external process.
-   */
-  def pipe(command: Seq[String], env: Map[String, String]): RDD[String] =
-    new PipedRDD(this, command, env)
+   * The print behavior can be customized by providing two functions.
+   *
+   * @param command command to run in forked process.
+   * @param env environment variables to set.
+   * @param printPipeContext Before piping elements, this function is called as an oppotunity
+   *                         to pipe context data. Print line function (like out.println) will be 
+   *                         passed as printPipeContext's parameter.
+   * @param printPipeContext Use this function to customize how to pipe elements. This function 
+   *                         will be called with each RDD element as the 1st parameter, and the 
+   *                         print line function (like out.println()) as the 2nd parameter.
+   *                         An example of pipe the RDD data of groupBy() in a streaming way,
+   *                         instead of constructing a huge String to concat all the elements:
+   *                         def printRDDElement(record:(String, Seq[String]), f:String=>Unit) = 
+   *                           for (e <- record._2){f(e)}
+   * @return the result RDD
+   */
+  def pipe(
+      command: Seq[String], 
+      env: Map[String, String] = Map(), 
+      printPipeContext: (String => Unit) => Unit = null,
+      printRDDElement: (T, String => Unit) => Unit = null): RDD[String] = 
+    new PipedRDD(this, command, env, 
+      if (printPipeContext ne null) sc.clean(printPipeContext) else null, 
+      if (printRDDElement ne null) sc.clean(printRDDElement) else null)
 
   /**
    * Return a new RDD by applying a function to each partition of this RDD.
diff --git a/core/src/main/scala/spark/rdd/PipedRDD.scala b/core/src/main/scala/spark/rdd/PipedRDD.scala
index 962a1b21ad1d3ff8c32e461984ec444330e4bf67..b2c07891ab4153de5ff2b350c51ea18a1dd367c3 100644
--- a/core/src/main/scala/spark/rdd/PipedRDD.scala
+++ b/core/src/main/scala/spark/rdd/PipedRDD.scala
@@ -9,6 +9,7 @@ import scala.collection.mutable.ArrayBuffer
 import scala.io.Source
 
 import spark.{RDD, SparkEnv, Partition, TaskContext}
+import spark.broadcast.Broadcast
 
 
 /**
@@ -18,14 +19,21 @@ import spark.{RDD, SparkEnv, Partition, TaskContext}
 class PipedRDD[T: ClassManifest](
     prev: RDD[T],
     command: Seq[String],
-    envVars: Map[String, String])
+    envVars: Map[String, String],
+    printPipeContext: (String => Unit) => Unit,
+    printRDDElement: (T, String => Unit) => Unit)
   extends RDD[String](prev) {
 
-  def this(prev: RDD[T], command: Seq[String]) = this(prev, command, Map())
-
   // Similar to Runtime.exec(), if we are given a single string, split it into words
   // using a standard StringTokenizer (i.e. by spaces)
-  def this(prev: RDD[T], command: String) = this(prev, PipedRDD.tokenize(command))
+  def this(
+      prev: RDD[T],
+      command: String,
+      envVars: Map[String, String] = Map(),
+      printPipeContext: (String => Unit) => Unit = null,
+      printRDDElement: (T, String => Unit) => Unit = null) =
+    this(prev, PipedRDD.tokenize(command), envVars, printPipeContext, printRDDElement)
+
 
   override def getPartitions: Array[Partition] = firstParent[T].partitions
 
@@ -52,8 +60,17 @@ class PipedRDD[T: ClassManifest](
       override def run() {
         SparkEnv.set(env)
         val out = new PrintWriter(proc.getOutputStream)
+
+        // input the pipe context firstly
+        if ( printPipeContext != null) {
+          printPipeContext(out.println(_))
+        }
         for (elem <- firstParent[T].iterator(split, context)) {
-          out.println(elem)
+          if (printRDDElement != null) {
+            printRDDElement(elem, out.println(_))
+          } else {
+            out.println(elem)
+          }
         }
         out.close()
       }
diff --git a/core/src/test/scala/spark/PipedRDDSuite.scala b/core/src/test/scala/spark/PipedRDDSuite.scala
index a6344edf8f1614f835cd5fa333baa73827d9e6ba..ed075f93ec5507adb3bed3b745da055b09575fa9 100644
--- a/core/src/test/scala/spark/PipedRDDSuite.scala
+++ b/core/src/test/scala/spark/PipedRDDSuite.scala
@@ -19,6 +19,45 @@ class PipedRDDSuite extends FunSuite with LocalSparkContext {
     assert(c(3) === "4")
   }
 
+  test("advanced pipe") {
+    sc = new SparkContext("local", "test")
+    val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)
+    val bl = sc.broadcast(List("0"))
+
+    val piped = nums.pipe(Seq("cat"), 
+      Map[String, String](), 
+      (f: String => Unit) => {bl.value.map(f(_));f("\u0001")},
+      (i:Int, f: String=> Unit) => f(i + "_"))
+
+    val c = piped.collect()
+
+    assert(c.size === 8)
+    assert(c(0) === "0")
+    assert(c(1) === "\u0001")
+    assert(c(2) === "1_")
+    assert(c(3) === "2_")
+    assert(c(4) === "0")
+    assert(c(5) === "\u0001")
+    assert(c(6) === "3_")
+    assert(c(7) === "4_")
+
+    val nums1 = sc.makeRDD(Array("a\t1", "b\t2", "a\t3", "b\t4"), 2)
+    val d = nums1.groupBy(str=>str.split("\t")(0)).
+      pipe(Seq("cat"), 
+           Map[String, String](), 
+           (f: String => Unit) => {bl.value.map(f(_));f("\u0001")},
+           (i:Tuple2[String, Seq[String]], f: String=> Unit) => {for (e <- i._2){ f(e + "_")}}).collect()
+    assert(d.size === 8)
+    assert(d(0) === "0")
+    assert(d(1) === "\u0001")
+    assert(d(2) === "b\t2_")
+    assert(d(3) === "b\t4_")
+    assert(d(4) === "0")
+    assert(d(5) === "\u0001")
+    assert(d(6) === "a\t1_")
+    assert(d(7) === "a\t3_")
+  }
+
   test("pipe with env variable") {
     sc = new SparkContext("local", "test")
     val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)