Skip to content
Snippets Groups Projects
Commit 0f90d605 authored by Tathagata Das's avatar Tathagata Das
Browse files

[SPARK-9640] [STREAMING] [TEST] Do not run Python Kinesis tests when the...

[SPARK-9640] [STREAMING] [TEST] Do not run Python Kinesis tests when the Kinesis assembly JAR has not been generated

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #7961 from tdas/SPARK-9640 and squashes the following commits:

974ce19 [Tathagata Das] Undo changes related to SPARK-9727
004ae26 [Tathagata Das] style fixes
9bbb97d [Tathagata Das] Minor style fies
e6a677e [Tathagata Das] Merge remote-tracking branch 'apache-github/master' into SPARK-9640
ca90719 [Tathagata Das] Removed extra line
ba9cfc7 [Tathagata Das] Improved kinesis test selection logic
88d59bd [Tathagata Das] updated test modules
871fcc8 [Tathagata Das] Fixed SparkBuild
94be631 [Tathagata Das] Fixed style
b858196 [Tathagata Das] Fixed conditions and few other things based on PR comments.
e292e64 [Tathagata Das] Added filters for Kinesis python tests
parent 91e9389f
No related branches found
No related tags found
No related merge requests found
...@@ -971,8 +971,10 @@ class KinesisStreamTests(PySparkStreamingTestCase): ...@@ -971,8 +971,10 @@ class KinesisStreamTests(PySparkStreamingTestCase):
"awsAccessKey", "awsSecretKey") "awsAccessKey", "awsSecretKey")
def test_kinesis_stream(self): def test_kinesis_stream(self):
if os.environ.get('ENABLE_KINESIS_TESTS') != '1': if not are_kinesis_tests_enabled:
print("Skip test_kinesis_stream") sys.stderr.write(
"Skipped test_kinesis_stream (enable by setting environment variable %s=1"
% kinesis_test_environ_var)
return return
import random import random
...@@ -1013,6 +1015,7 @@ class KinesisStreamTests(PySparkStreamingTestCase): ...@@ -1013,6 +1015,7 @@ class KinesisStreamTests(PySparkStreamingTestCase):
traceback.print_exc() traceback.print_exc()
raise raise
finally: finally:
self.ssc.stop(False)
kinesisTestUtils.deleteStream() kinesisTestUtils.deleteStream()
kinesisTestUtils.deleteDynamoDBTable(kinesisAppName) kinesisTestUtils.deleteDynamoDBTable(kinesisAppName)
...@@ -1027,7 +1030,7 @@ def search_kafka_assembly_jar(): ...@@ -1027,7 +1030,7 @@ def search_kafka_assembly_jar():
("Failed to find Spark Streaming kafka assembly jar in %s. " % kafka_assembly_dir) + ("Failed to find Spark Streaming kafka assembly jar in %s. " % kafka_assembly_dir) +
"You need to build Spark with " "You need to build Spark with "
"'build/sbt assembly/assembly streaming-kafka-assembly/assembly' or " "'build/sbt assembly/assembly streaming-kafka-assembly/assembly' or "
"'build/mvn package' before running this test") "'build/mvn package' before running this test.")
elif len(jars) > 1: elif len(jars) > 1:
raise Exception(("Found multiple Spark Streaming Kafka assembly JARs in %s; please " raise Exception(("Found multiple Spark Streaming Kafka assembly JARs in %s; please "
"remove all but one") % kafka_assembly_dir) "remove all but one") % kafka_assembly_dir)
...@@ -1045,7 +1048,7 @@ def search_flume_assembly_jar(): ...@@ -1045,7 +1048,7 @@ def search_flume_assembly_jar():
("Failed to find Spark Streaming Flume assembly jar in %s. " % flume_assembly_dir) + ("Failed to find Spark Streaming Flume assembly jar in %s. " % flume_assembly_dir) +
"You need to build Spark with " "You need to build Spark with "
"'build/sbt assembly/assembly streaming-flume-assembly/assembly' or " "'build/sbt assembly/assembly streaming-flume-assembly/assembly' or "
"'build/mvn package' before running this test") "'build/mvn package' before running this test.")
elif len(jars) > 1: elif len(jars) > 1:
raise Exception(("Found multiple Spark Streaming Flume assembly JARs in %s; please " raise Exception(("Found multiple Spark Streaming Flume assembly JARs in %s; please "
"remove all but one") % flume_assembly_dir) "remove all but one") % flume_assembly_dir)
...@@ -1095,11 +1098,7 @@ def search_kinesis_asl_assembly_jar(): ...@@ -1095,11 +1098,7 @@ def search_kinesis_asl_assembly_jar():
os.path.join(kinesis_asl_assembly_dir, os.path.join(kinesis_asl_assembly_dir,
"target/scala-*/spark-streaming-kinesis-asl-assembly-*.jar")) "target/scala-*/spark-streaming-kinesis-asl-assembly-*.jar"))
if not jars: if not jars:
raise Exception( return None
("Failed to find Spark Streaming Kinesis ASL assembly jar in %s. " %
kinesis_asl_assembly_dir) + "You need to build Spark with "
"'build/sbt -Pkinesis-asl assembly/assembly streaming-kinesis-asl-assembly/assembly' "
"or 'build/mvn -Pkinesis-asl package' before running this test")
elif len(jars) > 1: elif len(jars) > 1:
raise Exception(("Found multiple Spark Streaming Kinesis ASL assembly JARs in %s; please " raise Exception(("Found multiple Spark Streaming Kinesis ASL assembly JARs in %s; please "
"remove all but one") % kinesis_asl_assembly_dir) "remove all but one") % kinesis_asl_assembly_dir)
...@@ -1107,6 +1106,10 @@ def search_kinesis_asl_assembly_jar(): ...@@ -1107,6 +1106,10 @@ def search_kinesis_asl_assembly_jar():
return jars[0] return jars[0]
# Must be same as the variable and condition defined in KinesisTestUtils.scala
kinesis_test_environ_var = "ENABLE_KINESIS_TESTS"
are_kinesis_tests_enabled = os.environ.get(kinesis_test_environ_var) == '1'
if __name__ == "__main__": if __name__ == "__main__":
kafka_assembly_jar = search_kafka_assembly_jar() kafka_assembly_jar = search_kafka_assembly_jar()
flume_assembly_jar = search_flume_assembly_jar() flume_assembly_jar = search_flume_assembly_jar()
...@@ -1114,8 +1117,37 @@ if __name__ == "__main__": ...@@ -1114,8 +1117,37 @@ if __name__ == "__main__":
mqtt_test_jar = search_mqtt_test_jar() mqtt_test_jar = search_mqtt_test_jar()
kinesis_asl_assembly_jar = search_kinesis_asl_assembly_jar() kinesis_asl_assembly_jar = search_kinesis_asl_assembly_jar()
jars = "%s,%s,%s,%s,%s" % (kafka_assembly_jar, flume_assembly_jar, kinesis_asl_assembly_jar, if kinesis_asl_assembly_jar is None:
mqtt_assembly_jar, mqtt_test_jar) kinesis_jar_present = False
jars = "%s,%s,%s,%s" % (kafka_assembly_jar, flume_assembly_jar, mqtt_assembly_jar,
mqtt_test_jar)
else:
kinesis_jar_present = True
jars = "%s,%s,%s,%s,%s" % (kafka_assembly_jar, flume_assembly_jar, mqtt_assembly_jar,
mqtt_test_jar, kinesis_asl_assembly_jar)
os.environ["PYSPARK_SUBMIT_ARGS"] = "--jars %s pyspark-shell" % jars os.environ["PYSPARK_SUBMIT_ARGS"] = "--jars %s pyspark-shell" % jars
unittest.main() testcases = [BasicOperationTests, WindowFunctionTests, StreamingContextTests,
CheckpointTests, KafkaStreamTests, FlumeStreamTests, FlumePollingStreamTests]
if kinesis_jar_present is True:
testcases.append(KinesisStreamTests)
elif are_kinesis_tests_enabled is False:
sys.stderr.write("Skipping all Kinesis Python tests as the optional Kinesis project was "
"not compiled with -Pkinesis-asl profile. To run these tests, "
"you need to build Spark with 'build/sbt -Pkinesis-asl assembly/assembly "
"streaming-kinesis-asl-assembly/assembly' or "
"'build/mvn -Pkinesis-asl package' before running this test.")
else:
raise Exception(
("Failed to find Spark Streaming Kinesis assembly jar in %s. "
% kinesis_asl_assembly_dir) +
"You need to build Spark with 'build/sbt -Pkinesis-asl "
"assembly/assembly streaming-kinesis-asl-assembly/assembly'"
"or 'build/mvn -Pkinesis-asl package' before running this test.")
sys.stderr.write("Running tests: %s \n" % (str(testcases)))
for testcase in testcases:
sys.stderr.write("[Running %s]\n" % (testcase))
tests = unittest.TestLoader().loadTestsFromTestCase(testcase)
unittest.TextTestRunner(verbosity=2).run(tests)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment