Skip to content
Snippets Groups Projects
Commit be7a2cfd authored by Shixiong Zhu's avatar Shixiong Zhu Committed by Tathagata Das
Browse files

[SPARK-11870][STREAMING][PYSPARK] Rethrow the exceptions in TransformFunction...

[SPARK-11870][STREAMING][PYSPARK] Rethrow the exceptions in TransformFunction and TransformFunctionSerializer

TransformFunction and TransformFunctionSerializer don't rethrow the exception, so when any exception happens, it just return None. This will cause some weird NPE and confuse people.

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #9847 from zsxwing/pyspark-streaming-exception.
parent 9ed4ad42
No related branches found
No related tags found
No related merge requests found
...@@ -403,6 +403,22 @@ class BasicOperationTests(PySparkStreamingTestCase): ...@@ -403,6 +403,22 @@ class BasicOperationTests(PySparkStreamingTestCase):
expected = [[('k', v)] for v in expected] expected = [[('k', v)] for v in expected]
self._test_func(input, func, expected) self._test_func(input, func, expected)
def test_failed_func(self):
input = [self.sc.parallelize([d], 1) for d in range(4)]
input_stream = self.ssc.queueStream(input)
def failed_func(i):
raise ValueError("failed")
input_stream.map(failed_func).pprint()
self.ssc.start()
try:
self.ssc.awaitTerminationOrTimeout(10)
except:
return
self.fail("a failed func should throw an error")
class StreamingListenerTests(PySparkStreamingTestCase): class StreamingListenerTests(PySparkStreamingTestCase):
......
...@@ -64,6 +64,7 @@ class TransformFunction(object): ...@@ -64,6 +64,7 @@ class TransformFunction(object):
return r._jrdd return r._jrdd
except Exception: except Exception:
traceback.print_exc() traceback.print_exc()
raise
def __repr__(self): def __repr__(self):
return "TransformFunction(%s)" % self.func return "TransformFunction(%s)" % self.func
...@@ -95,6 +96,7 @@ class TransformFunctionSerializer(object): ...@@ -95,6 +96,7 @@ class TransformFunctionSerializer(object):
return bytearray(self.serializer.dumps((func.func, func.deserializers))) return bytearray(self.serializer.dumps((func.func, func.deserializers)))
except Exception: except Exception:
traceback.print_exc() traceback.print_exc()
raise
def loads(self, data): def loads(self, data):
try: try:
...@@ -102,6 +104,7 @@ class TransformFunctionSerializer(object): ...@@ -102,6 +104,7 @@ class TransformFunctionSerializer(object):
return TransformFunction(self.ctx, f, *deserializers) return TransformFunction(self.ctx, f, *deserializers)
except Exception: except Exception:
traceback.print_exc() traceback.print_exc()
raise
def __repr__(self): def __repr__(self):
return "TransformFunctionSerializer(%s)" % self.serializer return "TransformFunctionSerializer(%s)" % self.serializer
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment