Skip to content
Snippets Groups Projects
Commit d9a53b94 authored by Matei Zaharia's avatar Matei Zaharia
Browse files

Merge pull request #885 from mateiz/win-py

Allow PySpark to run on Windows
parents 3c520fea 9ee1e9db
No related branches found
No related tags found
No related merge requests found
......@@ -17,8 +17,8 @@
package org.apache.spark.api.python
import java.io.{File, DataInputStream, IOException}
import java.net.{Socket, SocketException, InetAddress}
import java.io.{OutputStreamWriter, File, DataInputStream, IOException}
import java.net.{ServerSocket, Socket, SocketException, InetAddress}
import scala.collection.JavaConversions._
......@@ -26,11 +26,30 @@ import org.apache.spark._
private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String, String])
extends Logging {
// Because forking processes from Java is expensive, we prefer to launch a single Python daemon
// (pyspark/daemon.py) and tell it to fork new workers for our tasks. This daemon currently
// only works on UNIX-based systems now because it uses signals for child management, so we can
// also fall back to launching workers (pyspark/worker.py) directly.
val useDaemon = !System.getProperty("os.name").startsWith("Windows")
var daemon: Process = null
val daemonHost = InetAddress.getByAddress(Array(127, 0, 0, 1))
var daemonPort: Int = 0
def create(): Socket = {
if (useDaemon) {
createThroughDaemon()
} else {
createSimpleWorker()
}
}
/**
* Connect to a worker launched through pyspark/daemon.py, which forks python processes itself
* to avoid the high cost of forking from Java. This currently only works on UNIX-based systems.
*/
private def createThroughDaemon(): Socket = {
synchronized {
// Start the daemon if it hasn't been started
startDaemon()
......@@ -50,6 +69,78 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
}
}
/**
* Launch a worker by executing worker.py directly and telling it to connect to us.
*/
private def createSimpleWorker(): Socket = {
var serverSocket: ServerSocket = null
try {
serverSocket = new ServerSocket(0, 1, InetAddress.getByAddress(Array(127, 0, 0, 1)))
// Create and start the worker
val sparkHome = new ProcessBuilder().environment().get("SPARK_HOME")
val pb = new ProcessBuilder(Seq(pythonExec, sparkHome + "/python/pyspark/worker.py"))
val workerEnv = pb.environment()
workerEnv.putAll(envVars)
val pythonPath = sparkHome + "/python/" + File.pathSeparator + workerEnv.get("PYTHONPATH")
workerEnv.put("PYTHONPATH", pythonPath)
val worker = pb.start()
// Redirect the worker's stderr to ours
new Thread("stderr reader for " + pythonExec) {
setDaemon(true)
override def run() {
scala.util.control.Exception.ignoring(classOf[IOException]) {
// FIXME: We copy the stream on the level of bytes to avoid encoding problems.
val in = worker.getErrorStream
val buf = new Array[Byte](1024)
var len = in.read(buf)
while (len != -1) {
System.err.write(buf, 0, len)
len = in.read(buf)
}
}
}
}.start()
// Redirect worker's stdout to our stderr
new Thread("stdout reader for " + pythonExec) {
setDaemon(true)
override def run() {
scala.util.control.Exception.ignoring(classOf[IOException]) {
// FIXME: We copy the stream on the level of bytes to avoid encoding problems.
val in = worker.getInputStream
val buf = new Array[Byte](1024)
var len = in.read(buf)
while (len != -1) {
System.err.write(buf, 0, len)
len = in.read(buf)
}
}
}
}.start()
// Tell the worker our port
val out = new OutputStreamWriter(worker.getOutputStream)
out.write(serverSocket.getLocalPort + "\n")
out.flush()
// Wait for it to connect to our socket
serverSocket.setSoTimeout(10000)
try {
return serverSocket.accept()
} catch {
case e: Exception =>
throw new SparkException("Python worker did not connect back in time", e)
}
} finally {
if (serverSocket != null) {
serverSocket.close()
}
}
null
}
def stop() {
stopDaemon()
}
......@@ -73,12 +164,12 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
// Redirect the stderr to ours
new Thread("stderr reader for " + pythonExec) {
setDaemon(true)
override def run() {
scala.util.control.Exception.ignoring(classOf[IOException]) {
// FIXME HACK: We copy the stream on the level of bytes to
// attempt to dodge encoding problems.
// FIXME: We copy the stream on the level of bytes to avoid encoding problems.
val in = daemon.getErrorStream
var buf = new Array[Byte](1024)
val buf = new Array[Byte](1024)
var len = in.read(buf)
while (len != -1) {
System.err.write(buf, 0, len)
......@@ -93,11 +184,11 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
// Redirect further stdout output to our stderr
new Thread("stdout reader for " + pythonExec) {
setDaemon(true)
override def run() {
scala.util.control.Exception.ignoring(classOf[IOException]) {
// FIXME HACK: We copy the stream on the level of bytes to
// attempt to dodge encoding problems.
var buf = new Array[Byte](1024)
// FIXME: We copy the stream on the level of bytes to avoid encoding problems.
val buf = new Array[Byte](1024)
var len = in.read(buf)
while (len != -1) {
System.err.write(buf, 0, len)
......
......@@ -315,7 +315,7 @@ Apart from these, the following properties are also available, and may be useful
# Environment Variables
Certain Spark settings can also be configured through environment variables, which are read from the `conf/spark-env.sh`
script in the directory where Spark is installed. These variables are meant to be for machine-specific settings, such
script in the directory where Spark is installed (or `conf/spark-env.cmd` on Windows). These variables are meant to be for machine-specific settings, such
as library search paths. While Java system properties can also be set here, for application settings, we recommend setting
these properties within the application instead of in `spark-env.sh` so that different applications can use different
settings.
......@@ -325,6 +325,8 @@ Note that `conf/spark-env.sh` does not exist by default when Spark is installed.
The following variables can be set in `spark-env.sh`:
* `JAVA_HOME`, the location where Java is installed (if it's not on your default `PATH`)
* `PYSPARK_PYTHON`, the Python binary to use for PySpark
* `SPARK_LOCAL_IP`, to configure which IP address of the machine to bind to.
* `SPARK_LIBRARY_PATH`, to add search directories for native libraries.
* `SPARK_CLASSPATH`, to add elements to Spark's classpath that you want to be present for _all_ applications.
......
......@@ -11,6 +11,8 @@ Spark can run on the Apache Mesos cluster manager, Hadoop YARN, Amazon EC2, or w
Get Spark by visiting the [downloads page](http://spark.incubator.apache.org/downloads.html) of the Apache Spark site. This documentation is for Spark version {{site.SPARK_VERSION}}.
Spark runs on both Windows and UNIX-like systems (e.g. Linux, Mac OS). All you need to run it is to have `java` to installed on your system `PATH`, or the `JAVA_HOME` environment variable pointing to a Java installation.
# Building
Spark uses [Simple Build Tool](http://www.scala-sbt.org), which is bundled with it. To compile the code, go into the top-level Spark directory and run
......@@ -50,6 +52,8 @@ In addition, if you wish to run Spark on [YARN](running-on-yarn.md), set
SPARK_HADOOP_VERSION=2.0.5-alpha SPARK_YARN=true sbt/sbt assembly
(Note that on Windows, you need to set the environment variables on separate lines, e.g., `set SPARK_HADOOP_VERSION=1.2.1`.)
# Where to Go from Here
**Programming guides:**
......
......@@ -4,7 +4,7 @@ title: Python Programming Guide
---
The Spark Python API (PySpark) exposes most of the Spark features available in the Scala version to Python.
The Spark Python API (PySpark) exposes the Spark programming model to Python.
To learn the basics of Spark, we recommend reading through the
[Scala programming guide](scala-programming-guide.html) first; it should be
easy to follow even if you don't know Scala.
......@@ -15,12 +15,8 @@ This guide will show how to use the Spark features described there in Python.
There are a few key differences between the Python and Scala APIs:
* Python is dynamically typed, so RDDs can hold objects of different types.
* PySpark does not currently support the following Spark features:
- `lookup`
- `sort`
- `persist` at storage levels other than `MEMORY_ONLY`
- Execution on Windows -- this is slated for a future release
* Python is dynamically typed, so RDDs can hold objects of multiple types.
* PySpark does not yet support a few API calls, such as `lookup`, `sort`, and `persist` at custom storage levels. See the [API docs](api/pyspark/index.html) for details.
In PySpark, RDDs support the same methods as their Scala counterparts but take Python functions and return Python collection types.
Short functions can be passed to RDD methods using Python's [`lambda`](http://www.diveintopython.net/power_of_introspection/lambda_functions.html) syntax:
......@@ -30,7 +26,7 @@ logData = sc.textFile(logFile).cache()
errors = logData.filter(lambda line: "ERROR" in line)
{% endhighlight %}
You can also pass functions that are defined using the `def` keyword; this is useful for more complicated functions that cannot be expressed using `lambda`:
You can also pass functions that are defined with the `def` keyword; this is useful for longer functions that can't be expressed using `lambda`:
{% highlight python %}
def is_error(line):
......@@ -38,7 +34,7 @@ def is_error(line):
errors = logData.filter(is_error)
{% endhighlight %}
Functions can access objects in enclosing scopes, although modifications to those objects within RDD methods will not be propagated to other tasks:
Functions can access objects in enclosing scopes, although modifications to those objects within RDD methods will not be propagated back:
{% highlight python %}
error_keywords = ["Exception", "Error"]
......@@ -51,17 +47,20 @@ PySpark will automatically ship these functions to workers, along with any objec
Instances of classes will be serialized and shipped to workers by PySpark, but classes themselves cannot be automatically distributed to workers.
The [Standalone Use](#standalone-use) section describes how to ship code dependencies to workers.
In addition, PySpark fully supports interactive use---simply run `./pyspark` to launch an interactive shell.
# Installing and Configuring PySpark
PySpark requires Python 2.6 or higher.
PySpark jobs are executed using a standard cPython interpreter in order to support Python modules that use C extensions.
PySpark jobs are executed using a standard CPython interpreter in order to support Python modules that use C extensions.
We have not tested PySpark with Python 3 or with alternative Python interpreters, such as [PyPy](http://pypy.org/) or [Jython](http://www.jython.org/).
By default, PySpark's scripts will run programs using `python`; an alternate Python executable may be specified by setting the `PYSPARK_PYTHON` environment variable in `conf/spark-env.sh`.
By default, PySpark requires `python` to be available on the system `PATH` and use it to run programs; an alternate Python executable may be specified by setting the `PYSPARK_PYTHON` environment variable in `conf/spark-env.sh` (or `.cmd` on Windows).
All of PySpark's library dependencies, including [Py4J](http://py4j.sourceforge.net/), are bundled with PySpark and automatically imported.
Standalone PySpark jobs should be run using the `pyspark` script, which automatically configures the Java and Python environment using the settings in `conf/spark-env.sh`.
Standalone PySpark jobs should be run using the `pyspark` script, which automatically configures the Java and Python environment using the settings in `conf/spark-env.sh` or `.cmd`.
The script automatically adds the `pyspark` package to the `PYTHONPATH`.
......@@ -101,7 +100,7 @@ $ MASTER=local[4] ./pyspark
## IPython
It is also possible to launch PySpark in [IPython](http://ipython.org), the enhanced Python interpreter.
To do this, simply set the `IPYTHON` variable to `1` when running `pyspark`:
To do this, set the `IPYTHON` variable to `1` when running `pyspark`:
{% highlight bash %}
$ IPYTHON=1 ./pyspark
......@@ -132,15 +131,16 @@ sc = SparkContext("local", "Job Name", pyFiles=['MyFile.py', 'lib.zip', 'app.egg
Files listed here will be added to the `PYTHONPATH` and shipped to remote worker machines.
Code dependencies can be added to an existing SparkContext using its `addPyFile()` method.
# API Docs
[API documentation](api/pyspark/index.html) for PySpark is available as Epydoc.
Many of the methods also contain [doctests](http://docs.python.org/2/library/doctest.html) that provide additional usage examples.
# Where to Go from Here
PySpark includes several sample programs in the [`python/examples` folder](https://github.com/apache/incubator-spark/tree/master/python/examples).
You can run them by passing the files to the `pyspark` script; e.g.:
PySpark also includes several sample programs in the [`python/examples` folder](https://github.com/apache/incubator-spark/tree/master/python/examples).
You can run them by passing the files to `pyspark`; e.g.:
./pyspark python/examples/wordcount.py
Each program prints usage help when run without arguments.
We currently provide [API documentation](api/pyspark/index.html) for the Python API as Epydoc.
Many of the RDD method descriptions contain [doctests](http://docs.python.org/2/library/doctest.html) that provide additional usage examples.
@echo off
rem
rem Licensed to the Apache Software Foundation (ASF) under one or more
rem contributor license agreements. See the NOTICE file distributed with
rem this work for additional information regarding copyright ownership.
rem The ASF licenses this file to You under the Apache License, Version 2.0
rem (the "License"); you may not use this file except in compliance with
rem the License. You may obtain a copy of the License at
rem
rem http://www.apache.org/licenses/LICENSE-2.0
rem
rem Unless required by applicable law or agreed to in writing, software
rem distributed under the License is distributed on an "AS IS" BASIS,
rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
rem See the License for the specific language governing permissions and
rem limitations under the License.
rem
rem This is the entry point for running PySpark. To avoid polluting the
rem environment, it just launches a new cmd to do the real work.
cmd /V /E /C %~dp0pyspark2.cmd %*
@echo off
rem
rem Licensed to the Apache Software Foundation (ASF) under one or more
rem contributor license agreements. See the NOTICE file distributed with
rem this work for additional information regarding copyright ownership.
rem The ASF licenses this file to You under the Apache License, Version 2.0
rem (the "License"); you may not use this file except in compliance with
rem the License. You may obtain a copy of the License at
rem
rem http://www.apache.org/licenses/LICENSE-2.0
rem
rem Unless required by applicable law or agreed to in writing, software
rem distributed under the License is distributed on an "AS IS" BASIS,
rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
rem See the License for the specific language governing permissions and
rem limitations under the License.
rem
set SCALA_VERSION=2.9.3
rem Figure out where the Spark framework is installed
set FWDIR=%~dp0
rem Export this as SPARK_HOME
set SPARK_HOME=%FWDIR%
rem Test whether the user has built Spark
if exist "%FWDIR%RELEASE" goto skip_build_test
set FOUND_JAR=0
for %%d in ("%FWDIR%assembly\target\scala-%SCALA_VERSION%\spark-assembly*hadoop*.jar") do (
set FOUND_JAR=1
)
if "%FOUND_JAR%"=="0" (
echo Failed to find Spark assembly JAR.
echo You need to build Spark with sbt\sbt assembly before running this program.
goto exit
)
:skip_build_test
rem Load environment variables from conf\spark-env.cmd, if it exists
if exist "%FWDIR%conf\spark-env.cmd" call "%FWDIR%conf\spark-env.cmd"
rem Figure out which Python to use.
if "x%PYSPARK_PYTHON%"=="x" set PYSPARK_PYTHON=python
set PYTHONPATH=%FWDIR%python;%PYTHONPATH%
set OLD_PYTHONSTARTUP=%PYTHONSTARTUP%
set PYTHONSTARTUP=%FWDIR%python\pyspark\shell.py
echo Running %PYSPARK_PYTHON% with PYTHONPATH=%PYTHONPATH%
"%PYSPARK_PYTHON%" %*
:exit
......@@ -18,6 +18,7 @@
import os
import sys
import signal
import platform
from subprocess import Popen, PIPE
from threading import Thread
from py4j.java_gateway import java_import, JavaGateway, GatewayClient
......@@ -29,12 +30,18 @@ SPARK_HOME = os.environ["SPARK_HOME"]
def launch_gateway():
# Launch the Py4j gateway using Spark's run command so that we pick up the
# proper classpath and SPARK_MEM settings from spark-env.sh
command = [os.path.join(SPARK_HOME, "spark-class"), "py4j.GatewayServer",
on_windows = platform.system() == "Windows"
script = "spark-class.cmd" if on_windows else "spark-class"
command = [os.path.join(SPARK_HOME, script), "py4j.GatewayServer",
"--die-on-broken-pipe", "0"]
# Don't send ctrl-c / SIGINT to the Java gateway:
def preexec_function():
signal.signal(signal.SIGINT, signal.SIG_IGN)
proc = Popen(command, stdout=PIPE, stdin=PIPE, preexec_fn=preexec_function)
if not on_windows:
# Don't send ctrl-c / SIGINT to the Java gateway:
def preexec_func():
signal.signal(signal.SIGINT, signal.SIG_IGN)
proc = Popen(command, stdout=PIPE, stdin=PIPE, preexec_fn=preexec_func)
else:
# preexec_fn not supported on Windows
proc = Popen(command, stdout=PIPE, stdin=PIPE)
# Determine which ephemeral port the server started on:
port = int(proc.stdout.readline())
# Create a thread to echo output from the GatewayServer, which is required
......
......@@ -21,6 +21,7 @@ Worker that receives input from Piped RDD.
import os
import sys
import time
import socket
import traceback
from base64 import standard_b64decode
# CloudPickler needs to be imported so that depicklers are registered using the
......@@ -94,7 +95,9 @@ def main(infile, outfile):
if __name__ == '__main__':
# Redirect stdout to stderr so that users must return values from functions.
old_stdout = os.fdopen(os.dup(1), 'w')
os.dup2(2, 1)
main(sys.stdin, old_stdout)
# Read a local port to connect to from stdin
java_port = int(sys.stdin.readline())
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect(("127.0.0.1", java_port))
sock_file = sock.makefile("a+", 65536)
main(sock_file, sock_file)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment