diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala
index 5a41db23c20a435f48f5aeffb40f3ef465f144ad..a1c96043246312dde9ae7e80623cfe63541d1693 100644
--- a/core/src/main/scala/spark/RDD.scala
+++ b/core/src/main/scala/spark/RDD.scala
@@ -16,6 +16,7 @@ import org.apache.hadoop.mapred.TextOutputFormat
 
 import it.unimi.dsi.fastutil.objects.{Object2LongOpenHashMap => OLMap}
 
+import spark.broadcast.Broadcast
 import spark.Partitioner._
 import spark.partial.BoundedDouble
 import spark.partial.CountEvaluator
@@ -351,31 +352,93 @@ abstract class RDD[T: ClassManifest](
   /**
    * Return an RDD created by piping elements to a forked external process.
    */
-  def pipe(command: String, transform: (T,String => Unit) => Any, arguments: Seq[String]): RDD[String] =
-    new PipedRDD(this, command, transform, arguments)
+  def pipe(command: String, env: Map[String, String]): RDD[String] =
+    new PipedRDD(this, command, env)
 
   /**
    * Return an RDD created by piping elements to a forked external process.
-   */
-  def pipe(command: Seq[String]): RDD[String] = new PipedRDD(this, command)
+   * How each record in the RDD is output to the process can be controlled by providing a
+   * function transform(T, outputFunction: String => Unit). transform() will be called with
+   * the current record in the RDD as the 1st parameter, and the function that outputs a
+   * record to the external process (like out.println()) as the 2nd parameter.
+   * Here's an example of how to pipe the RDD data of groupBy() in a streaming way,
+   * instead of constructing a huge String to concat all the records:
+   * def transform(record: (String, Seq[String]), f: String => Unit) = for (e <- record._2) { f(e) }
+   * pipeContext can be used to transfer additional context data to the external process
+   * besides the RDD. pipeContext is a broadcast Seq[String]; each of its lines is piped to
+   * the external process, followed by the delimiter marking the end of the context data.
+   * The delimiter ("^A" by default) can be customized via the last parameter, delimiter.
+   */
+  def pipe[U <: Seq[String]](
+      command: String,
+      env: Map[String, String],
+      transform: (T, String => Unit) => Any,
+      pipeContext: Broadcast[U],
+      delimiter: String): RDD[String] =
+    new PipedRDD(this, command, env, transform, pipeContext, delimiter)
 
   /**
    * Return an RDD created by piping elements to a forked external process.
-   */
-  def pipe(command: Seq[String], transform: (T,String => Unit) => Any, arguments: Seq[String]): RDD[String] =
-    new PipedRDD(this, command, transform, arguments)
+   * How each record in the RDD is output to the process can be controlled by providing a
+   * function transform(T, outputFunction: String => Unit). transform() will be called with
+   * the current record in the RDD as the 1st parameter, and the function that outputs a
+   * record to the external process (like out.println()) as the 2nd parameter.
+   * Here's an example of how to pipe the RDD data of groupBy() in a streaming way,
+   * instead of constructing a huge String to concat all the records:
+   * def transform(record: (String, Seq[String]), f: String => Unit) = for (e <- record._2) { f(e) }
+   * pipeContext can be used to transfer additional context data to the external process
+   * besides the RDD. pipeContext is a broadcast Seq[String]; each of its lines is piped to
+   * the external process, followed by "\u0001" ("^A") marking the end of the context data.
+   */
+  def pipe[U <: Seq[String]](
+      command: String,
+      transform: (T, String => Unit) => Any,
+      pipeContext: Broadcast[U]): RDD[String] =
+    new PipedRDD(this, command, Map[String, String](), transform, pipeContext, "\u0001")
 
   /**
    * Return an RDD created by piping elements to a forked external process.
-   */
-  def pipe(command: Seq[String], env: Map[String, String]): RDD[String] =
-    new PipedRDD(this, command, env)
+   * How each record in the RDD is output to the process can be controlled by providing a
+   * function transform(T, outputFunction: String => Unit). transform() will be called with
+   * the current record in the RDD as the 1st parameter, and the function that outputs a
+   * record to the external process (like out.println()) as the 2nd parameter.
+   * Here's an example of how to pipe the RDD data of groupBy() in a streaming way,
+   * instead of constructing a huge String to concat all the records:
+   * def transform(record: (String, Seq[String]), f: String => Unit) = for (e <- record._2) { f(e) }
+   * pipeContext can be used to transfer additional context data to the external process
+   * besides the RDD. pipeContext is a broadcast Seq[String]; each of its lines is piped to
+   * the external process, followed by "\u0001" ("^A") marking the end of the context data.
+   */
+  def pipe[U <: Seq[String]](
+      command: String,
+      env: Map[String, String],
+      transform: (T, String => Unit) => Any,
+      pipeContext: Broadcast[U]): RDD[String] =
+    new PipedRDD(this, command, env, transform, pipeContext, "\u0001")
 
   /**
    * Return an RDD created by piping elements to a forked external process.
-   */
-  def pipe(command: Seq[String], env: Map[String, String], transform: (T,String => Unit) => Any, arguments: Seq[String]): RDD[String] =
-    new PipedRDD(this, command, env, transform, arguments)
+   * How each record in the RDD is output to the process can be controlled by providing a
+   * function transform(T, outputFunction: String => Unit). transform() will be called with
+   * the current record in the RDD as the 1st parameter, and the function that outputs a
+   * record to the external process (like out.println()) as the 2nd parameter.
+   * Here's an example of how to pipe the RDD data of groupBy() in a streaming way,
+   * instead of constructing a huge String to concat all the records:
+   * def transform(record: (String, Seq[String]), f: String => Unit) = for (e <- record._2) { f(e) }
+   * pipeContext can be used to transfer additional context data to the external process
+   * besides the RDD. pipeContext is a broadcast Seq[String]; each of its lines is piped to
+   * the external process, followed by the delimiter marking the end of the context data.
+   * The delimiter ("^A" by default) can be customized via the last parameter, delimiter.
+   */
+  def pipe[U <: Seq[String]](
+      command: Seq[String],
+      env: Map[String, String] = Map(),
+      transform: (T, String => Unit) => Any = null,
+      pipeContext: Broadcast[U] = null,
+      delimiter: String = "\u0001"): RDD[String] =
+    new PipedRDD(this, command, env, transform, pipeContext, delimiter)
 
   /**
    * Return a new RDD by applying a function to each partition of this RDD.
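For reference, a minimal driver-side usage sketch of the new pipe() overload (assuming a live SparkContext `sc`; the sample data and the `grouped`/`piped` names are illustrative, not part of the patch):

    // Stream each grouped record to `cat` one element per line, after first sending
    // one broadcast context line terminated by the default "\u0001" delimiter.
    val grouped = sc.makeRDD(Seq("a\t1", "b\t2", "a\t3"), 2)
      .groupBy(_.split("\t")(0))                       // RDD[(String, Seq[String])]

    val piped = grouped.pipe(
      Seq("cat"),                                      // external command
      Map[String, String](),                           // no extra environment variables
      (record: (String, Seq[String]), out: String => Unit) =>
        record._2.foreach(out),                        // stream elements, no huge concatenated String
      sc.broadcast(Seq("context-line")))               // pipeContext, sent before the records

    piped.collect().foreach(println)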
diff --git a/core/src/main/scala/spark/rdd/PipedRDD.scala b/core/src/main/scala/spark/rdd/PipedRDD.scala
index 969404c95f64f5c0efcaf340d66641e819b3f6bc..d58aaae709ab940c42cc1c0e14eb7ef5896b2c0a 100644
--- a/core/src/main/scala/spark/rdd/PipedRDD.scala
+++ b/core/src/main/scala/spark/rdd/PipedRDD.scala
@@ -9,29 +9,33 @@ import scala.collection.mutable.ArrayBuffer
 import scala.io.Source
 
 import spark.{RDD, SparkEnv, Partition, TaskContext}
+import spark.broadcast.Broadcast
 
 /**
  * An RDD that pipes the contents of each parent partition through an external command
  * (printing them one per line) and returns the output as a collection of strings.
  */
-class PipedRDD[T: ClassManifest](
+class PipedRDD[T: ClassManifest, U <: Seq[String]](
   prev: RDD[T],
   command: Seq[String],
   envVars: Map[String, String],
   transform: (T, String => Unit) => Any,
-  arguments: Seq[String]
+  pipeContext: Broadcast[U],
+  delimiter: String
   ) extends RDD[String](prev) {
 
-  def this(prev: RDD[T], command: Seq[String], envVars : Map[String, String]) = this(prev, command, envVars, null, null)
-  def this(prev: RDD[T], command: Seq[String]) = this(prev, command, Map())
-  def this(prev: RDD[T], command: Seq[String], transform: (T,String => Unit) => Any, arguments: Seq[String]) = this(prev, command, Map(), transform, arguments)
-
   // Similar to Runtime.exec(), if we are given a single string, split it into words
   // using a standard StringTokenizer (i.e. by spaces)
-  def this(prev: RDD[T], command: String) = this(prev, PipedRDD.tokenize(command))
-  def this(prev: RDD[T], command: String, transform: (T,String => Unit) => Any, arguments: Seq[String]) = this(prev, PipedRDD.tokenize(command), Map(), transform, arguments)
+  def this(
+      prev: RDD[T],
+      command: String,
+      envVars: Map[String, String] = Map(),
+      transform: (T, String => Unit) => Any = null,
+      pipeContext: Broadcast[U] = null,
+      delimiter: String = "\u0001") =
+    this(prev, PipedRDD.tokenize(command), envVars, transform, pipeContext, delimiter)
 
   override def getPartitions: Array[Partition] = firstParent[T].partitions
 
@@ -60,19 +64,18 @@ class PipedRDD[T: ClassManifest](
     SparkEnv.set(env)
     val out = new PrintWriter(proc.getOutputStream)
 
-    // input the arguments firstly
-    if ( arguments != null) {
-      for (elem <- arguments) {
+    // write the pipeContext first
+    if (pipeContext != null) {
+      for (elem <- pipeContext.value) {
         out.println(elem)
       }
-      // ^A \n as the marker of the end of the arguments
-      out.println("\u0001")
+      // delimiter + "\n" marks the end of the pipeContext
+      out.println(delimiter)
     }
 
     for (elem <- firstParent[T].iterator(split, context)) {
       if (transform != null) {
         transform(elem, out.println(_))
-      }
-      else {
+      } else {
         out.println(elem)
       }
     }
diff --git a/core/src/test/scala/spark/PipedRDDSuite.scala b/core/src/test/scala/spark/PipedRDDSuite.scala
index ee55952a945b5ab27597b74bf9428ddc369f5ba2..d2852867de7536143b7802fc8dc422089a2459bf 100644
--- a/core/src/test/scala/spark/PipedRDDSuite.scala
+++ b/core/src/test/scala/spark/PipedRDDSuite.scala
@@ -23,7 +23,8 @@ class PipedRDDSuite extends FunSuite with LocalSparkContext {
     sc = new SparkContext("local", "test")
     val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)
 
-    val piped = nums.pipe(Seq("cat"), (i:Int, f: String=> Unit) => f(i + "_"), Array("0"))
+    val piped = nums.pipe(Seq("cat"), Map[String, String](),
+      (i: Int, f: String => Unit) => f(i + "_"), sc.broadcast(List("0")))
 
     val c = piped.collect()
 
@@ -38,7 +39,9 @@ class PipedRDDSuite extends FunSuite with LocalSparkContext {
     assert(c(7) === "4_")
 
     val nums1 = sc.makeRDD(Array("a\t1", "b\t2", "a\t3", "b\t4"), 2)
sc.makeRDD(Array("a\t1", "b\t2", "a\t3", "b\t4"), 2) - val d = nums1.groupBy(str=>str.split("\t")(0)).pipe(Seq("cat"), (i:Tuple2[String, Seq[String]], f: String=> Unit) => {for (e <- i._2){ f(e + "_")}}, Array("0")).collect() + val d = nums1.groupBy(str=>str.split("\t")(0)). + pipe(Seq("cat"), Map[String, String](), (i:Tuple2[String, Seq[String]], f: String=> Unit) => + {for (e <- i._2){ f(e + "_")}}, sc.broadcast(List("0"))).collect() assert(d.size === 8) assert(d(0) === "0") assert(d(1) === "\u0001")