diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/OutputMode.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/OutputMode.scala
new file mode 100644
index 0000000000000000000000000000000000000000..a4d387eae3c80908ad29dd6e670d50f9da455fef
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/OutputMode.scala
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.analysis
+
+sealed trait OutputMode
+
+case object Append extends OutputMode
+case object Update extends OutputMode
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
new file mode 100644
index 0000000000000000000000000000000000000000..aadc1d31bd4b254999fa8b37705ac41b265fa123
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.analysis
+
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.plans._
+import org.apache.spark.sql.catalyst.plans.logical._
+
+/**
+ * Analyzes the presence of unsupported operations in a logical plan.
+ */
+object UnsupportedOperationChecker {
+
+  def checkForBatch(plan: LogicalPlan): Unit = {
+    plan.foreachUp {
+      case p if p.isStreaming =>
+        throwError(
+          "Queries with streaming sources must be executed with write.startStream()")(p)
+
+      case _ =>
+    }
+  }
+
+  def checkForStreaming(plan: LogicalPlan, outputMode: OutputMode): Unit = {
+
+    if (!plan.isStreaming) {
+      throwError(
+        "Queries without streaming sources cannot be executed with write.startStream()")(plan)
+    }
+
+    plan.foreachUp { implicit plan =>
+
+      // Operations that cannot exists anywhere in a streaming plan
+      plan match {
+
+        case _: Command =>
+          throwError("Commands like CreateTable*, AlterTable*, Show* are not supported with " +
+            "streaming DataFrames/Datasets")
+
+        case _: InsertIntoTable =>
+          throwError("InsertIntoTable is not supported with streaming DataFrames/Datasets")
+
+        case Aggregate(_, _, child) if child.isStreaming && outputMode == Append =>
+          throwError(
+            "Aggregations are not supported on streaming DataFrames/Datasets in " +
+              "Append output mode. Consider changing output mode to Update.")
+
+        case Join(left, right, joinType, _) =>
+
+          joinType match {
+
+            case Inner =>
+              if (left.isStreaming && right.isStreaming) {
+                throwError("Inner join between two streaming DataFrames/Datasets is not supported")
+              }
+
+            case FullOuter =>
+              if (left.isStreaming || right.isStreaming) {
+                throwError("Full outer joins with streaming DataFrames/Datasets are not supported")
+              }
+
+
+            case LeftOuter | LeftSemi | LeftAnti =>
+              if (right.isStreaming) {
+                throwError("Left outer/semi/anti joins with a streaming DataFrame/Dataset " +
+                    "on the right is not supported")
+              }
+
+            case RightOuter =>
+              if (left.isStreaming) {
+                throwError("Right outer join with a streaming DataFrame/Dataset on the left is " +
+                    "not supported")
+              }
+
+            case NaturalJoin(_) | UsingJoin(_, _) =>
+              // They should not appear in an analyzed plan.
+
+            case _ =>
+              throwError(s"Join type $joinType is not supported with streaming DataFrame/Dataset")
+          }
+
+        case c: CoGroup if c.children.exists(_.isStreaming) =>
+          throwError("CoGrouping with a streaming DataFrame/Dataset is not supported")
+
+        case u: Union if u.children.map(_.isStreaming).distinct.size == 2 =>
+          throwError("Union between streaming and batch DataFrames/Datasets is not supported")
+
+        case Except(left, right) if right.isStreaming =>
+          throwError("Except with a streaming DataFrame/Dataset on the right is not supported")
+
+        case Intersect(left, right) if left.isStreaming && right.isStreaming =>
+          throwError("Intersect between two streaming DataFrames/Datasets is not supported")
+
+        case GroupingSets(_, _, child, _) if child.isStreaming =>
+          throwError("GroupingSets is not supported on streaming DataFrames/Datasets")
+
+        case GlobalLimit(_, _) | LocalLimit(_, _) if plan.children.forall(_.isStreaming) =>
+          throwError("Limits are not supported on streaming DataFrames/Datasets")
+
+        case Sort(_, _, _) | SortPartitions(_, _) if plan.children.forall(_.isStreaming) =>
+          throwError("Sorting is not supported on streaming DataFrames/Datasets")
+
+        case Sample(_, _, _, _, child) if child.isStreaming =>
+          throwError("Sampling is not supported on streaming DataFrames/Datasets")
+
+        case Window(_, _, _, child) if child.isStreaming =>
+          throwError("Non-time-based windows are not supported on streaming DataFrames/Datasets")
+
+        case ReturnAnswer(child) if child.isStreaming =>
+          throwError("Cannot return immediate result on streaming DataFrames/Dataset. Queries " +
+            "with streaming DataFrames/Datasets must be executed with write.startStream().")
+
+        case _ =>
+      }
+    }
+  }
+
+  private def throwErrorIf(
+      condition: Boolean,
+      msg: String)(implicit operator: LogicalPlan): Unit = {
+    if (condition) {
+      throwError(msg)
+    }
+  }
+
+  private def throwError(msg: String)(implicit operator: LogicalPlan): Nothing = {
+    throw new AnalysisException(
+      msg, operator.origin.line, operator.origin.startPosition, Some(operator))
+  }
+}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
index 1e7296664bb2580a12f626fd98cfbd97416f8fd5..958966328bbe20a627f17f7fc50ae470c4499284 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
@@ -297,6 +297,24 @@ package object dsl {
         condition: Option[Expression] = None): LogicalPlan =
         Join(logicalPlan, otherPlan, joinType, condition)
 
+      def cogroup[Key: Encoder, Left: Encoder, Right: Encoder, Result: Encoder](
+          otherPlan: LogicalPlan,
+          func: (Key, Iterator[Left], Iterator[Right]) => TraversableOnce[Result],
+          leftGroup: Seq[Attribute],
+          rightGroup: Seq[Attribute],
+          leftAttr: Seq[Attribute],
+          rightAttr: Seq[Attribute]
+        ): LogicalPlan = {
+        CoGroup.apply[Key, Left, Right, Result](
+          func,
+          leftGroup,
+          rightGroup,
+          leftAttr,
+          rightAttr,
+          logicalPlan,
+          otherPlan)
+      }
+
       def orderBy(sortExprs: SortOrder*): LogicalPlan = Sort(sortExprs, true, logicalPlan)
 
       def sortBy(sortExprs: SortOrder*): LogicalPlan = Sort(sortExprs, false, logicalPlan)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
index aceeb8aadcf68eff22162d72ce59347c9b29010a..45ac126a72f5c4829663577f35432c3e77dc8c34 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
@@ -42,6 +42,9 @@ abstract class LogicalPlan extends QueryPlan[LogicalPlan] with Logging {
    */
   def analyzed: Boolean = _analyzed
 
+  /** Returns true if this subtree contains any streaming data sources. */
+  def isStreaming: Boolean = children.exists(_.isStreaming == true)
+
   /**
    * Returns a copy of this node where `rule` has been recursively applied first to all of its
    * children and then itself (post-order). When `rule` does not apply to a given node, it is left
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
new file mode 100644
index 0000000000000000000000000000000000000000..ce00a03e764fd1665ac33df13040e729197c4a2c
--- /dev/null
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
@@ -0,0 +1,384 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.analysis
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.dsl.expressions._
+import org.apache.spark.sql.catalyst.dsl.plans._
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
+import org.apache.spark.sql.catalyst.plans._
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.types.IntegerType
+
+class UnsupportedOperationsSuite extends SparkFunSuite {
+
+  val attribute = AttributeReference("a", IntegerType, nullable = true)()
+  val batchRelation = LocalRelation(attribute)
+  val streamRelation = new TestStreamingRelation(attribute)
+
+  /*
+    =======================================================================================
+                                     BATCH QUERIES
+    =======================================================================================
+   */
+
+  assertSupportedInBatchPlan("local relation", batchRelation)
+
+  assertNotSupportedInBatchPlan(
+    "streaming source",
+    streamRelation,
+    Seq("with streaming source", "startStream"))
+
+  assertNotSupportedInBatchPlan(
+    "select on streaming source",
+    streamRelation.select($"count(*)"),
+    Seq("with streaming source", "startStream"))
+
+
+  /*
+    =======================================================================================
+                                     STREAMING QUERIES
+    =======================================================================================
+   */
+
+  // Batch plan in streaming query
+  testError(
+    "streaming plan - no streaming source",
+    Seq("without streaming source", "startStream")) {
+    UnsupportedOperationChecker.checkForStreaming(batchRelation.select($"count(*)"), Append)
+  }
+
+  // Commands
+  assertNotSupportedInStreamingPlan(
+    "commmands",
+    DescribeFunction("func", true),
+    outputMode = Append,
+    expectedMsgs = "commands" :: Nil)
+
+  // Aggregates: Not supported on streams in Append mode
+  assertSupportedInStreamingPlan(
+    "aggregate - batch with update output mode",
+    batchRelation.groupBy("a")("count(*)"),
+    outputMode = Update)
+
+  assertSupportedInStreamingPlan(
+    "aggregate - batch with append output mode",
+    batchRelation.groupBy("a")("count(*)"),
+    outputMode = Append)
+
+  assertSupportedInStreamingPlan(
+    "aggregate - stream with update output mode",
+    streamRelation.groupBy("a")("count(*)"),
+    outputMode = Update)
+
+  assertNotSupportedInStreamingPlan(
+    "aggregate - stream with append output mode",
+    streamRelation.groupBy("a")("count(*)"),
+    outputMode = Append,
+    Seq("aggregation", "append output mode"))
+
+  // Inner joins: Stream-stream not supported
+  testBinaryOperationInStreamingPlan(
+    "inner join",
+    _.join(_, joinType = Inner),
+    streamStreamSupported = false)
+
+  // Full outer joins: only batch-batch is allowed
+  testBinaryOperationInStreamingPlan(
+    "full outer join",
+    _.join(_, joinType = FullOuter),
+    streamStreamSupported = false,
+    batchStreamSupported = false,
+    streamBatchSupported = false)
+
+  // Left outer joins: *-stream not allowed
+  testBinaryOperationInStreamingPlan(
+    "left outer join",
+    _.join(_, joinType = LeftOuter),
+    streamStreamSupported = false,
+    batchStreamSupported = false,
+    expectedMsg = "left outer/semi/anti joins")
+
+  // Left semi joins: stream-* not allowed
+  testBinaryOperationInStreamingPlan(
+    "left semi join",
+    _.join(_, joinType = LeftSemi),
+    streamStreamSupported = false,
+    batchStreamSupported = false,
+    expectedMsg = "left outer/semi/anti joins")
+
+  // Left anti joins: stream-* not allowed
+  testBinaryOperationInStreamingPlan(
+    "left anti join",
+    _.join(_, joinType = LeftAnti),
+    streamStreamSupported = false,
+    batchStreamSupported = false,
+    expectedMsg = "left outer/semi/anti joins")
+
+  // Right outer joins: stream-* not allowed
+  testBinaryOperationInStreamingPlan(
+    "right outer join",
+    _.join(_, joinType = RightOuter),
+    streamStreamSupported = false,
+    streamBatchSupported = false)
+
+  // Cogroup: only batch-batch is allowed
+  testBinaryOperationInStreamingPlan(
+    "cogroup",
+    genCogroup,
+    streamStreamSupported = false,
+    batchStreamSupported = false,
+    streamBatchSupported = false)
+
+  def genCogroup(left: LogicalPlan, right: LogicalPlan): LogicalPlan = {
+    def func(k: Int, left: Iterator[Int], right: Iterator[Int]): Iterator[Int] = {
+      Iterator.empty
+    }
+    implicit val intEncoder = ExpressionEncoder[Int]
+
+    left.cogroup[Int, Int, Int, Int](
+      right,
+      func,
+      AppendColumns[Int, Int]((x: Int) => x, left).newColumns,
+      AppendColumns[Int, Int]((x: Int) => x, right).newColumns,
+      left.output,
+      right.output)
+  }
+
+  // Union: Mixing between stream and batch not supported
+  testBinaryOperationInStreamingPlan(
+    "union",
+    _.union(_),
+    streamBatchSupported = false,
+    batchStreamSupported = false)
+
+  // Except: *-stream not supported
+  testBinaryOperationInStreamingPlan(
+    "except",
+    _.except(_),
+    streamStreamSupported = false,
+    batchStreamSupported = false)
+
+  // Intersect: stream-stream not supported
+  testBinaryOperationInStreamingPlan(
+    "intersect",
+    _.intersect(_),
+    streamStreamSupported = false)
+
+
+  // Unary operations
+  testUnaryOperatorInStreamingPlan("sort", Sort(Nil, true, _))
+  testUnaryOperatorInStreamingPlan("sort partitions", SortPartitions(Nil, _), expectedMsg = "sort")
+  testUnaryOperatorInStreamingPlan(
+    "sample", Sample(0.1, 1, true, 1L, _)(), expectedMsg = "sampling")
+  testUnaryOperatorInStreamingPlan(
+    "window", Window(Nil, Nil, Nil, _), expectedMsg = "non-time-based windows")
+
+
+  /*
+    =======================================================================================
+                                     TESTING FUNCTIONS
+    =======================================================================================
+   */
+
+  /**
+   * Test that an unary operator correctly fails support check when it has a streaming child plan,
+   * but not when it has batch child plan. There can be batch sub-plans inside a streaming plan,
+   * so it is valid for the operator to have a batch child plan.
+   *
+   * This test wraps the logical plan in a fake operator that makes the whole plan look like
+   * a streaming plan even if the child plan is a batch plan. This is to test that the operator
+   * supports having a batch child plan, forming a batch subplan inside a streaming plan.
+   */
+  def testUnaryOperatorInStreamingPlan(
+    operationName: String,
+    logicalPlanGenerator: LogicalPlan => LogicalPlan,
+    outputMode: OutputMode = Append,
+    expectedMsg: String = ""): Unit = {
+
+    val expectedMsgs = if (expectedMsg.isEmpty) Seq(operationName) else Seq(expectedMsg)
+
+    assertNotSupportedInStreamingPlan(
+      s"$operationName with stream relation",
+      wrapInStreaming(logicalPlanGenerator(streamRelation)),
+      outputMode,
+      expectedMsgs)
+
+    assertSupportedInStreamingPlan(
+      s"$operationName with batch relation",
+      wrapInStreaming(logicalPlanGenerator(batchRelation)),
+      outputMode)
+  }
+
+
+  /**
+   * Test that a binary operator correctly fails support check when it has combinations of
+   * streaming and batch child plans. There can be batch sub-plans inside a streaming plan,
+   * so it is valid for the operator to have a batch child plan.
+   */
+  def testBinaryOperationInStreamingPlan(
+      operationName: String,
+      planGenerator: (LogicalPlan, LogicalPlan) => LogicalPlan,
+      outputMode: OutputMode = Append,
+      streamStreamSupported: Boolean = true,
+      streamBatchSupported: Boolean = true,
+      batchStreamSupported: Boolean = true,
+      expectedMsg: String = ""): Unit = {
+
+    val expectedMsgs = if (expectedMsg.isEmpty) Seq(operationName) else Seq(expectedMsg)
+
+    if (streamStreamSupported) {
+      assertSupportedInStreamingPlan(
+        s"$operationName with stream-stream relations",
+        planGenerator(streamRelation, streamRelation),
+        outputMode)
+    } else {
+      assertNotSupportedInStreamingPlan(
+        s"$operationName with stream-stream relations",
+        planGenerator(streamRelation, streamRelation),
+        outputMode,
+        expectedMsgs)
+    }
+
+    if (streamBatchSupported) {
+      assertSupportedInStreamingPlan(
+        s"$operationName with stream-batch relations",
+        planGenerator(streamRelation, batchRelation),
+        outputMode)
+    } else {
+      assertNotSupportedInStreamingPlan(
+        s"$operationName with stream-batch relations",
+        planGenerator(streamRelation, batchRelation),
+        outputMode,
+        expectedMsgs)
+    }
+
+    if (batchStreamSupported) {
+      assertSupportedInStreamingPlan(
+        s"$operationName with batch-stream relations",
+        planGenerator(batchRelation, streamRelation),
+        outputMode)
+    } else {
+      assertNotSupportedInStreamingPlan(
+        s"$operationName with batch-stream relations",
+        planGenerator(batchRelation, streamRelation),
+        outputMode,
+        expectedMsgs)
+    }
+
+    assertSupportedInStreamingPlan(
+      s"$operationName with batch-batch relations",
+      planGenerator(batchRelation, batchRelation),
+      outputMode)
+  }
+
+  /**
+   * Assert that the logical plan is supported as subplan insider a streaming plan.
+   *
+   * To test this correctly, the given logical plan is wrapped in a fake operator that makes the
+   * whole plan look like a streaming plan. Otherwise, a batch plan may throw not supported
+   * exception simply for not being a streaming plan, even though that plan could exists as batch
+   * subplan inside some streaming plan.
+   */
+  def assertSupportedInStreamingPlan(
+      name: String,
+      plan: LogicalPlan,
+      outputMode: OutputMode): Unit = {
+    test(s"streaming plan - $name: supported") {
+      UnsupportedOperationChecker.checkForStreaming(wrapInStreaming(plan), outputMode)
+    }
+  }
+
+  /**
+   * Assert that the logical plan is not supported inside a streaming plan.
+   *
+   * To test this correctly, the given logical plan is wrapped in a fake operator that makes the
+   * whole plan look like a streaming plan. Otherwise, a batch plan may throw not supported
+   * exception simply for not being a streaming plan, even though that plan could exists as batch
+   * subplan inside some streaming plan.
+   */
+  def assertNotSupportedInStreamingPlan(
+      name: String,
+      plan: LogicalPlan,
+      outputMode: OutputMode,
+      expectedMsgs: Seq[String]): Unit = {
+    testError(
+      s"streaming plan - $name: not supported",
+      expectedMsgs :+ "streaming" :+ "DataFrame" :+ "Dataset" :+ "not supported") {
+      UnsupportedOperationChecker.checkForStreaming(wrapInStreaming(plan), outputMode)
+    }
+  }
+
+  /** Assert that the logical plan is supported as a batch plan */
+  def assertSupportedInBatchPlan(name: String, plan: LogicalPlan): Unit = {
+    test(s"batch plan - $name: supported") {
+      UnsupportedOperationChecker.checkForBatch(plan)
+    }
+  }
+
+  /** Assert that the logical plan is not supported as a batch plan */
+  def assertNotSupportedInBatchPlan(
+      name: String,
+      plan: LogicalPlan,
+      expectedMsgs: Seq[String]): Unit = {
+    testError(s"batch plan - $name: not supported", expectedMsgs) {
+      UnsupportedOperationChecker.checkForBatch(plan)
+    }
+  }
+
+  /**
+   * Test whether the body of code will fail. If it does fail, then check if it has expected
+   * messages.
+   */
+  def testError(testName: String, expectedMsgs: Seq[String])(testBody: => Unit): Unit = {
+
+    test(testName) {
+      val e = intercept[AnalysisException] {
+        testBody
+      }
+
+      if (!expectedMsgs.map(_.toLowerCase).forall(e.getMessage.toLowerCase.contains)) {
+        fail(
+          s"""Exception message should contain the following substrings:
+              |
+          |  ${expectedMsgs.mkString("\n  ")}
+              |
+          |Actual exception message:
+              |
+          |  ${e.getMessage}
+          """.stripMargin)
+      }
+    }
+  }
+
+  def wrapInStreaming(plan: LogicalPlan): LogicalPlan = {
+    new StreamingPlanWrapper(plan)
+  }
+
+  case class StreamingPlanWrapper(child: LogicalPlan) extends UnaryNode {
+    override def output: Seq[Attribute] = child.output
+    override def isStreaming: Boolean = true
+  }
+
+  case class TestStreamingRelation(output: Seq[Attribute]) extends LeafNode {
+    def this(attribute: Attribute) = this(Seq(attribute))
+    override def isStreaming: Boolean = true
+  }
+}
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/LogicalPlanSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/LogicalPlanSuite.scala
index faef9ed2745938109e1591dec07e7a918ee682aa..cc86f1f6e2f48da680cd81e1ab67936cb5e703bf 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/LogicalPlanSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/LogicalPlanSuite.scala
@@ -18,7 +18,9 @@
 package org.apache.spark.sql.catalyst.plans
 
 import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
 import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.types.IntegerType
 
 /**
  * This suite is used to test [[LogicalPlan]]'s `resolveOperators` and make sure it can correctly
@@ -68,4 +70,23 @@ class LogicalPlanSuite extends SparkFunSuite {
 
     assert(invocationCount === 1)
   }
+
+  test("isStreaming") {
+    val relation = LocalRelation(AttributeReference("a", IntegerType, nullable = true)())
+    val incrementalRelation = new LocalRelation(
+      Seq(AttributeReference("a", IntegerType, nullable = true)())) {
+      override def isStreaming(): Boolean = true
+    }
+
+    case class TestBinaryRelation(left: LogicalPlan, right: LogicalPlan) extends BinaryNode {
+      override def output: Seq[Attribute] = left.output ++ right.output
+    }
+
+    require(relation.isStreaming === false)
+    require(incrementalRelation.isStreaming === true)
+    assert(TestBinaryRelation(relation, relation).isStreaming === false)
+    assert(TestBinaryRelation(incrementalRelation, relation).isStreaming === true)
+    assert(TestBinaryRelation(relation, incrementalRelation).isStreaming === true)
+    assert(TestBinaryRelation(incrementalRelation, incrementalRelation).isStreaming)
+  }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/ContinuousQueryManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/ContinuousQueryManager.scala
index 1343e81569cbde7299b95442350f12e39e3df6f9..39d04ed8c24f804c3ec33575bfe19159ba5a1728 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/ContinuousQueryManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/ContinuousQueryManager.scala
@@ -20,8 +20,10 @@ package org.apache.spark.sql
 import scala.collection.mutable
 
 import org.apache.spark.annotation.Experimental
+import org.apache.spark.sql.catalyst.analysis.{Append, OutputMode, UnsupportedOperationChecker}
 import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.execution.streaming.state.StateStoreCoordinatorRef
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.util.ContinuousQueryListener
 
 /**
@@ -172,14 +174,23 @@ class ContinuousQueryManager(sqlContext: SQLContext) {
       checkpointLocation: String,
       df: DataFrame,
       sink: Sink,
-      trigger: Trigger = ProcessingTime(0)): ContinuousQuery = {
+      trigger: Trigger = ProcessingTime(0),
+      outputMode: OutputMode = Append): ContinuousQuery = {
     activeQueriesLock.synchronized {
       if (activeQueries.contains(name)) {
         throw new IllegalArgumentException(
           s"Cannot start query with name $name as a query with that name is already active")
       }
+      val analyzedPlan = df.queryExecution.analyzed
+      df.queryExecution.assertAnalyzed()
+
+      if (sqlContext.conf.getConf(SQLConf.UNSUPPORTED_OPERATION_CHECK_ENABLED)) {
+        UnsupportedOperationChecker.checkForStreaming(analyzedPlan, outputMode)
+      }
+
       var nextSourceId = 0L
-      val logicalPlan = df.logicalPlan.transform {
+
+      val logicalPlan = analyzedPlan.transform {
         case StreamingRelation(dataSource, _, output) =>
           // Materialize source to avoid creating it in every batch
           val metadataPath = s"$checkpointLocation/sources/$nextSourceId"
@@ -195,6 +206,7 @@ class ContinuousQueryManager(sqlContext: SQLContext) {
         checkpointLocation,
         logicalPlan,
         sink,
+        outputMode,
         trigger)
       query.start()
       activeQueries.put(name, query)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index fb3e184a640da67d6d92df4a8e5513ced649aaf3..1a09d70fb94c32edd0b23c802a62858749e22c64 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -461,9 +461,7 @@ class Dataset[T] private[sql](
    * @since 2.0.0
    */
   @Experimental
-  def isStreaming: Boolean = logicalPlan.find { n =>
-      n.isInstanceOf[StreamingRelation] || n.isInstanceOf[StreamingExecutionRelation]
-    }.isDefined
+  def isStreaming: Boolean = logicalPlan.isStreaming
 
   /**
    * Displays the [[Dataset]] in a tabular form. Strings more than 20 characters will be truncated,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
index f5e1e77263b5b058d0250a75f5e0c7f58af4907b..ddcae0fe075338170d23393c396d1854d08ca00d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
@@ -20,9 +20,11 @@ package org.apache.spark.sql.execution
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{AnalysisException, SQLContext}
 import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, ReturnAnswer}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.execution.exchange.{EnsureRequirements, ReuseExchange}
+import org.apache.spark.sql.internal.SQLConf
 
 /**
  * The primary workflow for executing relational queries using Spark.  Designed to allow easy
@@ -43,10 +45,17 @@ class QueryExecution(val sqlContext: SQLContext, val logical: LogicalPlan) {
       throw ae
   }
 
+  def assertSupported(): Unit = {
+    if (sqlContext.conf.getConf(SQLConf.UNSUPPORTED_OPERATION_CHECK_ENABLED)) {
+      UnsupportedOperationChecker.checkForBatch(analyzed)
+    }
+  }
+
   lazy val analyzed: LogicalPlan = sqlContext.sessionState.analyzer.execute(logical)
 
   lazy val withCachedData: LogicalPlan = {
     assertAnalyzed()
+    assertSupported()
     sqlContext.cacheManager.useCachedData(analyzed)
   }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
index aaced49dd16ce0cd40f11103124ffdca0b35f17b..81244ed874498a8ea0c620d44e3b3b20021a9760 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
@@ -18,9 +18,11 @@
 package org.apache.spark.sql.execution.streaming
 
 import org.apache.spark.sql.SQLContext
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.analysis.{OutputMode, UnsupportedOperationChecker}
+import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.execution.{QueryExecution, SparkPlan, SparkPlanner, UnaryNode}
+import org.apache.spark.sql.internal.SQLConf
 
 /**
  * A variant of [[QueryExecution]] that allows the execution of the given [[LogicalPlan]]
@@ -29,6 +31,7 @@ import org.apache.spark.sql.execution.{QueryExecution, SparkPlan, SparkPlanner,
 class IncrementalExecution(
     ctx: SQLContext,
     logicalPlan: LogicalPlan,
+    outputMode: OutputMode,
     checkpointLocation: String,
     currentBatchId: Long) extends QueryExecution(ctx, logicalPlan) {
 
@@ -69,4 +72,7 @@ class IncrementalExecution(
   }
 
   override def preparations: Seq[Rule[SparkPlan]] = state +: super.preparations
+
+  /** No need assert supported, as this check has already been done */
+  override def assertSupported(): Unit = { }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index 87dd27a2b1aedf3ebb733ea476c7e573fe674194..2a1fa1ba627c85ac758f08a5df030c465496f41e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -27,6 +27,7 @@ import org.apache.hadoop.fs.Path
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.analysis.OutputMode
 import org.apache.spark.sql.catalyst.encoders.RowEncoder
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap}
 import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
@@ -48,6 +49,7 @@ class StreamExecution(
     checkpointRoot: String,
     private[sql] val logicalPlan: LogicalPlan,
     val sink: Sink,
+    val outputMode: OutputMode,
     val trigger: Trigger) extends ContinuousQuery with Logging {
 
   /** An monitor used to wait/notify when batches complete. */
@@ -314,8 +316,13 @@ class StreamExecution(
     }
 
     val optimizerStart = System.nanoTime()
-    lastExecution =
-        new IncrementalExecution(sqlContext, newPlan, checkpointFile("state"), currentBatchId)
+    lastExecution = new IncrementalExecution(
+      sqlContext,
+      newPlan,
+      outputMode,
+      checkpointFile("state"),
+      currentBatchId)
+
     lastExecution.executedPlan
     val optimizerTime = (System.nanoTime() - optimizerStart).toDouble / 1000000
     logDebug(s"Optimized batch in ${optimizerTime}ms")
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala
index d2872e49ce28ac7f0933c80dcaaa135856bc3879..c29291eb584a0d5b5a60f4ed3a177f872b0ffeac 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala
@@ -37,6 +37,7 @@ object StreamingRelation {
  */
 case class StreamingRelation(dataSource: DataSource, sourceName: String, output: Seq[Attribute])
   extends LeafNode {
+  override def isStreaming: Boolean = true
   override def toString: String = sourceName
 }
 
@@ -45,6 +46,7 @@ case class StreamingRelation(dataSource: DataSource, sourceName: String, output:
  * [[org.apache.spark.sql.catalyst.plans.logical.LogicalPlan]].
  */
 case class StreamingExecutionRelation(source: Source, output: Seq[Attribute]) extends LeafNode {
+  override def isStreaming: Boolean = true
   override def toString: String = source.toString
 }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 70e18cebdd7fb96154fd1f6d627852fd77746c43..7f206bdb9b24076f9577cb748cd654b59af11a0a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -442,6 +442,14 @@ object SQLConf {
     .stringConf
     .createOptional
 
+  val UNSUPPORTED_OPERATION_CHECK_ENABLED =
+    SQLConfigBuilder("spark.sql.streaming.unsupportedOperationCheck")
+      .internal()
+      .doc("When true, the logical plan for continuous query will be checked for unsupported" +
+        " operations.")
+      .booleanConf
+      .createWithDefault(true)
+
   // TODO: This is still WIP and shouldn't be turned on without extensive test coverage
   val COLUMNAR_AGGREGATE_MAP_ENABLED = SQLConfigBuilder("spark.sql.codegen.aggregate.map.enabled")
     .internal()
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/StreamTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/StreamTest.scala
index 6ccc99fe179d7415cc20edf93905431ac0514d0b..242ea9cb2736193dd6331aafb24c29fc0a0dcca8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/StreamTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/StreamTest.scala
@@ -33,6 +33,7 @@ import org.scalatest.exceptions.TestFailedDueToTimeoutException
 import org.scalatest.time.Span
 import org.scalatest.time.SpanSugar._
 
+import org.apache.spark.sql.catalyst.analysis.{Append, OutputMode}
 import org.apache.spark.sql.catalyst.encoders.{encoderFor, ExpressionEncoder, RowEncoder}
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.util._
@@ -75,6 +76,8 @@ trait StreamTest extends QueryTest with Timeouts {
   /** How long to wait for an active stream to catch up when checking a result. */
   val streamingTimeout = 10.seconds
 
+  val outputMode: OutputMode = Append
+
   /** A trait for actions that can be performed while testing a streaming DataFrame. */
   trait StreamAction
 
@@ -228,6 +231,7 @@ trait StreamTest extends QueryTest with Timeouts {
          |$testActions
          |
          |== Stream ==
+         |Output Mode: $outputMode
          |Stream state: $currentOffsets
          |Thread state: $threadState
          |${if (streamDeathCause != null) stackTraceToString(streamDeathCause) else ""}
@@ -235,6 +239,7 @@ trait StreamTest extends QueryTest with Timeouts {
          |== Sink ==
          |${sink.toDebugString}
          |
+         |
          |== Plan ==
          |${if (currentStream != null) currentStream.lastExecution else ""}
          """.stripMargin
@@ -293,7 +298,8 @@ trait StreamTest extends QueryTest with Timeouts {
                   StreamExecution.nextName,
                   metadataRoot,
                   stream,
-                  sink)
+                  sink,
+                  outputMode = outputMode)
                 .asInstanceOf[StreamExecution]
             currentStream.microBatchThread.setUncaughtExceptionHandler(
               new UncaughtExceptionHandler {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
index 2bd27c7efdbdca38e171e948a10d49d7225eb611..6f3149dbc5033c5a8e5475f2ead75ed87b27dd57 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
@@ -17,10 +17,9 @@
 
 package org.apache.spark.sql.streaming
 
-import org.scalatest.concurrent.Eventually._
-
-import org.apache.spark.sql.{DataFrame, Row, SQLContext, StreamTest}
+import org.apache.spark.sql._
 import org.apache.spark.sql.execution.streaming._
+import org.apache.spark.sql.functions._
 import org.apache.spark.sql.sources.StreamSourceProvider
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
@@ -108,6 +107,35 @@ class StreamSuite extends StreamTest with SharedSQLContext {
     assertDF(df)
     assertDF(df)
   }
+
+  test("unsupported queries") {
+    val streamInput = MemoryStream[Int]
+    val batchInput = Seq(1, 2, 3).toDS()
+
+    def assertError(expectedMsgs: Seq[String])(body: => Unit): Unit = {
+      val e = intercept[AnalysisException] {
+        body
+      }
+      expectedMsgs.foreach { s => assert(e.getMessage.contains(s)) }
+    }
+
+    // Running streaming plan as a batch query
+    assertError("startStream" :: Nil) {
+      streamInput.toDS.map { i => i }.count()
+    }
+
+    // Running non-streaming plan with as a streaming query
+    assertError("without streaming sources" :: "startStream" :: Nil) {
+      val ds = batchInput.map { i => i }
+      testStream(ds)()
+    }
+
+    // Running streaming plan that cannot be incrementalized
+    assertError("not supported" :: "streaming" :: Nil) {
+      val ds = streamInput.toDS.map { i => i }.sort()
+      testStream(ds)()
+    }
+  }
 }
 
 /**
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
index 3af7c01e525add086bf006da096e950c0c1be786..fa3b122f6d2daf55da8eb3fc306cac0b90a9f503 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.streaming
 
 import org.apache.spark.SparkException
 import org.apache.spark.sql.StreamTest
+import org.apache.spark.sql.catalyst.analysis.Update
 import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.expressions.scala.typed
 import org.apache.spark.sql.functions._
@@ -32,6 +33,8 @@ class StreamingAggregationSuite extends StreamTest with SharedSQLContext {
 
   import testImplicits._
 
+  override val outputMode = Update
+
   test("simple count") {
     val inputData = MemoryStream[Int]