Commit fbadb1cd authored by Josh Rosen

Mark api.python classes as private; echo Java output to stderr.

parent 665466df
@@ -7,7 +7,7 @@ import java.util.Arrays
 
 /**
  * A [[spark.Partitioner]] that performs handling of byte arrays, for use by the Python API.
  */
-class PythonPartitioner(override val numPartitions: Int) extends Partitioner {
+private[spark] class PythonPartitioner(override val numPartitions: Int) extends Partitioner {
   override def getPartition(key: Any): Int = {
     if (key == null) {
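For readers unfamiliar with Scala's qualified access modifiers: private[spark] makes PythonPartitioner visible throughout the spark package and its subpackages while hiding it from user code, which is how these Python-API internals are kept out of the public surface. A minimal sketch of the mechanism, using hypothetical class and object names:

    package spark.api.python {
      // Usable from anything under the spark package, invisible outside it.
      private[spark] class InternalHelper {
        def describe(): String = "reachable only from spark.*"
      }
    }

    package spark {
      object CallerInsideSpark {
        val ok = new api.python.InternalHelper().describe() // compiles: we are inside spark
      }
    }

    // From any package outside spark, `new spark.api.python.InternalHelper`
    // fails to compile because the class is not accessible there.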
@@ -3,7 +3,6 @@ package spark.api.python
 
 import java.io._
 import java.util.{List => JList}
-import scala.collection.Map
 import scala.collection.JavaConversions._
 import scala.io.Source
@@ -16,10 +15,26 @@ import spark.OneToOneDependency
 import spark.rdd.PipedRDD
 
-trait PythonRDDBase {
-  def compute[T](split: Split, envVars: Map[String, String],
-    command: Seq[String], parent: RDD[T], pythonExec: String,
-    broadcastVars: java.util.List[Broadcast[Array[Byte]]]): Iterator[Array[Byte]] = {
+private[spark] class PythonRDD[T: ClassManifest](
+  parent: RDD[T], command: Seq[String], envVars: java.util.Map[String, String],
+  preservePartitoning: Boolean, pythonExec: String, broadcastVars: java.util.List[Broadcast[Array[Byte]]])
+  extends RDD[Array[Byte]](parent.context) {
+
+  // Similar to Runtime.exec(), if we are given a single string, split it into words
+  // using a standard StringTokenizer (i.e. by spaces)
+  def this(parent: RDD[T], command: String, envVars: java.util.Map[String, String],
+    preservePartitoning: Boolean, pythonExec: String,
+    broadcastVars: java.util.List[Broadcast[Array[Byte]]]) =
+    this(parent, PipedRDD.tokenize(command), envVars, preservePartitoning, pythonExec,
+      broadcastVars)
+
+  override def splits = parent.splits
+
+  override val dependencies = List(new OneToOneDependency(parent))
+
+  override val partitioner = if (preservePartitoning) parent.partitioner else None
+
+  override def compute(split: Split): Iterator[Array[Byte]] = {
     val SPARK_HOME = new ProcessBuilder().environment().get("SPARK_HOME")
     val pb = new ProcessBuilder(Seq(pythonExec, SPARK_HOME + "/pyspark/pyspark/worker.py"))
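The added auxiliary constructor mirrors Runtime.exec(String): a single command string is split into words before being handed to the primary constructor. A hedged sketch of what a tokenizer like PipedRDD.tokenize does (the body here is an assumption, written against java.util.StringTokenizer as the comment in the diff suggests):

    import java.util.StringTokenizer
    import scala.collection.mutable.ArrayBuffer

    // Split a command line on whitespace, as Runtime.exec(String) would.
    def tokenize(command: String): Seq[String] = {
      val buf = new ArrayBuffer[String]
      val tok = new StringTokenizer(command)
      while (tok.hasMoreTokens) {
        buf += tok.nextToken()
      }
      buf
    }

    // tokenize("python worker.py --verbose") == Seq("python", "worker.py", "--verbose")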
@@ -100,29 +115,6 @@ trait PythonRDDBase {
       def hasNext = _nextObj.length != 0
     }
   }
-}
-
-class PythonRDD[T: ClassManifest](
-  parent: RDD[T], command: Seq[String], envVars: java.util.Map[String, String],
-  preservePartitoning: Boolean, pythonExec: String, broadcastVars: java.util.List[Broadcast[Array[Byte]]])
-  extends RDD[Array[Byte]](parent.context) with PythonRDDBase {
-
-  // Similar to Runtime.exec(), if we are given a single string, split it into words
-  // using a standard StringTokenizer (i.e. by spaces)
-  def this(parent: RDD[T], command: String, envVars: java.util.Map[String, String],
-    preservePartitoning: Boolean, pythonExec: String,
-    broadcastVars: java.util.List[Broadcast[Array[Byte]]]) =
-    this(parent, PipedRDD.tokenize(command), envVars, preservePartitoning, pythonExec,
-      broadcastVars)
-
-  override def splits = parent.splits
-
-  override val dependencies = List(new OneToOneDependency(parent))
-
-  override val partitioner = if (preservePartitoning) parent.partitioner else None
-
-  override def compute(split: Split): Iterator[Array[Byte]] =
-    compute(split, envVars.toMap, command, parent, pythonExec, broadcastVars)
-
   val asJavaRDD : JavaRDD[Array[Byte]] = JavaRDD.fromRDD(this)
 }
@@ -139,7 +131,7 @@ private class PairwiseRDD(prev: RDD[Array[Byte]]) extends
   val asJavaPairRDD : JavaPairRDD[Array[Byte], Array[Byte]] = JavaPairRDD.fromRDD(this)
 }
 
-object PythonRDD {
+private[spark] object PythonRDD {
   /** Strips the pickle PROTO and STOP opcodes from the start and end of a pickle */
   def stripPickle(arr: Array[Byte]) : Array[Byte] = {
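stripPickle's body is truncated in this view, but the doc comment pins down the contract. In the pickle format, PROTO is a two-byte prefix (0x80 plus a protocol-version byte) and STOP is the single trailing '.' byte, so a plausible sketch (an assumption, not necessarily the code from this commit) is:

    // Assumes a protocol-2 pickle: 2-byte PROTO header, 1-byte STOP trailer.
    // Stripping both yields a fragment that can be spliced into a larger pickle.
    def stripPickle(arr: Array[Byte]): Array[Byte] =
      arr.slice(2, arr.length - 1)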
 import os
+import sys
 from subprocess import Popen, PIPE
 from threading import Thread
 from py4j.java_gateway import java_import, JavaGateway, GatewayClient
@@ -26,7 +27,7 @@ def launch_gateway():
         def run(self):
             while True:
                 line = self.stream.readline()
-                print line,
+                sys.stderr.write(line)
 
     EchoOutputThread(proc.stdout).start()
 
     # Connect to the gateway
     gateway = JavaGateway(GatewayClient(port=port))
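The java_gateway.py change reroutes the echoed JVM output from stdout to stderr, so Java-side diagnostics can no longer interleave with data a caller reads from the Python process's stdout. The same stream-echoing pattern on the JVM side, as a hedged Scala sketch (the helper name is hypothetical):

    import java.io.{BufferedReader, InputStream, InputStreamReader}

    // Copy a child process's stdout to our stderr on a daemon thread,
    // keeping our own stdout free for data.
    def echoToStderr(stream: InputStream): Thread = {
      val thread = new Thread("echo-output") {
        override def run() {
          val reader = new BufferedReader(new InputStreamReader(stream))
          var line = reader.readLine()
          while (line != null) {
            System.err.println(line)
            line = reader.readLine()
          }
        }
      }
      thread.setDaemon(true)
      thread.start()
      thread
    }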