diff --git a/dev/sparktestsupport/modules.py b/dev/sparktestsupport/modules.py
index 8e2364d2f71cb55af81bd34190f3c82329031e69..ce5725764be6dc34299adebb90bc236981f753fd 100644
--- a/dev/sparktestsupport/modules.py
+++ b/dev/sparktestsupport/modules.py
@@ -337,6 +337,7 @@ pyspark_sql = Module(
         "pyspark.sql.group",
         "pyspark.sql.functions",
         "pyspark.sql.readwriter",
+        "pyspark.sql.streaming",
         "pyspark.sql.window",
         "pyspark.sql.tests",
     ]
diff --git a/python/pyspark/sql/context.py b/python/pyspark/sql/context.py
index 486733a390a0c6c3d79c59e56cb47b8477067b9c..60f62b219b217c40cd3287047143cc24972267ef 100644
--- a/python/pyspark/sql/context.py
+++ b/python/pyspark/sql/context.py
@@ -433,6 +433,8 @@ class SQLContext(object):
     def streams(self):
         """Returns a :class:`ContinuousQueryManager` that allows managing all the
         :class:`ContinuousQuery` ContinuousQueries active on `this` context.
+
+        .. note:: Experimental.
         """
         from pyspark.sql.streaming import ContinuousQueryManager
         return ContinuousQueryManager(self._ssql_ctx.streams())
diff --git a/python/pyspark/sql/session.py b/python/pyspark/sql/session.py
index 7c9f532f9412f22ed63a5a85ca6a190b0932c7f9..f0bf0923b8c759d6170f77104799a2b97995e00b 100644
--- a/python/pyspark/sql/session.py
+++ b/python/pyspark/sql/session.py
@@ -549,6 +549,17 @@ class SparkSession(object):
         """
         return DataFrameReader(self._wrapped)
 
+    @property
+    @since(2.0)
+    def streams(self):
+        """Returns a :class:`ContinuousQueryManager` that allows managing all the
+        :class:`ContinuousQuery` ContinuousQueries active on `this` context.
+
+        .. note:: Experimental.
+        """
+        from pyspark.sql.streaming import ContinuousQueryManager
+        return ContinuousQueryManager(self._jsparkSession.streams())
+
     @since(2.0)
     def stop(self):
         """Stop the underlying :class:`SparkContext`.
diff --git a/python/pyspark/sql/streaming.py b/python/pyspark/sql/streaming.py
index 580aba651f1b0780a33b8a375a1dc9dd06abb1ef..bb4e62cdd6a5682e99c692d7d36040e7b723ceee 100644
--- a/python/pyspark/sql/streaming.py
+++ b/python/pyspark/sql/streaming.py
@@ -15,6 +15,12 @@
 # limitations under the License.
 #
 
+import sys
+if sys.version >= '3':
+    intlike = int
+else:
+    intlike = (int, long)
+
 from abc import ABCMeta, abstractmethod
 
 from pyspark import since
@@ -36,10 +42,18 @@ class ContinuousQuery(object):
     def __init__(self, jcq):
         self._jcq = jcq
 
+    @property
+    @since(2.0)
+    def id(self):
+        """The id of the continuous query. This id is unique across all queries that have been
+        started in the current process.
+        """
+        return self._jcq.id()
+
     @property
     @since(2.0)
     def name(self):
-        """The name of the continuous query.
+        """The name of the continuous query. This name is unique across all active queries.
         """
         return self._jcq.name()
 
@@ -106,7 +120,7 @@ class ContinuousQueryManager(object):
         """Returns a list of active queries associated with this SQLContext
 
         >>> cq = df.write.format('memory').queryName('this_query').startStream()
-        >>> cqm = sqlContext.streams
+        >>> cqm = spark.streams
         >>> # get the list of active continuous queries
         >>> [q.name for q in cqm.active]
         [u'this_query']
@@ -114,20 +128,26 @@ class ContinuousQueryManager(object):
         """
         return [ContinuousQuery(jcq) for jcq in self._jcqm.active()]
 
+    @ignore_unicode_prefix
     @since(2.0)
-    def get(self, name):
+    def get(self, id):
         """Returns an active query from this SQLContext or throws exception if an active query
-        with this name doesn't exist.
+        with this id doesn't exist.
 
-        >>> df.write.format('memory').queryName('this_query').startStream()
-        >>> cq = sqlContext.streams.get('this_query')
+        >>> cq = df.write.format('memory').queryName('this_query').startStream()
+        >>> cq.name
+        u'this_query'
+        >>> cq = spark.streams.get(cq.id)
+        >>> cq.isActive
+        True
+        >>> cq = sqlContext.streams.get(cq.id)
         >>> cq.isActive
         True
        >>> cq.stop()
         """
-        if type(name) != str or len(name.strip()) == 0:
-            raise ValueError("The name for the query must be a non-empty string. Got: %s" % name)
-        return ContinuousQuery(self._jcqm.get(name))
+        if not isinstance(id, intlike):
+            raise ValueError("The id for the query must be an integer. Got: %s" % id)
+        return ContinuousQuery(self._jcqm.get(id))
 
     @since(2.0)
     def awaitAnyTermination(self, timeout=None):
@@ -162,7 +182,7 @@ class ContinuousQueryManager(object):
         """Forget about past terminated queries so that :func:`awaitAnyTermination()` can
         be used again to wait for new terminations.
 
-        >>> sqlContext.streams.resetTerminated()
+        >>> spark.streams.resetTerminated()
         """
         self._jcqm.resetTerminated()
 
@@ -209,27 +229,31 @@ def _test():
     import doctest
     import os
     import tempfile
+    import py4j
     from pyspark.context import SparkContext
-    from pyspark.sql import Row, SQLContext, HiveContext
-    import pyspark.sql.readwriter
+    from pyspark.sql import Row, SparkSession, SQLContext
+    import pyspark.sql.streaming
 
     os.chdir(os.environ["SPARK_HOME"])
 
-    globs = pyspark.sql.readwriter.__dict__.copy()
+    globs = pyspark.sql.streaming.__dict__.copy()
     sc = SparkContext('local[4]', 'PythonTest')
+    try:
+        spark = SparkSession.builder.enableHiveSupport().getOrCreate()
+    except py4j.protocol.Py4JError:
+        spark = SparkSession(sc)
 
     globs['tempfile'] = tempfile
     globs['os'] = os
-    globs['sc'] = sc
-    globs['sqlContext'] = SQLContext(sc)
-    globs['hiveContext'] = HiveContext._createForTesting(sc)
+    globs['spark'] = spark
+    globs['sqlContext'] = SQLContext.getOrCreate(spark.sparkContext)
     globs['df'] = \
-        globs['sqlContext'].read.format('text').stream('python/test_support/sql/streaming')
+        globs['spark'].read.format('text').stream('python/test_support/sql/streaming')
 
     (failure_count, test_count) = doctest.testmod(
-        pyspark.sql.readwriter, globs=globs,
+        pyspark.sql.streaming, globs=globs,
         optionflags=doctest.ELLIPSIS | doctest.NORMALIZE_WHITESPACE | doctest.REPORT_NDIFF)
-    globs['sc'].stop()
+    globs['spark'].stop()
     if failure_count:
         exit(-1)
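
A minimal sketch of how the id-based lookup introduced by this patch is meant to be used. It assumes a Spark 2.0-era build (hence the ContinuousQuery/startStream names), a live SparkSession, and the same python/test_support/sql/streaming text source the doctests rely on; the variable names are illustrative only:

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()

    # Start a named streaming query against the in-memory sink, as in the doctests.
    df = spark.read.format('text').stream('python/test_support/sql/streaming')
    cq = df.write.format('memory').queryName('this_query').startStream()

    # cq.id is unique across every query started in this process; cq.name is only
    # guaranteed unique among the currently active queries, so id is the stable handle.
    same_query = spark.streams.get(cq.id)
    assert same_query.isActive

    # get() now validates its argument: passing the name (or any non-integer)
    # raises ValueError instead of doing a name lookup.
    try:
        spark.streams.get('this_query')
    except ValueError:
        pass

    cq.stop()
    spark.streams.resetTerminated()

Looking up by id rather than name avoids ambiguity once a name is reused after a query stops, which is why the doctests fetch cq.id before calling get().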