Skip to content
Snippets Groups Projects
Commit fb6d733f authored by Gavin Li's avatar Gavin Li
Browse files

update according to comments

parent e179ff8a
No related branches found
No related tags found
No related merge requests found
......@@ -355,68 +355,6 @@ abstract class RDD[T: ClassManifest](
/**
 * Return an RDD created by piping elements to a forked external process.
 * Each element of this RDD is printed (one per line) to the process's stdin,
 * and each line of the process's stdout becomes an element of the result.
 *
 * @param command the command to run in the forked process.
 * @param env environment variables to set for the forked process.
 */
def pipe(command: String, env: Map[String, String]): RDD[String] =
new PipedRDD(this, command, env)
/**
 * Return an RDD created by piping elements to a forked external process.
 * How each record in the RDD is output to the process can be controlled by providing a
 * function transform(T, outputFunction: String => Unit). transform() will be called with
 * the current record in the RDD as the 1st parameter, and the function to output the record
 * to the external process (like out.println()) as the 2nd parameter.
 * Here's an example of how to pipe the RDD data of groupBy() in a streaming way,
 * instead of constructing a huge String to concatenate all the records:
 * def transform(record:(String, Seq[String]), f:String=>Unit) = for (e <- record._2){f(e)}
 * pipeContext can be used to transfer additional context data to the external process
 * besides the RDD. pipeContext is a broadcast Seq[String]; each of its lines is piped to the
 * external process before the RDD data, followed by the delimiter (default "^A") on its own
 * line to mark the end of the context data. The delimiter can be customized via the last
 * parameter, delimiter.
 */
def pipe[U<: Seq[String]](
command: String,
env: Map[String, String],
transform: (T,String => Unit) => Any,
pipeContext: Broadcast[U],
delimiter: String): RDD[String] =
new PipedRDD(this, command, env, transform, pipeContext, delimiter)
/**
 * Return an RDD created by piping elements to a forked external process
 * (run with an empty environment).
 * How each record in the RDD is output to the process can be controlled by providing a
 * function transform(T, outputFunction: String => Unit). transform() will be called with
 * the current record in the RDD as the 1st parameter, and the function to output the record
 * to the external process (like out.println()) as the 2nd parameter.
 * Here's an example of how to pipe the RDD data of groupBy() in a streaming way,
 * instead of constructing a huge String to concatenate all the records:
 * def transform(record:(String, Seq[String]), f:String=>Unit) = for (e <- record._2){f(e)}
 * pipeContext can be used to transfer additional context data to the external process
 * besides the RDD. pipeContext is a broadcast Seq[String]; each of its lines is piped to the
 * external process before the RDD data, followed by the delimiter on its own line to mark
 * the end of the context data. This overload fixes the delimiter to "^A" (\u0001).
 */
def pipe[U<: Seq[String]](
command: String,
transform: (T,String => Unit) => Any,
pipeContext: Broadcast[U]): RDD[String] =
new PipedRDD(this, command, Map[String, String](), transform, pipeContext, "\u0001")
/**
 * Return an RDD created by piping elements to a forked external process.
 * How each record in the RDD is output to the process can be controlled by providing a
 * function transform(T, outputFunction: String => Unit). transform() will be called with
 * the current record in the RDD as the 1st parameter, and the function to output the record
 * to the external process (like out.println()) as the 2nd parameter.
 * Here's an example of how to pipe the RDD data of groupBy() in a streaming way,
 * instead of constructing a huge String to concatenate all the records:
 * def transform(record:(String, Seq[String]), f:String=>Unit) = for (e <- record._2){f(e)}
 * pipeContext can be used to transfer additional context data to the external process
 * besides the RDD. pipeContext is a broadcast Seq[String]; each of its lines is piped to the
 * external process before the RDD data, followed by the delimiter on its own line to mark
 * the end of the context data. This overload fixes the delimiter to "^A" (\u0001).
 */
def pipe[U<: Seq[String]](
command: String,
env: Map[String, String],
transform: (T,String => Unit) => Any,
pipeContext: Broadcast[U]): RDD[String] =
new PipedRDD(this, command, env, transform, pipeContext, "\u0001")
/**
* Return an RDD created by piping elements to a forked external process.
......@@ -432,13 +370,12 @@ abstract class RDD[T: ClassManifest](
* external process with "^A" as the delimiter in the end of context data. Delimiter can also
* be customized by the last parameter delimiter.
*/
def pipe[U<: Seq[String]](
def pipe(
command: Seq[String],
env: Map[String, String] = Map(),
transform: (T,String => Unit) => Any = null,
pipeContext: Broadcast[U] = null,
delimiter: String = "\u0001"): RDD[String] =
new PipedRDD(this, command, env, transform, pipeContext, delimiter)
printPipeContext: (String => Unit) => Unit = null,
printRDDElement: (T, String => Unit) => Unit = null): RDD[String] =
new PipedRDD(this, command, env, if (printPipeContext ne null) sc.clean(printPipeContext) else null, printRDDElement)
/**
* Return a new RDD by applying a function to each partition of this RDD.
......
......@@ -16,14 +16,12 @@ import spark.broadcast.Broadcast
* An RDD that pipes the contents of each parent partition through an external command
* (printing them one per line) and returns the output as a collection of strings.
*/
class PipedRDD[T: ClassManifest, U <: Seq[String]](
class PipedRDD[T: ClassManifest](
prev: RDD[T],
command: Seq[String],
envVars: Map[String, String],
transform: (T, String => Unit) => Any,
pipeContext: Broadcast[U],
delimiter: String
)
printPipeContext: (String => Unit) => Unit,
printRDDElement: (T, String => Unit) => Unit)
extends RDD[String](prev) {
// Similar to Runtime.exec(), if we are given a single string, split it into words
......@@ -32,10 +30,9 @@ class PipedRDD[T: ClassManifest, U <: Seq[String]](
prev: RDD[T],
command: String,
envVars: Map[String, String] = Map(),
transform: (T, String => Unit) => Any = null,
pipeContext: Broadcast[U] = null,
delimiter: String = "\u0001") =
this(prev, PipedRDD.tokenize(command), envVars, transform, pipeContext, delimiter)
printPipeContext: (String => Unit) => Unit = null,
printRDDElement: (T, String => Unit) => Unit = null) =
this(prev, PipedRDD.tokenize(command), envVars, printPipeContext, printRDDElement)
override def getPartitions: Array[Partition] = firstParent[T].partitions
......@@ -64,17 +61,13 @@ class PipedRDD[T: ClassManifest, U <: Seq[String]](
SparkEnv.set(env)
val out = new PrintWriter(proc.getOutputStream)
// input the pipeContext firstly
if ( pipeContext != null) {
for (elem <- pipeContext.value) {
out.println(elem)
}
// delimiter\n as the marker of the end of the pipeContext
out.println(delimiter)
// input the pipe context firstly
if ( printPipeContext != null) {
printPipeContext(out.println(_))
}
for (elem <- firstParent[T].iterator(split, context)) {
if (transform != null) {
transform(elem, out.println(_))
if (printRDDElement != null) {
printRDDElement(elem, out.println(_))
} else {
out.println(elem)
}
......
......@@ -22,9 +22,12 @@ class PipedRDDSuite extends FunSuite with LocalSparkContext {
test("advanced pipe") {
sc = new SparkContext("local", "test")
val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)
val bl = sc.broadcast(List("0"))
val piped = nums.pipe(Seq("cat"), Map[String, String](),
(i:Int, f: String=> Unit) => f(i + "_"), sc.broadcast(List("0")))
val piped = nums.pipe(Seq("cat"),
Map[String, String](),
(f: String => Unit) => {bl.value.map(f(_));f("\u0001")},
(i:Int, f: String=> Unit) => f(i + "_"))
val c = piped.collect()
......@@ -40,8 +43,10 @@ class PipedRDDSuite extends FunSuite with LocalSparkContext {
val nums1 = sc.makeRDD(Array("a\t1", "b\t2", "a\t3", "b\t4"), 2)
val d = nums1.groupBy(str=>str.split("\t")(0)).
pipe(Seq("cat"), Map[String, String](), (i:Tuple2[String, Seq[String]], f: String=> Unit) =>
{for (e <- i._2){ f(e + "_")}}, sc.broadcast(List("0"))).collect()
pipe(Seq("cat"),
Map[String, String](),
(f: String => Unit) => {bl.value.map(f(_));f("\u0001")},
(i:Tuple2[String, Seq[String]], f: String=> Unit) => {for (e <- i._2){ f(e + "_")}}).collect()
assert(d.size === 8)
assert(d(0) === "0")
assert(d(1) === "\u0001")
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment