Commit 279bd4aa authored by Tejas Patil, committed by Shixiong Zhu

[SPARK-15826][CORE] PipedRDD to allow configurable char encoding

## What changes were proposed in this pull request?

Link to the JIRA issue describing the problem: https://issues.apache.org/jira/browse/SPARK-15826

The fix in this PR is to let users specify the char encoding in the pipe() operation. For backward
compatibility, the default value remains the system default encoding.
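
A minimal usage sketch of the new API surface (not part of the patch; `sc` is an assumed SparkContext and all values are illustrative):

```scala
// With this patch, the charset used for the child process's stdin/stdout/stderr
// can be passed by name; omitting it keeps the old system-default behavior.
val piped = sc.parallelize(Seq("héllo", "wörld"))
  .pipe(Seq("cat"), encoding = "UTF-8")

// Pre-existing call sites are unaffected: this still uses the system default.
val legacy = sc.parallelize(Seq("a", "b")).pipe("cat")
```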

## How was this patch tested?

Ran existing unit tests

Author: Tejas Patil <tejasp@fb.com>

Closes #13563 from tejasapatil/pipedrdd_utf8.
parent 9b234b55
@@ -284,6 +284,17 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
     rdd.pipe(command.asScala, env.asScala, null, null, separateWorkingDir, bufferSize)
   }
 
+  /**
+   * Return an RDD created by piping elements to a forked external process.
+   */
+  def pipe(command: JList[String],
+           env: JMap[String, String],
+           separateWorkingDir: Boolean,
+           bufferSize: Int,
+           encoding: String): JavaRDD[String] = {
+    rdd.pipe(command.asScala, env.asScala, null, null, separateWorkingDir, bufferSize, encoding)
+  }
+
   /**
    * Zips this RDD with another one, returning key-value pairs with the first element in each RDD,
    * second element in each RDD, etc. Assumes that the two RDDs have the *same number of
......
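
For context, a sketch of calling the new Java-API overload above (written in Scala for consistency; `jsc` and all argument values are illustrative, not part of the patch):

```scala
import java.util.{Arrays => JArrays, HashMap => JHashMap}
import org.apache.spark.api.java.{JavaRDD, JavaSparkContext}

// Pipes each element through `cat`, decoding and encoding the child
// process's streams as UTF-8 via the new `encoding` parameter.
def pipeUtf8(jsc: JavaSparkContext): JavaRDD[String] = {
  val rdd = jsc.parallelize(JArrays.asList("a", "b"))
  rdd.pipe(
    JArrays.asList("cat"),          // command
    new JHashMap[String, String](), // env
    false,                          // separateWorkingDir
    8192,                           // bufferSize
    "UTF-8")                        // encoding (new in this patch)
}
```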
@@ -47,22 +47,10 @@ private[spark] class PipedRDD[T: ClassTag](
     printPipeContext: (String => Unit) => Unit,
     printRDDElement: (T, String => Unit) => Unit,
     separateWorkingDir: Boolean,
-    bufferSize: Int)
+    bufferSize: Int,
+    encoding: String)
   extends RDD[String](prev) {
 
-  // Similar to Runtime.exec(), if we are given a single string, split it into words
-  // using a standard StringTokenizer (i.e. by spaces)
-  def this(
-      prev: RDD[T],
-      command: String,
-      envVars: Map[String, String] = Map(),
-      printPipeContext: (String => Unit) => Unit = null,
-      printRDDElement: (T, String => Unit) => Unit = null,
-      separateWorkingDir: Boolean = false) =
-    this(prev, PipedRDD.tokenize(command), envVars, printPipeContext, printRDDElement,
-      separateWorkingDir, 8192)
-
   override def getPartitions: Array[Partition] = firstParent[T].partitions
 
   /**
@@ -129,7 +117,7 @@ private[spark] class PipedRDD[T: ClassTag](
       override def run(): Unit = {
         val err = proc.getErrorStream
         try {
-          for (line <- Source.fromInputStream(err).getLines) {
+          for (line <- Source.fromInputStream(err)(encoding).getLines) {
             // scalastyle:off println
             System.err.println(line)
             // scalastyle:on println
@@ -147,7 +135,7 @@ private[spark] class PipedRDD[T: ClassTag](
       override def run(): Unit = {
         TaskContext.setTaskContext(context)
         val out = new PrintWriter(new BufferedWriter(
-          new OutputStreamWriter(proc.getOutputStream), bufferSize))
+          new OutputStreamWriter(proc.getOutputStream, encoding), bufferSize))
         try {
           // scalastyle:off println
           // input the pipe context firstly
@@ -171,7 +159,7 @@ private[spark] class PipedRDD[T: ClassTag](
     }.start()
 
     // Return an iterator that read lines from the process's stdout
-    val lines = Source.fromInputStream(proc.getInputStream).getLines()
+    val lines = Source.fromInputStream(proc.getInputStream)(encoding).getLines
     new Iterator[String] {
       def next(): String = {
         if (!hasNext()) {
......
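
The three hunks above thread the same `encoding` string through to `Source.fromInputStream` and `OutputStreamWriter`. Passing a plain `String` in `Source.fromInputStream(...)(encoding)` works because the method takes an implicit `scala.io.Codec`, and the `Codec` companion object supplies an implicit `String => Codec` conversion. A standalone sketch of that mechanism (not Spark code):

```scala
import java.io.ByteArrayInputStream
import java.nio.charset.StandardCharsets
import scala.io.Source

// A charset name in the second parameter list is implicitly converted to a
// scala.io.Codec, so decoding no longer depends on the platform default.
val bytes = "héllo".getBytes(StandardCharsets.UTF_8)
val line = Source.fromInputStream(new ByteArrayInputStream(bytes))("UTF-8")
  .getLines().next()
assert(line == "héllo")
```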
@@ -21,6 +21,7 @@
 import java.util.Random
 
 import scala.collection.{mutable, Map}
 import scala.collection.mutable.ArrayBuffer
+import scala.io.Codec
 import scala.language.implicitConversions
 import scala.reflect.{classTag, ClassTag}
@@ -698,14 +699,14 @@ abstract class RDD[T: ClassTag](
    * Return an RDD created by piping elements to a forked external process.
    */
   def pipe(command: String): RDD[String] = withScope {
-    new PipedRDD(this, command)
+    pipe(PipedRDD.tokenize(command))
   }
 
   /**
    * Return an RDD created by piping elements to a forked external process.
    */
   def pipe(command: String, env: Map[String, String]): RDD[String] = withScope {
-    new PipedRDD(this, command, env)
+    pipe(PipedRDD.tokenize(command), env)
   }
 
   /**
@@ -726,6 +727,8 @@ abstract class RDD[T: ClassTag](
    *                        for (e <- record._2) {f(e)}
    * @param separateWorkingDir Use separate working directories for each task.
    * @param bufferSize Buffer size for the stdin writer for the piped process.
+   * @param encoding Char encoding used for interacting (via stdin, stdout and stderr) with
+   *                 the piped process
    * @return the result RDD
    */
   def pipe(
@@ -734,12 +737,14 @@
       printPipeContext: (String => Unit) => Unit = null,
       printRDDElement: (T, String => Unit) => Unit = null,
       separateWorkingDir: Boolean = false,
-      bufferSize: Int = 8192): RDD[String] = withScope {
+      bufferSize: Int = 8192,
+      encoding: String = Codec.defaultCharsetCodec.name): RDD[String] = withScope {
     new PipedRDD(this, command, env,
       if (printPipeContext ne null) sc.clean(printPipeContext) else null,
       if (printRDDElement ne null) sc.clean(printRDDElement) else null,
       separateWorkingDir,
-      bufferSize)
+      bufferSize,
+      encoding)
   }
 
   /**
......
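
With the auxiliary `PipedRDD` constructor gone, the single-string `pipe` overloads above delegate through `PipedRDD.tokenize`, which splits the command on whitespace the way `Runtime.exec` treats a single-string command. A sketch mirroring that behavior (illustrative, not the patch itself):

```scala
import java.util.StringTokenizer
import scala.collection.mutable.ArrayBuffer

// Mirrors PipedRDD.tokenize: split a single command string into words.
def tokenize(command: String): Seq[String] = {
  val buf = new ArrayBuffer[String]
  val tok = new StringTokenizer(command)
  while (tok.hasMoreElements) buf += tok.nextToken()
  buf
}

// tokenize("printenv MY_TEST_ENV") == Seq("printenv", "MY_TEST_ENV")
```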
@@ -20,6 +20,7 @@ package org.apache.spark.rdd
 import java.io.File
 
 import scala.collection.Map
+import scala.io.Codec
 import scala.language.postfixOps
 import scala.sys.process._
 import scala.util.Try
@@ -207,7 +208,16 @@ class PipedRDDSuite extends SparkFunSuite with SharedSparkContext {
       }
     }
     val hadoopPart1 = generateFakeHadoopPartition()
-    val pipedRdd = new PipedRDD(nums, "printenv " + varName)
+    val pipedRdd =
+      new PipedRDD(
+        nums,
+        PipedRDD.tokenize("printenv " + varName),
+        Map(),
+        null,
+        null,
+        false,
+        4092,
+        Codec.defaultCharsetCodec.name)
     val tContext = TaskContext.empty()
     val rddIter = pipedRdd.compute(hadoopPart1, tContext)
     val arr = rddIter.toArray
......