diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index aa4320bd582cbc021b1f7c71aac8b63bf8e0e229..fc37b8cde080603adf15b930788fa3d983a6bdd9 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -50,7 +50,13 @@ class Analyzer(catalog: Catalog,
   /**
    * Override to provide additional rules for the "Resolution" batch.
    */
-  val extendedRules: Seq[Rule[LogicalPlan]] = Nil
+  val extendedResolutionRules: Seq[Rule[LogicalPlan]] = Nil
+
+  /**
+   * Override to provide additional rules for the "Check Analysis" batch.
+   * These rules will be evaluated after our built-in check rules.
+   */
+  val extendedCheckRules: Seq[Rule[LogicalPlan]] = Nil
 
   lazy val batches: Seq[Batch] = Seq(
     Batch("Resolution", fixedPoint,
@@ -64,9 +70,10 @@ class Analyzer(catalog: Catalog,
       UnresolvedHavingClauseAttributes ::
       TrimGroupingAliases ::
       typeCoercionRules ++
-      extendedRules : _*),
+      extendedResolutionRules : _*),
     Batch("Check Analysis", Once,
-      CheckResolution),
+      CheckResolution +:
+      extendedCheckRules: _*),
     Batch("Remove SubQueries", fixedPoint,
       EliminateSubQueries)
   )
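
A minimal sketch of how a context could hook into the two extension points above. The subclass and the no-op rule below are hypothetical, purely for illustration; the real overrides added by this patch appear in SQLContext, HiveContext, and DataSourceTest further down.

import org.apache.spark.sql.catalyst.analysis.{Analyzer, Catalog, FunctionRegistry}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule

// A check rule only inspects the analyzed plan; it returns it unchanged (or throws).
object NoOpCheck extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan
}

class CustomAnalyzer(catalog: Catalog, registry: FunctionRegistry)
  extends Analyzer(catalog, registry, caseSensitive = true) {
  // Runs inside the fixed-point "Resolution" batch, together with the built-in rules.
  override val extendedResolutionRules: Seq[Rule[LogicalPlan]] = Nil
  // Runs once in the "Check Analysis" batch, after the built-in CheckResolution rule.
  override val extendedCheckRules: Seq[Rule[LogicalPlan]] = NoOpCheck :: Nil
}
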
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameImpl.scala
index 500e3c90fdbc1fb429b1ab1be735b98c35aed502..3c1cf8d5e38510669f316adb233a7f1c0590e631 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameImpl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameImpl.scala
@@ -67,7 +67,11 @@ private[sql] class DataFrameImpl protected[sql](
   @transient protected[sql] override val logicalPlan: LogicalPlan = queryExecution.logical match {
     // For various commands (like DDL) and queries with side effects, we force query optimization to
     // happen right away to let these side effects take place eagerly.
-    case _: Command | _: InsertIntoTable | _: CreateTableAsSelect[_] |_: WriteToFile =>
+    case _: Command |
+         _: InsertIntoTable |
+         _: CreateTableAsSelect[_] |
+         _: CreateTableUsingAsSelect |
+         _: WriteToFile =>
       LogicalRDD(queryExecution.analyzed.output, queryExecution.toRdd)(sqlContext)
     case _ =>
       queryExecution.logical
@@ -386,7 +390,7 @@ private[sql] class DataFrameImpl protected[sql](
       mode: SaveMode,
       options: Map[String, String]): Unit = {
     val cmd =
-      CreateTableUsingAsLogicalPlan(
+      CreateTableUsingAsSelect(
         tableName,
         source,
         temporary = false,
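
For context, the user-facing call that reaches this code path is saveAsTable with an explicit data source. A sketch (the DataFrame, table name, provider, and path are placeholders, not part of this patch):

import org.apache.spark.sql.{DataFrame, SaveMode}

// saveAsTable on a data source table now emits a single CreateTableUsingAsSelect node
// whose child is df's logical plan, instead of the removed CreateTableUsingAsLogicalPlan.
def saveAsJsonTable(df: DataFrame): Unit =
  df.saveAsTable(
    "my_table",                        // tableName
    "org.apache.spark.sql.json",       // data source provider
    SaveMode.ErrorIfExists,
    Map("path" -> "/tmp/my_table"))
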
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index 14422505694166f589a593231f2776e2b1393110..d08c2d1cfe030be0a3902079d6e60e7febde38e7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -92,7 +92,8 @@ class SQLContext(@transient val sparkContext: SparkContext)
   @transient
   protected[sql] lazy val analyzer: Analyzer =
     new Analyzer(catalog, functionRegistry, caseSensitive = true) {
-      override val extendedRules =
+      override val extendedResolutionRules =
+        sources.PreWriteCheck(catalog) ::
         sources.PreInsertCastAndRename ::
         Nil
     }
@@ -101,7 +102,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
   protected[sql] lazy val optimizer: Optimizer = DefaultOptimizer
 
   @transient
-  protected[sql] val ddlParser = new DDLParser
+  protected[sql] val ddlParser = new DDLParser(sqlParser.apply(_))
 
   @transient
   protected[sql] val sqlParser = {
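
A small sketch of the new DDLParser contract (DDLParser is private[sql], so this is illustrative only; parseSelect stands in for whichever query parser the enclosing context supplies, e.g. sqlParser.apply here or HiveQl.parseSql in HiveContext):

import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.sources.DDLParser

// The DDL parser no longer parses the text after AS itself; it calls back into the
// function it was constructed with, so each context can supply its own query parser.
def buildDdlParser(parseSelect: String => LogicalPlan): DDLParser =
  new DDLParser(parseSelect)

// buildDdlParser(...)("CREATE TEMPORARY TABLE ... AS SELECT ...", exceptionOnError = false)
// returns an Option[LogicalPlan]; for a CTAS statement it contains a
// CreateTableUsingAsSelect node whose child is the already-parsed SELECT.
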
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index e915e0e6a0ec1b039180888edec0f32ed0b4e8ce..5281c7502556a7a7560c5ecdf5c091f82b297f29 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -319,18 +319,10 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
         sys.error("allowExisting should be set to false when creating a temporary table.")
 
       case CreateTableUsingAsSelect(tableName, provider, true, mode, opts, query) =>
-        val logicalPlan = sqlContext.parseSql(query)
-        val cmd =
-          CreateTempTableUsingAsSelect(tableName, provider, mode, opts, logicalPlan)
-        ExecutedCommand(cmd) :: Nil
-      case c: CreateTableUsingAsSelect if !c.temporary =>
-        sys.error("Tables created with SQLContext must be TEMPORARY. Use a HiveContext instead.")
-
-      case CreateTableUsingAsLogicalPlan(tableName, provider, true, mode, opts, query) =>
         val cmd =
           CreateTempTableUsingAsSelect(tableName, provider, mode, opts, query)
         ExecutedCommand(cmd) :: Nil
-      case c: CreateTableUsingAsLogicalPlan if !c.temporary =>
+      case c: CreateTableUsingAsSelect if !c.temporary =>
         sys.error("Tables created with SQLContext must be TEMPORARY. Use a HiveContext instead.")
 
       case LogicalDescribeCommand(table, isExtended) =>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala
index a853385fdac6845b52583a23607d7351c86d9e5e..67f3507c61ab60ec0246faab83e367d95127b5e4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala
@@ -55,10 +55,7 @@ private[sql] object DataSourceStrategy extends Strategy {
       execution.PhysicalRDD(l.output, t.buildScan()) :: Nil
 
     case i @ logical.InsertIntoTable(
-      l @ LogicalRelation(t: InsertableRelation), partition, query, overwrite) =>
-      if (partition.nonEmpty) {
-        sys.error(s"Insert into a partition is not allowed because $l is not partitioned.")
-      }
+      l @ LogicalRelation(t: InsertableRelation), part, query, overwrite) if part.isEmpty =>
       execution.ExecutedCommand(InsertIntoDataSource(l, query, overwrite)) :: Nil
 
     case _ => Nil
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala
index 1b5e8c280e2fea3216ce1fed5b14a9da9e3253e2..dd8b3d211be648bce687ae702903957640428d3d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala
@@ -24,7 +24,7 @@ import org.apache.spark.sql.{SaveMode, DataFrame, SQLContext}
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.AbstractSparkSQLParser
 import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
-import org.apache.spark.sql.catalyst.expressions.{Row, AttributeReference}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Row}
 import org.apache.spark.sql.execution.RunnableCommand
 import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils
@@ -32,7 +32,8 @@ import org.apache.spark.util.Utils
 /**
  * A parser for foreign DDL commands.
  */
-private[sql] class DDLParser extends AbstractSparkSQLParser with Logging {
+private[sql] class DDLParser(
+    parseQuery: String => LogicalPlan) extends AbstractSparkSQLParser with Logging {
 
   def apply(input: String, exceptionOnError: Boolean): Option[LogicalPlan] = {
     try {
@@ -105,6 +106,7 @@ private[sql] class DDLParser extends AbstractSparkSQLParser with Logging {
    * AS SELECT ...
    */
   protected lazy val createTable: Parser[LogicalPlan] =
+    // TODO: Support database.table.
     (CREATE ~> TEMPORARY.? <~ TABLE) ~ (IF ~> NOT <~ EXISTS).? ~ ident ~
       tableCols.? ~ (USING ~> className) ~ (OPTIONS ~> options).? ~ (AS ~> restInput).? ^^ {
       case temp ~ allowExisting ~ tableName ~ columns ~ provider ~ opts ~ query =>
@@ -128,12 +130,13 @@ private[sql] class DDLParser extends AbstractSparkSQLParser with Logging {
             SaveMode.ErrorIfExists
           }
 
+          val queryPlan = parseQuery(query.get)
           CreateTableUsingAsSelect(tableName,
             provider,
             temp.isDefined,
             mode,
             options,
-            query.get)
+            queryPlan)
         } else {
           val userSpecifiedSchema = columns.flatMap(fields => Some(StructType(fields)))
           CreateTableUsing(
@@ -345,21 +348,23 @@ private[sql] case class CreateTableUsing(
     allowExisting: Boolean,
     managedIfNoPath: Boolean) extends Command
 
+/**
+ * A node used to support CTAS statements and saveAsTable for the data source API.
+ * This node is a [[UnaryNode]] instead of a [[Command]] so that the analyzer can
+ * analyze the logical plan that will be used to populate the table, which in turn
+ * lets [[PreWriteCheck]] detect cases that are not allowed.
+ */
 private[sql] case class CreateTableUsingAsSelect(
     tableName: String,
     provider: String,
     temporary: Boolean,
     mode: SaveMode,
     options: Map[String, String],
-    query: String) extends Command
-
-private[sql] case class CreateTableUsingAsLogicalPlan(
-    tableName: String,
-    provider: String,
-    temporary: Boolean,
-    mode: SaveMode,
-    options: Map[String, String],
-    query: LogicalPlan) extends Command
+    child: LogicalPlan) extends UnaryNode {
+  override def output = Seq.empty[Attribute]
+  // TODO: Override resolved after we support databaseName.
+  // override lazy val resolved = databaseName != None && childrenResolved
+}
 
 private[sql] case class CreateTempTableUsing(
     tableName: String,
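
Because the node is now a UnaryNode, the plan that will populate the table is a real child in the tree rather than an unparsed string. A construction sketch (the class is private[sql]; the table name, provider, and path below are placeholders):

import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.sources.CreateTableUsingAsSelect

def ctasFor(selectPlan: LogicalPlan): CreateTableUsingAsSelect =
  CreateTableUsingAsSelect(
    tableName = "target",                     // no database qualifier yet, see the TODO above
    provider  = "org.apache.spark.sql.json",
    temporary = true,
    mode      = SaveMode.ErrorIfExists,
    options   = Map("path" -> "/tmp/target"),
    child     = selectPlan)

// ctasFor(plan).output is empty (the node itself produces no rows), while
// ctasFor(plan).child is analyzed like any other query, which is what lets
// PreWriteCheck inspect it.
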
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/rules.scala
index 4ed22d363da5b0b33625e153f42d0b5453f8e7a8..36a9c0bdc41e633ca3743ca5add2ee04074889a2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/rules.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/rules.scala
@@ -17,7 +17,10 @@
 
 package org.apache.spark.sql.sources
 
+import org.apache.spark.sql.{AnalysisException, SaveMode}
+import org.apache.spark.sql.catalyst.analysis.{Catalog, EliminateSubQueries}
 import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, Alias}
+import org.apache.spark.sql.catalyst.plans.logical
 import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan, Project}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.types.DataType
@@ -26,11 +29,9 @@ import org.apache.spark.sql.types.DataType
  * A rule to do pre-insert data type casting and field renaming. Before we insert into
  * an [[InsertableRelation]], we will use this rule to make sure that
  * the columns to be inserted have the correct data type and fields have the correct names.
- * @param resolver The resolver used by the Analyzer.
  */
 private[sql] object PreInsertCastAndRename extends Rule[LogicalPlan] {
-  def apply(plan: LogicalPlan): LogicalPlan = {
-    plan.transform {
+  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
       // Wait until children are resolved.
       case p: LogicalPlan if !p.childrenResolved => p
 
@@ -46,7 +47,6 @@ private[sql] object PreInsertCastAndRename extends Rule[LogicalPlan] {
         }
         castAndRenameChildOutput(i, l.output, child)
       }
-    }
   }
 
   /** If necessary, cast data types and rename fields to the expected types and names. */
@@ -74,3 +74,67 @@ private[sql] object PreInsertCastAndRename extends Rule[LogicalPlan] {
     }
   }
 }
+
+/**
+ * A rule that validates writes (INSERT and CTAS) to data source tables during analysis.
+ */
+private[sql] case class PreWriteCheck(catalog: Catalog) extends Rule[LogicalPlan] {
+  def failAnalysis(msg: String) = { throw new AnalysisException(msg) }
+
+  def apply(plan: LogicalPlan): LogicalPlan = {
+    plan.foreach {
+      case i @ logical.InsertIntoTable(
+        l @ LogicalRelation(t: InsertableRelation), partition, query, overwrite) =>
+        // Right now, we do not support inserting into a data source table with partition specs.
+        if (partition.nonEmpty) {
+          failAnalysis(s"Insert into a partition is not allowed because $l is not partitioned.")
+        } else {
+          // Get all input data source relations of the query.
+          val srcRelations = query.collect {
+            case LogicalRelation(src: BaseRelation) => src
+          }
+          if (srcRelations.exists(src => src == t)) {
+            failAnalysis(
+              "Cannot insert overwrite into table that is also being read from.")
+          } else {
+            // OK
+          }
+        }
+
+      case logical.InsertIntoTable(l @ LogicalRelation(r), partition, query, overwrite)
+          if !r.isInstanceOf[InsertableRelation] =>
+        // The underlying relation of l is not an InsertableRelation.
+        failAnalysis(s"$l does not allow insertion.")
+
+      case CreateTableUsingAsSelect(tableName, _, _, SaveMode.Overwrite, _, query) =>
+        // When the SaveMode is Overwrite, we need to check if the table is an input table of
+        // the query. If so, we will throw an AnalysisException to let users know it is not allowed.
+        if (catalog.tableExists(Seq(tableName))) {
+          // Need to remove SubQuery operator.
+          EliminateSubQueries(catalog.lookupRelation(Seq(tableName))) match {
+            // Only do the check if the table is a data source table
+            // (the relation is a BaseRelation).
+            case l @ LogicalRelation(dest: BaseRelation) =>
+              // Get all input data source relations of the query.
+              val srcRelations = query.collect {
+                case LogicalRelation(src: BaseRelation) => src
+              }
+              if (srcRelations.exists(src => src == dest)) {
+                failAnalysis(
+                  s"Cannot overwrite table $tableName that is also being read from.")
+              } else {
+                // OK
+              }
+
+            case _ => // OK
+          }
+        } else {
+          // OK
+        }
+
+      case _ => // OK
+    }
+
+    plan
+  }
+}
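
A usage sketch of the rule in isolation (PreWriteCheck is private[sql]; the catalog and analyzedPlan arguments are placeholders for whatever the enclosing context provides):

import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.analysis.Catalog
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.sources.PreWriteCheck

// The rule never rewrites the plan: it either returns it unchanged or fails analysis.
def checkWrite(catalog: Catalog, analyzedPlan: LogicalPlan): Unit =
  try {
    PreWriteCheck(catalog)(analyzedPlan)
  } catch {
    case e: AnalysisException =>
      // For example: "Cannot insert overwrite into table that is also being read from."
      println(s"Write rejected during analysis: ${e.getMessage}")
  }
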
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
index d0665450cd766db6a9f1c8bce74236c1a4d87a9d..9318c15520a1007d89a6a3e4832685222a5fda8f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
@@ -38,21 +38,22 @@ class ParquetQuerySuiteBase extends QueryTest with ParquetTest {
 
   test("appending") {
     val data = (0 until 10).map(i => (i, i.toString))
+    createDataFrame(data).toDF("c1", "c2").registerTempTable("tmp")
     withParquetTable(data, "t") {
-      sql("INSERT INTO TABLE t SELECT * FROM t")
+      sql("INSERT INTO TABLE t SELECT * FROM tmp")
       checkAnswer(table("t"), (data ++ data).map(Row.fromTuple))
     }
+    catalog.unregisterTable(Seq("tmp"))
   }
 
-  // This test case will trigger the NPE mentioned in
-  // https://issues.apache.org/jira/browse/PARQUET-151.
-  // Update: This also triggers SPARK-5746, should re enable it when we get both fixed.
-  ignore("overwriting") {
+  test("overwriting") {
     val data = (0 until 10).map(i => (i, i.toString))
+    createDataFrame(data).toDF("c1", "c2").registerTempTable("tmp")
     withParquetTable(data, "t") {
-      sql("INSERT OVERWRITE TABLE t SELECT * FROM t")
+      sql("INSERT OVERWRITE TABLE t SELECT * FROM tmp")
       checkAnswer(table("t"), data.map(Row.fromTuple))
     }
+    catalog.unregisterTable(Seq("tmp"))
   }
 
   test("self-join") {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala
index 29caed9337ff6ca3724585facac5c18ad12934c7..60355414a40fafa90b93bb5fa50d88ed75537a5f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.sources
 
 import java.io.File
 
 import org.scalatest.BeforeAndAfterAll
 
+import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.util
@@ -157,4 +158,31 @@ class CreateTableAsSelectSuite extends DataSourceTest with BeforeAndAfterAll {
       """.stripMargin)
     }
   }
+
+  test("it is not allowed to write to a table while querying it.") {
+    sql(
+      s"""
+        |CREATE TEMPORARY TABLE jsonTable
+        |USING org.apache.spark.sql.json.DefaultSource
+        |OPTIONS (
+        |  path '${path.toString}'
+        |) AS
+        |SELECT a, b FROM jt
+      """.stripMargin)
+
+    val message = intercept[AnalysisException] {
+      sql(
+        s"""
+        |CREATE TEMPORARY TABLE jsonTable
+        |USING org.apache.spark.sql.json.DefaultSource
+        |OPTIONS (
+        |  path '${path.toString}'
+        |) AS
+        |SELECT a, b FROM jsonTable
+      """.stripMargin)
+    }.getMessage
+    assert(
+      message.contains("Cannot overwrite table "),
+      "Writing to a table while querying it should not be allowed.")
+  }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/DataSourceTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/DataSourceTest.scala
index 53f5f7426e9e6e12e6203834c6b39883d98cc699..0ec6881d7afe603317f123bd610c266446f2be89 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/DataSourceTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/DataSourceTest.scala
@@ -29,7 +29,8 @@ abstract class DataSourceTest extends QueryTest with BeforeAndAfter {
     @transient
     override protected[sql] lazy val analyzer: Analyzer =
       new Analyzer(catalog, functionRegistry, caseSensitive = false) {
-        override val extendedRules =
+        override val extendedResolutionRules =
+          PreWriteCheck(catalog) ::
           PreInsertCastAndRename ::
           Nil
       }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertIntoSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
similarity index 79%
rename from sql/core/src/test/scala/org/apache/spark/sql/sources/InsertIntoSuite.scala
rename to sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
index 36e504e7591524e528c1e8be65107e1f84d6583a..5682e5a2bcea9917350d12199baeab3435cd5b26 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertIntoSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
@@ -21,11 +21,11 @@ import java.io.File
 
 import org.scalatest.BeforeAndAfterAll
 
-import org.apache.spark.sql.Row
+import org.apache.spark.sql.{AnalysisException, Row}
 import org.apache.spark.sql.catalyst.util
 import org.apache.spark.util.Utils
 
-class InsertIntoSuite extends DataSourceTest with BeforeAndAfterAll {
+class InsertSuite extends DataSourceTest with BeforeAndAfterAll {
 
   import caseInsensisitiveContext._
 
@@ -129,6 +129,18 @@ class InsertIntoSuite extends DataSourceTest with BeforeAndAfterAll {
     }
   }
 
+  test("it is not allowed to write to a table while querying it.") {
+    val message = intercept[AnalysisException] {
+      sql(
+        s"""
+        |INSERT OVERWRITE TABLE jsonTable SELECT a, b FROM jsonTable
+      """.stripMargin)
+    }.getMessage
+    assert(
+      message.contains("Cannot insert overwrite into table that is also being read from."),
+      "INSERT OVERWRITE to a table while querying it should not be allowed.")
+  }
+
   test("Caching")  {
     // Cached Query Execution
     cacheTable("jsonTable")
@@ -173,4 +185,34 @@ class InsertIntoSuite extends DataSourceTest with BeforeAndAfterAll {
     uncacheTable("jsonTable")
     assertCached(sql("SELECT * FROM jsonTable"), 0)
   }
+
+  test("it's not allowed to insert into a relation that is not an InsertableRelation") {
+    sql(
+      """
+        |CREATE TEMPORARY TABLE oneToTen
+        |USING org.apache.spark.sql.sources.SimpleScanSource
+        |OPTIONS (
+        |  From '1',
+        |  To '10'
+        |)
+      """.stripMargin)
+
+    checkAnswer(
+      sql("SELECT * FROM oneToTen"),
+      (1 to 10).map(Row(_)).toSeq
+    )
+
+    val message = intercept[AnalysisException] {
+      sql(
+        s"""
+        |INSERT OVERWRITE TABLE oneToTen SELECT a FROM jt
+        """.stripMargin)
+    }.getMessage
+    assert(
+      message.contains("does not allow insertion."),
+      "It is not allowed to insert into a table that is not an InsertableRelation."
+    )
+
+    dropTempTable("oneToTen")
+  }
 }
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
index 87b380f9509793bdbfa78f41ef5fb3c9ed055bfd..6c55bc6be17f9f5ca2e67f6e7212bd4ee7109f2f 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -39,7 +39,7 @@ import org.apache.spark.sql.catalyst.analysis.{Analyzer, EliminateSubQueries, Ov
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.execution.{ExecutedCommand, ExtractPythonUdfs, QueryExecutionException, SetCommand}
 import org.apache.spark.sql.hive.execution.{DescribeHiveTableCommand, HiveNativeCommand}
-import org.apache.spark.sql.sources.DataSourceStrategy
+import org.apache.spark.sql.sources.{DDLParser, DataSourceStrategy}
 import org.apache.spark.sql.types._
 
 /**
@@ -64,14 +64,17 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
   override protected[sql] def executePlan(plan: LogicalPlan): this.QueryExecution =
     new this.QueryExecution(plan)
 
+  @transient
+  protected[sql] val ddlParserWithHiveQL = new DDLParser(HiveQl.parseSql(_))
+
   override def sql(sqlText: String): DataFrame = {
     val substituted = new VariableSubstitution().substitute(hiveconf, sqlText)
     // TODO: Create a framework for registering parsers instead of just hardcoding if statements.
     if (conf.dialect == "sql") {
       super.sql(substituted)
     } else if (conf.dialect == "hiveql") {
-      DataFrame(this,
-        ddlParser(sqlText, exceptionOnError = false).getOrElse(HiveQl.parseSql(substituted)))
+      val ddlPlan = ddlParserWithHiveQL(sqlText, exceptionOnError = false)
+      DataFrame(this, ddlPlan.getOrElse(HiveQl.parseSql(substituted)))
     }  else {
       sys.error(s"Unsupported SQL dialect: ${conf.dialect}. Try 'sql' or 'hiveql'")
     }
@@ -241,12 +244,13 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
   @transient
   override protected[sql] lazy val analyzer =
     new Analyzer(catalog, functionRegistry, caseSensitive = false) {
-      override val extendedRules =
+      override val extendedResolutionRules =
         catalog.ParquetConversions ::
         catalog.CreateTables ::
         catalog.PreInsertionCasts ::
         ExtractPythonUdfs ::
         ResolveUdtfsAlias ::
+        sources.PreWriteCheck(catalog) ::
         sources.PreInsertCastAndRename ::
         Nil
     }
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 580c5706dde67559672634bee04d23e328dbc8b5..72211fe2e46c6d4318f759fb608476cbd97e1e2a 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -663,7 +663,7 @@ private[hive] case class MetastoreRelation
 }
 
 object HiveMetastoreTypes {
-  protected val ddlParser = new DDLParser
+  protected val ddlParser = new DDLParser(HiveQl.parseSql(_))
 
   def toDataType(metastoreType: String): DataType = synchronized {
     ddlParser.parseType(metastoreType)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index 965d159656d804e462e2aa95a8685b046cee67a9..d2c39ab6217138b8c8b2b2d48efdffaa670b0e84 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -34,7 +34,7 @@ import org.apache.spark.sql.execution.{DescribeCommand => RunnableDescribeComman
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.hive.execution._
 import org.apache.spark.sql.parquet.ParquetRelation
-import org.apache.spark.sql.sources.{CreateTableUsingAsLogicalPlan, CreateTableUsingAsSelect, CreateTableUsing}
+import org.apache.spark.sql.sources.{CreateTableUsing, CreateTableUsingAsSelect}
 import org.apache.spark.sql.types.StringType
 
 
@@ -227,12 +227,6 @@ private[hive] trait HiveStrategies {
             tableName, userSpecifiedSchema, provider, opts, allowExisting, managedIfNoPath)) :: Nil
 
       case CreateTableUsingAsSelect(tableName, provider, false, mode, opts, query) =>
-        val logicalPlan = hiveContext.parseSql(query)
-        val cmd =
-          CreateMetastoreDataSourceAsSelect(tableName, provider, mode, opts, logicalPlan)
-        ExecutedCommand(cmd) :: Nil
-
-      case CreateTableUsingAsLogicalPlan(tableName, provider, false, mode, opts, query) =>
         val cmd =
           CreateMetastoreDataSourceAsSelect(tableName, provider, mode, opts, query)
         ExecutedCommand(cmd) :: Nil