diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/rules.scala
index a3fd7f13b3db72c5b3467cf4b4e938498bfa8fc5..40ee048e2653e0ad43e0645573d2119846b4206e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/rules.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/rules.scala
@@ -21,7 +21,7 @@ import org.apache.spark.sql.{SaveMode, AnalysisException}
 import org.apache.spark.sql.catalyst.analysis.{EliminateSubQueries, Catalog}
 import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, Alias}
 import org.apache.spark.sql.catalyst.plans.logical
-import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan, Project}
+import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.types.DataType
 
@@ -119,6 +119,15 @@ private[sql] case class PreWriteCheck(catalog: Catalog) extends (LogicalPlan =>
         // The relation in l is not an InsertableRelation.
         failAnalysis(s"$l does not allow insertion.")
 
+      case logical.InsertIntoTable(t, _, _, _, _) =>
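+        // The target of an INSERT must be a single leaf relation backed by external
+        // storage; RDD-backed plans (LocalRelation, OneRowRelation) cannot be written to.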
+        if (!t.isInstanceOf[LeafNode] || t == OneRowRelation || t.isInstanceOf[LocalRelation]) {
+          failAnalysis("Inserting into an RDD-based table is not allowed.")
+        } else {
+          // OK
+        }
+
       case CreateTableUsingAsSelect(tableName, _, _, _, SaveMode.Overwrite, _, query) =>
         // When the SaveMode is Overwrite, we need to check if the table is an input table of
         // the query. If so, we will throw an AnalysisException to let users know it is not allowed.
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index f592a9934d0e62d1b1d6a0260e7851bb12b1ee2f..23244fd310d0f30b3d25ec7d00553fe000f3195f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -17,19 +17,23 @@
 
 package org.apache.spark.sql
 
+import java.io.File
+
 import scala.language.postfixOps
 
+import org.apache.spark.sql.catalyst.plans.logical.OneRowRelation
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.types._
-import org.apache.spark.sql.test.{ExamplePointUDT, ExamplePoint}
-
+import org.apache.spark.sql.test.{ExamplePoint, ExamplePointUDT, SQLTestUtils}
 
-class DataFrameSuite extends QueryTest {
+class DataFrameSuite extends QueryTest with SQLTestUtils {
   import org.apache.spark.sql.TestData._
 
   lazy val ctx = org.apache.spark.sql.test.TestSQLContext
   import ctx.implicits._
 
+  def sqlContext: SQLContext = ctx
+
   test("analysis error should be eagerly reported") {
     val oldSetting = ctx.conf.dataFrameEagerAnalysis
     // Eager analysis.
@@ -761,4 +765,50 @@ class DataFrameSuite extends QueryTest {
     assert(f.getMessage.contains("column3"))
     assert(!f.getMessage.contains("column2"))
   }
+
+  test("SPARK-6941: Better error message for inserting into RDD-based Table") {
+    withTempDir { dir =>
+
+      val tempParquetFile = new File(dir, "tmp_parquet")
+      val tempJsonFile = new File(dir, "tmp_json")
+
+      val df = Seq(Tuple1(1)).toDF()
+      val insertion = Seq(Tuple1(2)).toDF("col")
+
+      // pass case: parquet table (HadoopFsRelation)
+      df.write.mode(SaveMode.Overwrite).parquet(tempParquetFile.getCanonicalPath)
+      val pdf = ctx.read.parquet(tempParquetFile.getCanonicalPath)
+      pdf.registerTempTable("parquet_base")
+      insertion.write.insertInto("parquet_base")
+
+      // pass case: json table (InsertableRelation); JSON only supports INSERT OVERWRITE,
+      // hence the explicit Overwrite mode below
+      df.write.mode(SaveMode.Overwrite).json(tempJsonFile.getCanonicalPath)
+      val jdf = ctx.read.json(tempJsonFile.getCanonicalPath)
+      jdf.registerTempTable("json_base")
+      insertion.write.mode(SaveMode.Overwrite).insertInto("json_base")
+
+      // error case: insert into an RDD-based table (df is backed by a LocalRelation)
+      df.registerTempTable("rdd_base")
+      val e1 = intercept[AnalysisException] {
+        insertion.write.insertInto("rdd_base")
+      }
+      assert(e1.getMessage.contains("Inserting into an RDD-based table is not allowed."))
+
+      // error case: insert into a logical plan that is not a LeafNode
+      val indirectDS = pdf.select("_1").filter($"_1" > 5)
+      indirectDS.registerTempTable("indirect_ds")
+      val e2 = intercept[AnalysisException] {
+        insertion.write.insertInto("indirect_ds")
+      }
+      assert(e2.getMessage.contains("Inserting into an RDD-based table is not allowed."))
+
+      // error case: insert into a OneRowRelation
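+      // (OneRowRelation is the leaf plan used for queries without a FROM clause)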
+      new DataFrame(ctx, OneRowRelation).registerTempTable("one_row")
+      val e3 = intercept[AnalysisException] {
+        insertion.write.insertInto("one_row")
+      }
+      assert(e3.getMessage.contains("Inserting into an RDD-based table is not allowed."))
+    }
+  }
 }