diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 7fed8cb0080c52828a7a70868b42206fb31a1b01..f3e5a21d77733aa0485deed3a66f344f58b6afe1 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -106,7 +106,14 @@ object MimaExcludes {
       ProblemFilters.exclude[InheritedNewAbstractMethodProblem]("org.apache.spark.ml.param.shared.HasAggregationDepth.org$apache$spark$ml$param$shared$HasAggregationDepth$_setter_$aggregationDepth_="),
 
       // [SPARK-18236] Reduce duplicate objects in Spark UI and HistoryServer
-      ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.scheduler.TaskInfo.accumulables")
+      ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.scheduler.TaskInfo.accumulables"),
+
+      // [SPARK-18694] Add StreamingQuery.explain and exception to Python and fix StreamingQueryException
+      ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.streaming.StreamingQueryException$"),
+      ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.sql.streaming.StreamingQueryException.startOffset"),
+      ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.sql.streaming.StreamingQueryException.endOffset"),
+      ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.sql.streaming.StreamingQueryException.this"),
+      ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.streaming.StreamingQueryException.query")
     )
   }
 
diff --git a/python/pyspark/sql/streaming.py b/python/pyspark/sql/streaming.py
index 84f01d3d9ac0b0a1164603c6cfad5fa2ab7cc078..4a7d17ba51a7b5032c8aa8a0e1d4871164a1a830 100644
--- a/python/pyspark/sql/streaming.py
+++ b/python/pyspark/sql/streaming.py
@@ -30,6 +30,7 @@ from pyspark import since, keyword_only
 from pyspark.rdd import ignore_unicode_prefix
 from pyspark.sql.readwriter import OptionUtils, to_str
 from pyspark.sql.types import *
+from pyspark.sql.utils import StreamingQueryException
 
 __all__ = ["StreamingQuery", "StreamingQueryManager", "DataStreamReader", "DataStreamWriter"]
 
@@ -132,6 +133,45 @@ class StreamingQuery(object):
         """
         self._jsq.stop()
 
+    @since(2.1)
+    def explain(self, extended=False):
+        """Prints the (logical and physical) plans to the console for debugging purpose.
+
+        :param extended: boolean, default ``False``. If ``False``, prints only the physical plan.
+
+        >>> sq = sdf.writeStream.format('memory').queryName('query_explain').start()
+        >>> sq.processAllAvailable() # Wait until all available data is processed to generate runtime plans.
+        >>> sq.explain()
+        == Physical Plan ==
+        ...
+        >>> sq.explain(True)
+        == Parsed Logical Plan ==
+        ...
+        == Analyzed Logical Plan ==
+        ...
+        == Optimized Logical Plan ==
+        ...
+        == Physical Plan ==
+        ...
+        >>> sq.stop()
+        """
+        # Cannot call `_jsq.explain(...)` because it would print in the JVM process;
+        # fetch the plan as a string instead and print it in the Python process.
+        print(self._jsq.explainInternal(extended))
+
+    @since(2.1)
+    def exception(self):
+        """
+        :return: the StreamingQueryException if the query was terminated by an exception, or None.
+        """
+        if self._jsq.exception().isDefined():
+            je = self._jsq.exception().get()
+            msg = je.toString().split(': ', 1)[1]  # Drop the Java StreamingQueryException type info
+            stackTrace = '\n\t at '.join(map(lambda x: x.toString(), je.getStackTrace()))
+            return StreamingQueryException(msg, stackTrace)
+        else:
+            return None
+
 
 class StreamingQueryManager(object):
     """A class to manage all the :class:`StreamingQuery` StreamingQueries active.
diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py
index 0aff9cebe91bdd26cf7eff3a2e56db0be74bbd11..9f34414f64d1b1d0157640c82dfebc00c024bdab 100644
--- a/python/pyspark/sql/tests.py
+++ b/python/pyspark/sql/tests.py
@@ -1137,6 +1137,35 @@ class SQLTests(ReusedPySparkTestCase):
             q.stop()
             shutil.rmtree(tmpPath)
 
+    def test_stream_exception(self):
+        sdf = self.spark.readStream.format('text').load('python/test_support/sql/streaming')
+        sq = sdf.writeStream.format('memory').queryName('query_explain').start()
+        try:
+            sq.processAllAvailable()
+            self.assertEqual(sq.exception(), None)
+        finally:
+            sq.stop()
+
+        from pyspark.sql.functions import col, udf
+        from pyspark.sql.utils import StreamingQueryException
+        bad_udf = udf(lambda x: 1 / 0)
+        sq = sdf.select(bad_udf(col("value")))\
+            .writeStream\
+            .format('memory')\
+            .queryName('this_query')\
+            .start()
+        try:
+            # Process some data to fail the query
+            sq.processAllAvailable()
+            self.fail("bad udf should fail the query")
+        except StreamingQueryException as e:
+            # This is expected
+            self.assertTrue("ZeroDivisionError" in e.desc)
+        finally:
+            sq.stop()
+        self.assertTrue(type(sq.exception()) is StreamingQueryException)
+        self.assertTrue("ZeroDivisionError" in sq.exception().desc)
+
     def test_query_manager_await_termination(self):
         df = self.spark.readStream.format('text').load('python/test_support/sql/streaming')
         for q in self.spark._wrapped.streams.active:
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index 8804c647a75c38c3206f133852f1583c3943d3c0..6b1c01ab2a0618a75513e5797d1a9d627b444d2e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -93,7 +93,7 @@ class StreamExecution(
    * once, since the field's value may change at any time.
    */
   @volatile
-  protected var availableOffsets = new StreamProgress
+  var availableOffsets = new StreamProgress
 
   /** The current batchId or -1 if execution has not yet been initialized. */
   protected var currentBatchId: Long = -1
@@ -263,7 +263,8 @@ class StreamExecution(
           this,
           s"Query $name terminated with exception: ${e.getMessage}",
           e,
-          Some(committedOffsets.toOffsetSeq(sources, streamExecutionMetadata.json)))
+          committedOffsets.toOffsetSeq(sources, streamExecutionMetadata.json).toString,
+          availableOffsets.toOffsetSeq(sources, streamExecutionMetadata.json).toString)
         logError(s"Query $name terminated with error", e)
         updateStatusMessage(s"Terminated with exception: ${e.getMessage}")
         // Rethrow the fatal errors to allow the user using `Thread.UncaughtExceptionHandler` to
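With this change the exception carries the committed (start) and available (end) offsets as JSON strings rather than `Option[OffsetSeq]`. A minimal sketch, assuming a started `query` that later fails, of how they surface to a Scala caller:

```scala
import org.apache.spark.sql.streaming.{StreamingQuery, StreamingQueryException}

def reportFailure(query: StreamingQuery): Unit = {
  try {
    query.awaitTermination()
  } catch {
    case e: StreamingQueryException =>
      // Both fields are plain JSON strings after this patch.
      println(s"Failed while processing the range [${e.startOffset}, ${e.endOffset}]")
      println(s"Root cause: ${e.cause}")
  }
}
```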
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryException.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryException.scala
index 13f11ba1c92222429306b85b20dacf1a0c5e5b56..a96150aa899206298707e4c4db75b4c6ebf579d7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryException.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryException.scala
@@ -24,32 +24,42 @@ import org.apache.spark.sql.execution.streaming.{Offset, OffsetSeq, StreamExecut
  * :: Experimental ::
  * Exception that stopped a [[StreamingQuery]]. Use `cause` to get the actual exception
  * that caused the failure.
- * @param query       Query that caused the exception
  * @param message     Message of this exception
  * @param cause       Internal cause of this exception
- * @param startOffset Starting offset (if known) of the range of data in which exception occurred
- * @param endOffset   Ending offset (if known) of the range of data in exception occurred
+ * @param startOffset Starting offset, in JSON, of the range of data in which the exception occurred
+ * @param endOffset   Ending offset, in JSON, of the range of data in which the exception occurred
  * @since 2.0.0
  */
 @Experimental
-class StreamingQueryException private[sql](
-    @transient val query: StreamingQuery,
+class StreamingQueryException private(
+    causeString: String,
     val message: String,
     val cause: Throwable,
-    val startOffset: Option[OffsetSeq] = None,
-    val endOffset: Option[OffsetSeq] = None)
+    val startOffset: String,
+    val endOffset: String)
   extends Exception(message, cause) {
 
+  private[sql] def this(
+      query: StreamingQuery,
+      message: String,
+      cause: Throwable,
+      startOffset: String,
+      endOffset: String) {
+    this(
+      // scalastyle:off
+      s"""${classOf[StreamingQueryException].getName}: ${cause.getMessage} ${cause.getStackTrace.take(10).mkString("", "\n|\t", "\n")}
+         |
+         |${query.asInstanceOf[StreamExecution].toDebugString}
+         """.stripMargin,
+      // scalastyle:on
+      message,
+      cause,
+      startOffset,
+      endOffset)
+  }
+
   /** Time when the exception occurred */
   val time: Long = System.currentTimeMillis
 
-  override def toString(): String = {
-    val causeStr =
-      s"${cause.getMessage} ${cause.getStackTrace.take(10).mkString("", "\n|\t", "\n")}"
-    s"""
-       |$causeStr
-       |
-       |${query.asInstanceOf[StreamExecution].toDebugString}
-       """.stripMargin
-  }
+  override def toString(): String = causeString
 }
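Note the contract this creates: `toString()` now returns a string precomputed in the private constructor, beginning with the fully qualified class name followed by `: `, which is exactly the prefix the Python `exception()` method above strips with `split(': ', 1)`. A small sketch of that contract (the message text here is illustrative, not real output):

```scala
// Hypothetical rendered value, mirroring the prefix format built in the constructor above.
val rendered = "org.apache.spark.sql.streaming.StreamingQueryException: " +
  "Job aborted: ZeroDivisionError: integer division or modulo by zero"

// Scala equivalent of Python's `split(': ', 1)[1]`: drop the type-name prefix.
val desc = rendered.split(": ", 2)(1)
println(desc)  // Job aborted: ZeroDivisionError: integer division or modulo by zero
```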
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala
index 4c8247458fcfe60e9de35037b8304719dae9a022..fb5bad0123817397027e01a05162bc2adcdeb97a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala
@@ -38,6 +38,13 @@ import org.apache.spark.annotation.Experimental
 class StateOperatorProgress private[sql](
     val numRowsTotal: Long,
     val numRowsUpdated: Long) {
+
+  /** The compact JSON representation of this progress. */
+  def json: String = compact(render(jsonValue))
+
+  /** The pretty (i.e. indented) JSON representation of this progress. */
+  def prettyJson: String = pretty(render(jsonValue))
+
   private[sql] def jsonValue: JValue = {
     ("numRowsTotal" -> JInt(numRowsTotal)) ~
     ("numRowsUpdated" -> JInt(numRowsUpdated))
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
index a2629f7f6816062d3ef7a908a2a36b2ecf060438..43322651296b9e8cb405b5e7bc21fa7e46d716c5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
@@ -412,8 +412,6 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts {
               eventually("microbatch thread not stopped after termination with failure") {
                 assert(!currentStream.microBatchThread.isAlive)
               }
-              verify(thrownException.query.eq(currentStream),
-                s"incorrect query reference in exception")
               verify(currentStream.exception === Some(thrownException),
                 s"incorrect exception returned by query.exception()")
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
index 56abe1201c0cc08dfcc8204676ef2ce1cab71c9b..f7fc19494d097913d58bdb304cd49a0fc06fe94c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
@@ -103,10 +103,12 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging {
       TestAwaitTermination(ExpectException[SparkException]),
       TestAwaitTermination(ExpectException[SparkException], timeoutMs = 2000),
       TestAwaitTermination(ExpectException[SparkException], timeoutMs = 10),
-      AssertOnQuery(
-        q => q.exception.get.startOffset.get.offsets ===
-          q.committedOffsets.toOffsetSeq(Seq(inputData), "{}").offsets,
-        "incorrect start offset on exception")
+      AssertOnQuery(q => {
+        q.exception.get.startOffset ===
+          q.committedOffsets.toOffsetSeq(Seq(inputData), "{}").toString &&
+          q.exception.get.endOffset ===
+            q.availableOffsets.toOffsetSeq(Seq(inputData), "{}").toString
+      }, "incorrect start offset or end offset on exception")
     )
   }