From c9f075abb1bb3c4773a21bc3b08253abb8b85b3f Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin <vanzin@cloudera.com> Date: Wed, 25 Jan 2017 12:08:08 -0800 Subject: [PATCH] [SPARK-19307][PYSPARK] Make sure user conf is propagated to SparkContext. The code was failing to propagate the user conf in the case where the JVM was already initialized, which happens when a user submits a python script via spark-submit. Tested with new unit test and by running a python script in a real cluster. Author: Marcelo Vanzin <vanzin@cloudera.com> Closes #16682 from vanzin/SPARK-19307. (cherry picked from commit 92afaa93a0b67f561a790822ccdd2b814455edcc) Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com> --- python/pyspark/context.py | 3 +++ python/pyspark/tests.py | 20 ++++++++++++++++++++ 2 files changed, 23 insertions(+) diff --git a/python/pyspark/context.py b/python/pyspark/context.py index 5c4e79cb04..ac4b2b035f 100644 --- a/python/pyspark/context.py +++ b/python/pyspark/context.py @@ -132,6 +132,9 @@ class SparkContext(object): self._conf = conf else: self._conf = SparkConf(_jvm=SparkContext._jvm) + if conf is not None: + for k, v in conf.getAll(): + self._conf.set(k, v) self._batchSize = batchSize # -1 represents an unlimited batch size self._unbatched_serializer = serializer diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py index fe314c54a1..8e35a4ee8e 100644 --- a/python/pyspark/tests.py +++ b/python/pyspark/tests.py @@ -1970,6 +1970,26 @@ class SparkSubmitTests(unittest.TestCase): self.assertEqual(0, proc.returncode) self.assertIn("[2, 4, 6]", out.decode('utf-8')) + def test_user_configuration(self): + """Make sure user configuration is respected (SPARK-19307)""" + script = self.createTempFile("test.py", """ + |from pyspark import SparkConf, SparkContext + | + |conf = SparkConf().set("spark.test_config", "1") + |sc = SparkContext(conf = conf) + |try: + | if sc._conf.get("spark.test_config") != "1": + | raise Exception("Cannot find spark.test_config in SparkContext's conf.") + |finally: + | sc.stop() + """) + proc = subprocess.Popen( + [self.sparkSubmit, "--master", "local", script], + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT) + out, err = proc.communicate() + self.assertEqual(0, proc.returncode, msg="Process failed with error:\n {0}".format(out)) + class ContextTests(unittest.TestCase): -- GitLab