Commit 615fb649 authored by Matei Zaharia

Fix some other Python tests due to initializing JVM in a different way

The tests in context.py created two different instances of the
SparkContext class by copying "globals", so that some tests could use a
global "sc" object while others initialized their own contexts. This led
to two JVM gateways being created, since SparkConf also looked at
pyspark.context.SparkContext to get the JVM.
parent cd00225d
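In short, a SparkConf can now be handed an existing JVM gateway instead of always reaching back into pyspark.context.SparkContext for one. A minimal sketch of the new wiring (illustrative only, not code from this commit; assumes a local Spark build where the Py4J gateway can be launched):

# The conf reuses the gateway that SparkContext already launched, so copying
# module globals in the doctests no longer triggers a second JVM.
from pyspark.conf import SparkConf
from pyspark.context import SparkContext

SparkContext._ensure_initialized()            # launches the Py4J gateway once
conf = SparkConf(loadDefaults=False, _jvm=SparkContext._jvm)
conf.setMaster("local")
conf.setAppName("gateway-sharing-example")    # hypothetical app name
sc = SparkContext(conf=conf)                  # same gateway, same JVM
sc.stop()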
@@ -50,10 +50,11 @@ u'value1'
 class SparkConf(object):
-    def __init__(self, loadDefaults=False):
+    def __init__(self, loadDefaults=True, _jvm=None):
         from pyspark.context import SparkContext
         SparkContext._ensure_initialized()
-        self._jconf = SparkContext._jvm.SparkConf(loadDefaults)
+        _jvm = _jvm or SparkContext._jvm
+        self._jconf = _jvm.SparkConf(loadDefaults)
 
     def set(self, key, value):
         self._jconf.set(key, value)
...
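For reference, the doctest context in the hunk header above (the u'value1') exercises SparkConf as a simple key/value store; a hedged sketch of that round trip (the doctest itself is not shown in this hunk):

from pyspark.conf import SparkConf

conf = SparkConf(loadDefaults=False)          # still works with no _jvm argument
conf.set("spark.executorEnv.VAR1", "value1")  # stored in the JVM-side SparkConf
conf.get("spark.executorEnv.VAR1")            # returns u'value1' (Py4J hands back unicode)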
@@ -81,7 +81,8 @@ class SparkContext(object):
         """
         SparkContext._ensure_initialized(self)
-        self.conf = conf or SparkConf()
+        self.environment = environment or {}
+        self.conf = conf or SparkConf(_jvm=self._jvm)
         self._batchSize = batchSize  # -1 represents an unlimited batch size
         self._unbatched_serializer = serializer
         if batchSize == 1:
@@ -90,23 +91,30 @@ class SparkContext(object):
             self.serializer = BatchedSerializer(self._unbatched_serializer,
                                                 batchSize)
 
-        # Set parameters passed directly on our conf; these operations will be no-ops
-        # if the parameters were None
+        # Set parameters passed directly to us on the conf; these operations will be
+        # no-ops if the parameters were None
         self.conf.setMaster(master)
         self.conf.setAppName(appName)
         self.conf.setSparkHome(sparkHome)
-        environment = environment or {}
-        for key, value in environment.iteritems():
-            self.conf.setExecutorEnv(key, value)
+        if environment:
+            for key, value in environment.iteritems():
+                self.conf.setExecutorEnv(key, value)
 
+        # Check that we have at least the required parameters
         if not self.conf.contains("spark.master"):
             raise Exception("A master URL must be set in your configuration")
         if not self.conf.contains("spark.appName"):
             raise Exception("An application name must be set in your configuration")
 
+        # Read back our properties from the conf in case we loaded some of them from
+        # the classpath or an external config file
         self.master = self.conf.get("spark.master")
         self.appName = self.conf.get("spark.appName")
         self.sparkHome = self.conf.getOrElse("spark.home", None)
+        for (k, v) in self.conf.getAll():
+            if k.startswith("spark.executorEnv."):
+                varName = k[len("spark.executorEnv."):]
+                self.environment[varName] = v
 
         # Create the Java SparkContext through Py4J
         self._jsc = self._jvm.JavaSparkContext(self.conf._jconf)
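With the executor-environment handling above, variables can come either from the environment argument to SparkContext or from spark.executorEnv.* entries on the conf, and both end up in the context's environment dict. A rough sketch (illustrative variable names, assuming a local Spark build):

from pyspark.conf import SparkConf
from pyspark.context import SparkContext

conf = SparkConf(loadDefaults=False)
conf.setMaster("local")
conf.setAppName("executor-env-example")       # hypothetical app name
conf.setExecutorEnv("VAR1", "value1")         # becomes spark.executorEnv.VAR1 on the conf

sc = SparkContext(conf=conf, environment={"VAR2": "value2"})
# After __init__ both variables are visible to Python:
#   sc.environment == {"VAR1": "value1", "VAR2": "value2"}
sc.stop()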
@@ -147,8 +155,7 @@ class SparkContext(object):
         if not SparkContext._gateway:
             SparkContext._gateway = launch_gateway()
             SparkContext._jvm = SparkContext._gateway.jvm
-            SparkContext._writeToFile = \
-                SparkContext._jvm.PythonRDD.writeToFile
+            SparkContext._writeToFile = SparkContext._jvm.PythonRDD.writeToFile
 
         if instance:
             if SparkContext._active_spark_context and SparkContext._active_spark_context != instance:
...
@@ -35,6 +35,7 @@ function run_test() {
 run_test "pyspark/rdd.py"
 run_test "pyspark/context.py"
+run_test "pyspark/conf.py"
 run_test "-m doctest pyspark/broadcast.py"
 run_test "-m doctest pyspark/accumulators.py"
 run_test "-m doctest pyspark/serializers.py"
...