diff --git a/LICENSE b/LICENSE
index dca03ab16de29b1b071d8d9dce7826a06497eb33..790476ece15bd8b249faeec7088414a12f7de50b 100644
--- a/LICENSE
+++ b/LICENSE
@@ -265,7 +265,7 @@ The text of each license is also included at licenses/LICENSE-[project].txt.
      (New BSD license) Protocol Buffer Java API (org.spark-project.protobuf:protobuf-java:2.4.1-shaded - http://code.google.com/p/protobuf)
      (The BSD License) Fortran to Java ARPACK (net.sourceforge.f2j:arpack_combined_all:0.1 - http://f2j.sourceforge.net)
      (The BSD License) xmlenc Library (xmlenc:xmlenc:0.52 - http://xmlenc.sourceforge.net)
-     (The New BSD License) Py4J (net.sf.py4j:py4j:0.8.2.1 - http://py4j.sourceforge.net/)
+     (The New BSD License) Py4J (net.sf.py4j:py4j:0.9 - http://py4j.sourceforge.net/)
      (Two-clause BSD-style license) JUnit-Interface (com.novocode:junit-interface:0.10 - http://github.com/szeiger/junit-interface/)
      (BSD licence) sbt and sbt-launch-lib.bash
      (BSD 3 Clause) d3.min.js (https://github.com/mbostock/d3/blob/master/LICENSE)
diff --git a/bin/pyspark b/bin/pyspark
index 8f2a3b5a7717b952ce4fd2269ffa775e3d126978..18012ee4a0b4f0aca51ae5ba1a8034d51b1dbd0d 100755
--- a/bin/pyspark
+++ b/bin/pyspark
@@ -65,7 +65,7 @@ export PYSPARK_PYTHON
 
 # Add the PySpark classes to the Python path:
 export PYTHONPATH="$SPARK_HOME/python/:$PYTHONPATH"
-export PYTHONPATH="$SPARK_HOME/python/lib/py4j-0.8.2.1-src.zip:$PYTHONPATH"
+export PYTHONPATH="$SPARK_HOME/python/lib/py4j-0.9-src.zip:$PYTHONPATH"
 
 # Load the PySpark shell.py script when ./pyspark is used interactively:
 export OLD_PYTHONSTARTUP="$PYTHONSTARTUP"
diff --git a/bin/pyspark2.cmd b/bin/pyspark2.cmd
index 3c6169983e76bfea4ce00981f09bf36ac5ab65f2..a97d884f0bf39f9a90c5dca7b79448f6e2b1f779 100644
--- a/bin/pyspark2.cmd
+++ b/bin/pyspark2.cmd
@@ -30,7 +30,7 @@ if "x%PYSPARK_DRIVER_PYTHON%"=="x" (
 )
 
 set PYTHONPATH=%SPARK_HOME%\python;%PYTHONPATH%
-set PYTHONPATH=%SPARK_HOME%\python\lib\py4j-0.8.2.1-src.zip;%PYTHONPATH%
+set PYTHONPATH=%SPARK_HOME%\python\lib\py4j-0.9-src.zip;%PYTHONPATH%
 
 set OLD_PYTHONSTARTUP=%PYTHONSTARTUP%
 set PYTHONSTARTUP=%SPARK_HOME%\python\pyspark\shell.py
diff --git a/core/pom.xml b/core/pom.xml
index fdcb6a7902bbdf1b3afb7932339b6d2bb418efb0..319a50049a82d8979fca8ed69b8e20b93e6aef07 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -350,7 +350,7 @@
     <dependency>
       <groupId>net.sf.py4j</groupId>
       <artifactId>py4j</artifactId>
-      <version>0.8.2.1</version>
+      <version>0.9</version>
     </dependency>
     <dependency>
       <groupId>org.apache.spark</groupId>
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala b/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala
index 31e534f160eeb8195ad48669541c45c344d173d6..292ac4cfc35b9940cd189454136dcf1643ce34f1 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala
@@ -32,7 +32,7 @@ private[spark] object PythonUtils {
     val pythonPath = new ArrayBuffer[String]
     for (sparkHome <- sys.env.get("SPARK_HOME")) {
       pythonPath += Seq(sparkHome, "python", "lib", "pyspark.zip").mkString(File.separator)
-      pythonPath += Seq(sparkHome, "python", "lib", "py4j-0.8.2.1-src.zip").mkString(File.separator)
+      pythonPath += Seq(sparkHome, "python", "lib", "py4j-0.9-src.zip").mkString(File.separator)
     }
     pythonPath ++= SparkContext.jarOfObject(this)
     pythonPath.mkString(File.pathSeparator)
diff --git a/python/docs/Makefile b/python/docs/Makefile
index 8a1324eecd325b4e0e39e13faa9aa4a5e9d4d86a..4cec74f057fbe22ac2285bc66b8b9cb84627e80d 100644
--- a/python/docs/Makefile
+++ b/python/docs/Makefile
@@ -7,7 +7,7 @@ SPHINXBUILD = sphinx-build
 PAPER =
 BUILDDIR = _build
 
-export PYTHONPATH=$(realpath ..):$(realpath ../lib/py4j-0.8.2.1-src.zip)
+export PYTHONPATH=$(realpath ..):$(realpath ../lib/py4j-0.9-src.zip)
 
 # User-friendly check for sphinx-build
 ifeq ($(shell which $(SPHINXBUILD) >/dev/null 2>&1; echo $$?), 1)
diff --git a/python/lib/py4j-0.8.2.1-src.zip b/python/lib/py4j-0.8.2.1-src.zip
deleted file mode 100644
index 5203b84d9119ea5415114939624411ba7d3daa8a..0000000000000000000000000000000000000000
Binary files a/python/lib/py4j-0.8.2.1-src.zip and /dev/null differ
diff --git a/python/lib/py4j-0.9-src.zip b/python/lib/py4j-0.9-src.zip
new file mode 100644
index 0000000000000000000000000000000000000000..dace2d0fe3b0bd01d24c07e7747aa68980896598
Binary files /dev/null and b/python/lib/py4j-0.9-src.zip differ
diff --git a/python/pyspark/streaming/context.py b/python/pyspark/streaming/context.py
index a8c9ffc235b9e13c1876c27586a2e70043cd9e83..975c75473214adc7c7b96ce946fd6f3cc2e6912c 100644
--- a/python/pyspark/streaming/context.py
+++ b/python/pyspark/streaming/context.py
@@ -32,48 +32,6 @@ from pyspark.streaming.util import TransformFunction, TransformFunctionSerializer
 
 __all__ = ["StreamingContext"]
 
-def _daemonize_callback_server():
-    """
-    Hack Py4J to daemonize callback server
-
-    The thread of callback server has daemon=False, it will block the driver
-    from exiting if it's not shutdown. The following code replace `start()`
-    of CallbackServer with a new version, which set daemon=True for this
-    thread.
-
-    Also, it will update the port number (0) with real port
-    """
-    # TODO: create a patch for Py4J
-    import socket
-    import py4j.java_gateway
-    logger = py4j.java_gateway.logger
-    from py4j.java_gateway import Py4JNetworkError
-    from threading import Thread
-
-    def start(self):
-        """Starts the CallbackServer. This method should be called by the
-        client instead of run()."""
-        self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
-        self.server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR,
-                                      1)
-        try:
-            self.server_socket.bind((self.address, self.port))
-            if not self.port:
-                # update port with real port
-                self.port = self.server_socket.getsockname()[1]
-        except Exception as e:
-            msg = 'An error occurred while trying to start the callback server: %s' % e
-            logger.exception(msg)
-            raise Py4JNetworkError(msg)
-
-        # Maybe thread needs to be cleanup up?
-        self.thread = Thread(target=self.run)
-        self.thread.daemon = True
-        self.thread.start()
-
-    py4j.java_gateway.CallbackServer.start = start
-
-
 class StreamingContext(object):
     """
     Main entry point for Spark Streaming functionality. A StreamingContext
@@ -123,10 +81,14 @@ class StreamingContext(object):
 
         # start callback server
        # getattr will fallback to JVM, so we cannot test by hasattr()
-        if "_callback_server" not in gw.__dict__:
-            _daemonize_callback_server()
-            # use random port
-            gw._start_callback_server(0)
+        if "_callback_server" not in gw.__dict__ or gw._callback_server is None:
+            gw.callback_server_parameters.eager_load = True
+            gw.callback_server_parameters.daemonize = True
+            gw.callback_server_parameters.daemonize_connections = True
+            gw.callback_server_parameters.port = 0
+            gw.start_callback_server(gw.callback_server_parameters)
+            cbport = gw._callback_server.server_socket.getsockname()[1]
+            gw._callback_server.port = cbport
             # gateway with real port
             gw._python_proxy_port = gw._callback_server.port
             # get the GatewayServer object in JVM by ID
diff --git a/python/pyspark/streaming/flume.py b/python/pyspark/streaming/flume.py
index c0cdc50d8d4230f1d47923e92358133de1c58ed6..b3d1905365925e1b50f24d01e0e25cddc356df9a 100644
--- a/python/pyspark/streaming/flume.py
+++ b/python/pyspark/streaming/flume.py
@@ -20,7 +20,7 @@ if sys.version >= "3":
     from io import BytesIO
 else:
     from StringIO import StringIO
-from py4j.java_gateway import Py4JJavaError
+from py4j.protocol import Py4JJavaError
 
 from pyspark.storagelevel import StorageLevel
 from pyspark.serializers import PairDeserializer, NoOpSerializer, UTF8Deserializer, read_int
diff --git a/python/pyspark/streaming/kafka.py b/python/pyspark/streaming/kafka.py
index 8a814c64c04236b2e460ba71f284ba349228959d..b35bbaf404cc5edddc319c242dfcdd66b7c62c14 100644
--- a/python/pyspark/streaming/kafka.py
+++ b/python/pyspark/streaming/kafka.py
@@ -15,7 +15,7 @@
 # limitations under the License.
 #
 
-from py4j.java_gateway import Py4JJavaError
+from py4j.protocol import Py4JJavaError
 
 from pyspark.rdd import RDD
 from pyspark.storagelevel import StorageLevel
diff --git a/python/pyspark/streaming/kinesis.py b/python/pyspark/streaming/kinesis.py
index 34be5880e1708245733f5daae3bfb0e5b754433b..af72c3d6903f9d716d2ddd6d6e5130665518fa2e 100644
--- a/python/pyspark/streaming/kinesis.py
+++ b/python/pyspark/streaming/kinesis.py
@@ -15,7 +15,7 @@
 # limitations under the License.
 #
 
-from py4j.java_gateway import Py4JJavaError
+from py4j.protocol import Py4JJavaError
 
 from pyspark.serializers import PairDeserializer, NoOpSerializer
 from pyspark.storagelevel import StorageLevel
diff --git a/python/pyspark/streaming/mqtt.py b/python/pyspark/streaming/mqtt.py
index fa83006c36db6ab80b78723081462820e0bbcdb1..1ce4093196e634fbfeecc34f05ee5f520d3fb913 100644
--- a/python/pyspark/streaming/mqtt.py
+++ b/python/pyspark/streaming/mqtt.py
@@ -15,7 +15,7 @@
 # limitations under the License.
 #
 
-from py4j.java_gateway import Py4JJavaError
+from py4j.protocol import Py4JJavaError
 
 from pyspark.storagelevel import StorageLevel
 from pyspark.serializers import UTF8Deserializer
diff --git a/python/pyspark/streaming/tests.py b/python/pyspark/streaming/tests.py
index e4e56fff3b3fc5bfe16e9d80539be4b6b518d2fe..49634252fd4652d0fb7f55f579a3f8fd3ca35dd3 100644
--- a/python/pyspark/streaming/tests.py
+++ b/python/pyspark/streaming/tests.py
@@ -61,9 +61,12 @@ class PySparkStreamingTestCase(unittest.TestCase):
     def tearDownClass(cls):
         cls.sc.stop()
         # Clean up in the JVM just in case there has been some issues in Python API
-        jSparkContextOption = SparkContext._jvm.SparkContext.get()
-        if jSparkContextOption.nonEmpty():
-            jSparkContextOption.get().stop()
+        try:
+            jSparkContextOption = SparkContext._jvm.SparkContext.get()
+            if jSparkContextOption.nonEmpty():
+                jSparkContextOption.get().stop()
+        except:
+            pass
 
     def setUp(self):
         self.ssc = StreamingContext(self.sc, self.duration)
@@ -72,9 +75,12 @@ class PySparkStreamingTestCase(unittest.TestCase):
         if self.ssc is not None:
             self.ssc.stop(False)
         # Clean up in the JVM just in case there has been some issues in Python API
-        jStreamingContextOption = StreamingContext._jvm.SparkContext.getActive()
-        if jStreamingContextOption.nonEmpty():
-            jStreamingContextOption.get().stop(False)
+        try:
+            jStreamingContextOption = StreamingContext._jvm.SparkContext.getActive()
+            if jStreamingContextOption.nonEmpty():
+                jStreamingContextOption.get().stop(False)
+        except:
+            pass
 
     def wait_for(self, result, n):
         start_time = time.time()
diff --git a/sbin/spark-config.sh b/sbin/spark-config.sh
index b0361d72d3f2c8979676b5fe0536aa90c870bd1c..e6bf544c14799615b3d9e1aa60ce3d56a5951eee 100755
--- a/sbin/spark-config.sh
+++ b/sbin/spark-config.sh
@@ -36,4 +36,4 @@ export SPARK_HOME="${SPARK_PREFIX}"
 export SPARK_CONF_DIR="${SPARK_CONF_DIR:-"$SPARK_HOME/conf"}"
 # Add the PySpark classes to the PYTHONPATH:
 export PYTHONPATH="$SPARK_HOME/python:$PYTHONPATH"
-export PYTHONPATH="$SPARK_HOME/python/lib/py4j-0.8.2.1-src.zip:$PYTHONPATH"
+export PYTHONPATH="$SPARK_HOME/python/lib/py4j-0.9-src.zip:$PYTHONPATH"
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 08aecfa7f6fe0e1d09f527ba3ff22c2f1688879f..754215db2a900b1c07ccc68f7727efca0ac50bdd 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -1008,9 +1008,9 @@ private[spark] class Client(
         val pyArchivesFile = new File(pyLibPath, "pyspark.zip")
         require(pyArchivesFile.exists(),
           "pyspark.zip not found; cannot run pyspark application in YARN mode.")
-        val py4jFile = new File(pyLibPath, "py4j-0.8.2.1-src.zip")
+        val py4jFile = new File(pyLibPath, "py4j-0.9-src.zip")
         require(py4jFile.exists(),
-          "py4j-0.8.2.1-src.zip not found; cannot run pyspark application in YARN mode.")
+          "py4j-0.9-src.zip not found; cannot run pyspark application in YARN mode.")
         Seq(pyArchivesFile.getAbsolutePath(), py4jFile.getAbsolutePath())
       }
     }
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
index d1cd0c89b5d3826ad05b776ad05ec4f87f7e3a79..6db012a77a9369275afefb8e943cdec6e0563296 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
@@ -153,7 +153,7 @@ class YarnClusterSuite extends BaseYarnClusterSuite {
     // needed locations.
     val sparkHome = sys.props("spark.test.home");
     val pythonPath = Seq(
-        s"$sparkHome/python/lib/py4j-0.8.2.1-src.zip",
+        s"$sparkHome/python/lib/py4j-0.9-src.zip",
         s"$sparkHome/python")
     val extraEnv = Map(
       "PYSPARK_ARCHIVES_PATH" -> pythonPath.map("local:" + _).mkString(File.pathSeparator),