diff --git a/docs/structured-streaming-programming-guide.md b/docs/structured-streaming-programming-guide.md
index 52dbbc830af64c79c9bb39e1acde6db934cd2515..b816072cb8c83044050b00b5551751e5136f6b83 100644
--- a/docs/structured-streaming-programming-guide.md
+++ b/docs/structured-streaming-programming-guide.md
@@ -374,7 +374,7 @@ The "Output" is defined as what gets written out to the external storage. The ou
 
   - *Append Mode* - Only the new rows appended in the Result Table since the last trigger will be written to the external storage. This is applicable only on the queries where existing rows in the Result Table are not expected to change.
   
-  - *Update Mode* - Only the rows that were updated in the Result Table since the last trigger will be written to the external storage (available since Spark 2.1.1). Note that this is different from the Complete Mode in that this mode only outputs the rows that have changed since the last trigger.
+  - *Update Mode* - Only the rows that were updated in the Result Table since the last trigger will be written to the external storage (available since Spark 2.1.1). Note that this is different from the Complete Mode in that this mode only outputs the rows that have changed since the last trigger. If the query doesn't contain aggregations, it will be equivalent to Append mode.
 
 Note that each mode is applicable on certain types of queries. This is discussed in detail [later](#output-modes).
 
@@ -977,7 +977,7 @@ Here is the compatibility matrix.
   </tr>
   <tr>
     <td colspan="2" style="vertical-align: middle;">Queries without aggregation</td>
-    <td style="vertical-align: middle;">Append</td>
+    <td style="vertical-align: middle;">Append, Update</td>
     <td style="vertical-align: middle;">
         Complete mode not supported as it is infeasible to keep all data in the Result Table.
     </td>
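As a usage illustration of the updated matrix, here is a minimal PySpark sketch of a non-aggregating query run in Update mode; the socket source on localhost:9999 and the console sink are assumptions made for the example, and since the query has no aggregation the output is the same rows Append mode would produce:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("update-mode-sketch").getOrCreate()

# Assumed source for the sketch: a socket stream of text lines.
lines = (spark.readStream
    .format("socket")
    .option("host", "localhost")
    .option("port", 9999)
    .load())

# No aggregation here, so Update mode emits exactly the new rows of each
# trigger, i.e. the same output Append mode would write.
nonEmpty = lines.filter(lines.value != "")

query = (nonEmpty.writeStream
    .outputMode("update")
    .format("console")
    .start())

query.awaitTermination()
```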
diff --git a/python/pyspark/sql/streaming.py b/python/pyspark/sql/streaming.py
index 5014299ad220f5b138989cf55930fa53af7f1c03..a10b185cd4c7b979e590e892850a0b3f973ed23e 100644
--- a/python/pyspark/sql/streaming.py
+++ b/python/pyspark/sql/streaming.py
@@ -665,6 +665,9 @@ class DataStreamWriter(object):
            the sink
         * `complete`:All the rows in the streaming DataFrame/Dataset will be written to the sink
            every time these is some updates
+        * `update`:Only the rows that were updated in the streaming DataFrame/Dataset will be
+           written to the sink every time there are some updates. If the query doesn't contain
+           aggregations, it will be equivalent to `append` mode.
 
        .. note:: Experimental.
 
@@ -768,7 +771,8 @@ class DataStreamWriter(object):
 
     @ignore_unicode_prefix
     @since(2.0)
-    def start(self, path=None, format=None, partitionBy=None, queryName=None, **options):
+    def start(self, path=None, format=None, outputMode=None, partitionBy=None, queryName=None,
+              **options):
         """Streams the contents of the :class:`DataFrame` to a data source.
 
         The data source is specified by the ``format`` and a set of ``options``.
@@ -779,15 +783,20 @@ class DataStreamWriter(object):
 
         :param path: the path in a Hadoop supported file system
         :param format: the format used to save
-
-            * ``append``: Append contents of this :class:`DataFrame` to existing data.
-            * ``overwrite``: Overwrite existing data.
-            * ``ignore``: Silently ignore this operation if data already exists.
-            * ``error`` (default case): Throw an exception if data already exists.
+        :param outputMode: specifies how data of a streaming DataFrame/Dataset is written to a
+                           streaming sink.
+
+            * `append`:Only the new rows in the streaming DataFrame/Dataset will be written to the
+              sink
+            * `complete`:All the rows in the streaming DataFrame/Dataset will be written to the sink
+              every time there are some updates
+            * `update`:Only the rows that were updated in the streaming DataFrame/Dataset will be
+              written to the sink every time there are some updates. If the query doesn't contain
+              aggregations, it will be equivalent to `append` mode.
         :param partitionBy: names of partitioning columns
         :param queryName: unique name for the query
         :param options: All other string options. You may want to provide a `checkpointLocation`
-            for most streams, however it is not required for a `memory` stream.
+                        for most streams; however, it is not required for a `memory` stream.
 
         >>> sq = sdf.writeStream.format('memory').queryName('this_query').start()
         >>> sq.isActive
@@ -798,7 +807,7 @@ class DataStreamWriter(object):
         >>> sq.isActive
         False
         >>> sq = sdf.writeStream.trigger(processingTime='5 seconds').start(
-        ...     queryName='that_query', format='memory')
+        ...     queryName='that_query', outputMode='append', format='memory')
         >>> sq.name
         u'that_query'
         >>> sq.isActive
@@ -806,6 +815,8 @@ class DataStreamWriter(object):
         >>> sq.stop()
         """
         self.options(**options)
+        if outputMode is not None:
+            self.outputMode(outputMode)
         if partitionBy is not None:
             self.partitionBy(partitionBy)
         if format is not None:
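For reference, the `outputMode` keyword argument added to `start()` above is interchangeable with the existing `outputMode()` builder method; a short sketch, assuming `sdf` is a streaming DataFrame as in the doctests:

```python
# Both forms configure the same output mode; `sdf` is an assumed streaming DataFrame.
sq1 = sdf.writeStream.outputMode("update").format("memory").queryName("q1").start()

# Equivalent, using the keyword argument introduced on start().
sq2 = sdf.writeStream.format("memory").start(queryName="q2", outputMode="update")

sq1.stop()
sq2.stop()
```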
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/streaming/OutputMode.java b/sql/catalyst/src/main/java/org/apache/spark/sql/streaming/OutputMode.java
index cf0579fd3625cc7a10c30471f9bded29c3d67b39..3f7cdb293e0fac1189e8c657ac1a3018b1d0ce2d 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/streaming/OutputMode.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/streaming/OutputMode.java
@@ -57,7 +57,8 @@ public class OutputMode {
 
   /**
    * OutputMode in which only the rows that were updated in the streaming DataFrame/Dataset will
-   * be written to the sink every time there are some updates.
+   * be written to the sink every time there are some updates. If the query doesn't contain
+   * aggregations, it will be equivalent to `Append` mode.
    *
    * @since 2.1.1
    */
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
index 053c8eb6170e913b9043ec67f7a767220c85c521..c2666b2ab91292ade73da558f49ca6ab4ecc80a9 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
@@ -73,7 +73,7 @@ object UnsupportedOperationChecker {
                 s"streaming DataFrames/DataSets")(plan)
         }
 
-      case InternalOutputModes.Complete | InternalOutputModes.Update if aggregates.isEmpty =>
+      case InternalOutputModes.Complete if aggregates.isEmpty =>
         throwError(
           s"$outputMode output mode not supported when there are no streaming aggregations on " +
             s"streaming DataFrames/Datasets")(plan)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/streaming/InternalOutputModes.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/streaming/InternalOutputModes.scala
index 915f4a9e25cecbb2b04f4e027570ba00e54bfcd5..351bd6fff4adf6d5b4fa60d5266f7ecb1121bf4d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/streaming/InternalOutputModes.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/streaming/InternalOutputModes.scala
@@ -40,8 +40,8 @@ private[sql] object InternalOutputModes {
 
   /**
    * OutputMode in which only the rows in the streaming DataFrame/Dataset that were updated will be
-   * written to the sink every time these is some updates. This output mode can only be used in
-   * queries that contain aggregations.
+   * written to the sink every time there are some updates. If the query doesn't contain
+   * aggregations, it will be equivalent to `Append` mode.
    */
   case object Update extends OutputMode
 }
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
index d2c0f8cc9fe8a5770ad9c5fc2015c885648dc01b..58e69f9ebea05cc3919bdfb378beea6db7897171 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
@@ -219,9 +219,9 @@ class UnsupportedOperationsSuite extends SparkFunSuite {
     "window", Window(Nil, Nil, Nil, _), expectedMsg = "non-time-based windows")
 
   // Output modes with aggregation and non-aggregation plans
-  testOutputMode(Append, shouldSupportAggregation = false)
-  testOutputMode(Update, shouldSupportAggregation = true)
-  testOutputMode(Complete, shouldSupportAggregation = true)
+  testOutputMode(Append, shouldSupportAggregation = false, shouldSupportNonAggregation = true)
+  testOutputMode(Update, shouldSupportAggregation = true, shouldSupportNonAggregation = true)
+  testOutputMode(Complete, shouldSupportAggregation = true, shouldSupportNonAggregation = false)
 
   /*
     =======================================================================================
@@ -323,30 +323,33 @@ class UnsupportedOperationsSuite extends SparkFunSuite {
   /** Test output mode with and without aggregation in the streaming plan */
   def testOutputMode(
       outputMode: OutputMode,
-      shouldSupportAggregation: Boolean): Unit = {
+      shouldSupportAggregation: Boolean,
+      shouldSupportNonAggregation: Boolean): Unit = {
 
     // aggregation
     if (shouldSupportAggregation) {
-      assertNotSupportedInStreamingPlan(
-        s"$outputMode output mode - no aggregation",
-        streamRelation.where($"a" > 1),
-        outputMode = outputMode,
-        Seq("aggregation", s"$outputMode output mode"))
-
       assertSupportedInStreamingPlan(
         s"$outputMode output mode - aggregation",
         streamRelation.groupBy("a")("count(*)"),
         outputMode = outputMode)
-
     } else {
+      assertNotSupportedInStreamingPlan(
+        s"$outputMode output mode - aggregation",
+        streamRelation.groupBy("a")("count(*)"),
+        outputMode = outputMode,
+        Seq("aggregation", s"$outputMode output mode"))
+    }
+
+    // non aggregation
+    if (shouldSupportNonAggregation) {
       assertSupportedInStreamingPlan(
         s"$outputMode output mode - no aggregation",
         streamRelation.where($"a" > 1),
         outputMode = outputMode)
-
+    } else {
       assertNotSupportedInStreamingPlan(
-        s"$outputMode output mode - aggregation",
-        streamRelation.groupBy("a")("count(*)"),
+        s"$outputMode output mode - no aggregation",
+        streamRelation.where($"a" > 1),
         outputMode = outputMode,
         Seq("aggregation", s"$outputMode output mode"))
     }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
index abb00ce02e91bb380f9fd03f28b940e1f7410d50..7e7a1ba22334e6467a661e133980e16b94d0c585 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
@@ -44,6 +44,10 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
    *                            written to the sink
    *   - `OutputMode.Complete()`: all the rows in the streaming DataFrame/Dataset will be written
    *                              to the sink every time these is some updates
+   *   - `OutputMode.Update()`: only the rows that were updated in the streaming DataFrame/Dataset
+   *                            will be written to the sink every time there are some updates. If
+   *                            the query doesn't contain aggregations, it will be equivalent to
+   *                            `OutputMode.Append()` mode.
    *
    * @since 2.0.0
    */
@@ -58,7 +62,9 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
    *                 the sink
    *   - `complete`: all the rows in the streaming DataFrame/Dataset will be written to the sink
    *                 every time these is some updates
-   *
+   *   - `update`:   only the rows that were updated in the streaming DataFrame/Dataset will
+   *                 be written to the sink every time there are some updates. If the query doesn't
+   *                 contain aggregations, it will be equivalent to `append` mode.
    * @since 2.0.0
    */
   def outputMode(outputMode: String): DataStreamWriter[T] = {
@@ -220,16 +226,6 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
       if (extraOptions.get("queryName").isEmpty) {
         throw new AnalysisException("queryName must be specified for memory sink")
       }
-      val supportedModes = "Output modes supported by the memory sink are 'append' and 'complete'."
-      outputMode match {
-        case Append | Complete => // allowed
-        case Update =>
-          throw new AnalysisException(
-            s"Update output mode is not supported for memory sink. $supportedModes")
-        case _ =>
-          throw new AnalysisException(
-            s"$outputMode is not supported for memory sink. $supportedModes")
-      }
       val sink = new MemorySink(df.schema, outputMode)
       val resultDf = Dataset.ofRows(df.sparkSession, new MemoryPlan(sink))
       val chkpointLoc = extraOptions.get("checkpointLocation")
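With the memory-sink restriction removed above, an Update-mode query can now be registered as an in-memory table. A sketch of the resulting user-facing behavior, assuming `sdf` is a streaming DataFrame and `spark` an active session; the query name `mem_stream` is only illustrative:

```python
# Update mode is now accepted by the memory sink.
query = (sdf.writeStream
    .format("memory")
    .outputMode("update")
    .queryName("mem_stream")
    .start())

# The sink exposes results as a temporary view named after queryName.
spark.table("mem_stream").show()

query.stop()
```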
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MemorySinkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MemorySinkSuite.scala
index ca724fc5cc67eaaa2d256ea9be929fa3707472c6..8f23f98f76190cce2e4b2693a66fa82d3358922f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MemorySinkSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MemorySinkSuite.scala
@@ -137,7 +137,7 @@ class MemorySinkSuite extends StreamTest with BeforeAndAfter {
   }
 
 
-  test("registering as a table in Append output mode - supported") {
+  test("registering as a table in Append output mode") {
     val input = MemoryStream[Int]
     val query = input.toDF().writeStream
       .format("memory")
@@ -160,7 +160,7 @@ class MemorySinkSuite extends StreamTest with BeforeAndAfter {
     query.stop()
   }
 
-  test("registering as a table in Complete output mode - supported") {
+  test("registering as a table in Complete output mode") {
     val input = MemoryStream[Int]
     val query = input.toDF()
       .groupBy("value")
@@ -186,18 +186,27 @@ class MemorySinkSuite extends StreamTest with BeforeAndAfter {
     query.stop()
   }
 
-  test("registering as a table in Update output mode - not supported") {
+  test("registering as a table in Update output mode") {
     val input = MemoryStream[Int]
-    val df = input.toDF()
-      .groupBy("value")
-      .count()
-    intercept[AnalysisException] {
-      df.writeStream
-        .format("memory")
-        .outputMode("update")
-        .queryName("memStream")
-        .start()
-    }
+    val query = input.toDF().writeStream
+      .format("memory")
+      .outputMode("update")
+      .queryName("memStream")
+      .start()
+    input.addData(1, 2, 3)
+    query.processAllAvailable()
+
+    checkDataset(
+      spark.table("memStream").as[Int],
+      1, 2, 3)
+
+    input.addData(4, 5, 6)
+    query.processAllAvailable()
+    checkDataset(
+      spark.table("memStream").as[Int],
+      1, 2, 3, 4, 5, 6)
+
+    query.stop()
   }
 
   test("MemoryPlan statistics") {