diff --git a/core/src/main/scala/spark/api/python/PythonPartitioner.scala b/core/src/main/scala/spark/api/python/PythonPartitioner.scala
index 606a80d1eb12d2d71fc3dbf36dbea2b3487cb279..2c829508e589b3417e02d47d2f28bbf996e08c68 100644
--- a/core/src/main/scala/spark/api/python/PythonPartitioner.scala
+++ b/core/src/main/scala/spark/api/python/PythonPartitioner.scala
@@ -7,7 +7,7 @@ import java.util.Arrays
 /**
  * A [[spark.Partitioner]] that performs handling of byte arrays, for use by the Python API.
  */
-class PythonPartitioner(override val numPartitions: Int) extends Partitioner {
+private[spark] class PythonPartitioner(override val numPartitions: Int) extends Partitioner {
 
   override def getPartition(key: Any): Int = {
     if (key == null) {
diff --git a/core/src/main/scala/spark/api/python/PythonRDD.scala b/core/src/main/scala/spark/api/python/PythonRDD.scala
index 4f870e837a5227ddef37f418d14d77ee70ca85ee..a80a8eea452671504b971ef51a28a9100c9c1911 100644
--- a/core/src/main/scala/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/spark/api/python/PythonRDD.scala
@@ -3,7 +3,6 @@ package spark.api.python
 import java.io._
 import java.util.{List => JList}
 
-import scala.collection.Map
 import scala.collection.JavaConversions._
 import scala.io.Source
 
@@ -16,10 +15,26 @@ import spark.OneToOneDependency
 import spark.rdd.PipedRDD
 
 
-trait PythonRDDBase {
-  def compute[T](split: Split, envVars: Map[String, String],
-    command: Seq[String], parent: RDD[T], pythonExec: String,
-    broadcastVars: java.util.List[Broadcast[Array[Byte]]]): Iterator[Array[Byte]] = {
+private[spark] class PythonRDD[T: ClassManifest](
+  parent: RDD[T], command: Seq[String], envVars: java.util.Map[String, String],
+  preservePartitoning: Boolean, pythonExec: String, broadcastVars: java.util.List[Broadcast[Array[Byte]]])
+  extends RDD[Array[Byte]](parent.context) {
+
+  // Similar to Runtime.exec(), if we are given a single string, split it into words
+  // using a standard StringTokenizer (i.e. by spaces)
+  def this(parent: RDD[T], command: String, envVars: java.util.Map[String, String],
+    preservePartitoning: Boolean, pythonExec: String,
+    broadcastVars: java.util.List[Broadcast[Array[Byte]]]) =
+    this(parent, PipedRDD.tokenize(command), envVars, preservePartitoning, pythonExec,
+      broadcastVars)
+
+  override def splits = parent.splits
+
+  override val dependencies = List(new OneToOneDependency(parent))
+
+  override val partitioner = if (preservePartitoning) parent.partitioner else None
+
+  override def compute(split: Split): Iterator[Array[Byte]] = {
     val SPARK_HOME = new ProcessBuilder().environment().get("SPARK_HOME")
 
     val pb = new ProcessBuilder(Seq(pythonExec, SPARK_HOME + "/pyspark/pyspark/worker.py"))
@@ -100,29 +115,6 @@ trait PythonRDDBase {
       def hasNext = _nextObj.length != 0
     }
   }
-}
-
-class PythonRDD[T: ClassManifest](
-  parent: RDD[T], command: Seq[String], envVars: java.util.Map[String, String],
-  preservePartitoning: Boolean, pythonExec: String, broadcastVars: java.util.List[Broadcast[Array[Byte]]])
-  extends RDD[Array[Byte]](parent.context) with PythonRDDBase {
-
-  // Similar to Runtime.exec(), if we are given a single string, split it into words
-  // using a standard StringTokenizer (i.e. by spaces)
-  def this(parent: RDD[T], command: String, envVars: java.util.Map[String, String],
-    preservePartitoning: Boolean, pythonExec: String,
-    broadcastVars: java.util.List[Broadcast[Array[Byte]]]) =
-    this(parent, PipedRDD.tokenize(command), envVars, preservePartitoning, pythonExec,
-      broadcastVars)
-
-  override def splits = parent.splits
-
-  override val dependencies = List(new OneToOneDependency(parent))
-
-  override val partitioner = if (preservePartitoning) parent.partitioner else None
-
-  override def compute(split: Split): Iterator[Array[Byte]] =
-    compute(split, envVars.toMap, command, parent, pythonExec, broadcastVars)
 
   val asJavaRDD : JavaRDD[Array[Byte]] = JavaRDD.fromRDD(this)
 }
@@ -139,7 +131,7 @@ private class PairwiseRDD(prev: RDD[Array[Byte]]) extends
   val asJavaPairRDD : JavaPairRDD[Array[Byte], Array[Byte]] = JavaPairRDD.fromRDD(this)
 }
 
-object PythonRDD {
+private[spark] object PythonRDD {
 
   /** Strips the pickle PROTO and STOP opcodes from the start and end of a pickle */
   def stripPickle(arr: Array[Byte]) : Array[Byte] = {
diff --git a/pyspark/pyspark/java_gateway.py b/pyspark/pyspark/java_gateway.py
index d4a4434c0556c283dacf79f572d060d9b9c3921a..eb2a875762f3f6a810e81a15cec94fe2f964ec3b 100644
--- a/pyspark/pyspark/java_gateway.py
+++ b/pyspark/pyspark/java_gateway.py
@@ -1,4 +1,5 @@
 import os
+import sys
 from subprocess import Popen, PIPE
 from threading import Thread
 from py4j.java_gateway import java_import, JavaGateway, GatewayClient
@@ -26,7 +27,7 @@ def launch_gateway():
         def run(self):
             while True:
                 line = self.stream.readline()
-                print line,
+                sys.stderr.write(line)
     EchoOutputThread(proc.stdout).start()
     # Connect to the gateway
     gateway = JavaGateway(GatewayClient(port=port))
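
The merged PythonRDD keeps its single-string convenience constructor, which delegates to PipedRDD.tokenize to split the command into words the same way Runtime.exec() does. The following is a minimal, self-contained sketch of that whitespace tokenization; the object name TokenizeSketch and the standalone tokenize helper are illustrative, not the Spark API itself:

    import java.util.StringTokenizer
    import scala.collection.mutable.ArrayBuffer

    object TokenizeSketch {
      // Split a command string into words on whitespace, as Runtime.exec()
      // and PipedRDD.tokenize do with a standard StringTokenizer.
      def tokenize(command: String): Seq[String] = {
        val buf = new ArrayBuffer[String]
        val tok = new StringTokenizer(command)
        while (tok.hasMoreTokens) {
          buf += tok.nextToken()
        }
        buf
      }

      def main(args: Array[String]) {
        // Prints ArrayBuffer(python, worker.py)
        println(tokenize("python worker.py"))
      }
    }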
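The java_gateway.py change redirects the echoed gateway output from stdout to stderr, so a thread that drains the child's pipe can no longer pollute the parent's stdout. Below is a sketch of the same drain-to-stderr pattern, written in Scala to match the rest of the patch; the child command ("echo", ...) and the object name EchoToStderr are illustrative, not part of the patch:

    import java.io.{BufferedReader, InputStreamReader}

    object EchoToStderr {
      def main(args: Array[String]) {
        val proc = new ProcessBuilder("echo", "hello from child").start()
        val reader = new BufferedReader(new InputStreamReader(proc.getInputStream))

        // Daemon thread that drains the child's stdout and mirrors each
        // line onto our stderr, analogous to sys.stderr.write(line).
        val echo = new Thread(new Runnable {
          def run() {
            Iterator.continually(reader.readLine()).takeWhile(_ != null).foreach { line =>
              System.err.println(line)
            }
          }
        })
        echo.setDaemon(true)
        echo.start()

        proc.waitFor()
        echo.join(1000)  // give the echo thread a moment to finish draining
      }
    }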