diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index f5398605bc7eb3e5a6ed2a9e3d114af09ca5c72e..bbf48efb2444011c56ec9621dd8d7b4c121316dd 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -407,64 +407,58 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
    * For example, because of a CREATE TABLE X AS statement.
    */
   object CreateTables extends Rule[LogicalPlan] {
-    import org.apache.hadoop.hive.ql.Context
-    import org.apache.hadoop.hive.ql.parse.{ASTNode, QB, SemanticAnalyzer}
-
     def apply(plan: LogicalPlan): LogicalPlan = plan transform {
       // Wait until children are resolved.
       case p: LogicalPlan if !p.childrenResolved => p
+      case p: LogicalPlan if p.resolved => p
+      case p @ CreateTableAsSelect(table, child, allowExisting) =>
+        val schema = if (table.schema.size > 0) {
+          table.schema
+        } else {
+          child.output.map {
+            attr => new HiveColumn(
+              attr.name,
+              HiveMetastoreTypes.toMetastoreType(attr.dataType), null)
+          }
+        }
+
+        val desc = table.copy(schema = schema)
 
-      case CreateTableAsSelect(desc, child, allowExisting) =>
-        if (hive.convertCTAS && !desc.serde.isDefined) {
+        if (hive.convertCTAS && table.serde.isEmpty) {
           // Do the conversion when spark.sql.hive.convertCTAS is true and the query
           // does not specify any storage format (file format and storage handler).
-          if (desc.specifiedDatabase.isDefined) {
+          if (table.specifiedDatabase.isDefined) {
             throw new AnalysisException(
               "Cannot specify database name in a CTAS statement " +
-              "when spark.sql.hive.convertCTAS is set to true.")
+                "when spark.sql.hive.convertCTAS is set to true.")
           }
 
           val mode = if (allowExisting) SaveMode.Ignore else SaveMode.ErrorIfExists
           CreateTableUsingAsSelect(
             desc.name,
-            conf.defaultDataSourceName,
+            hive.conf.defaultDataSourceName,
             temporary = false,
             mode,
             options = Map.empty[String, String],
             child
           )
         } else {
-          execution.CreateTableAsSelect(
-            desc.copy(
-              specifiedDatabase = Option(desc.specifiedDatabase.getOrElse(client.currentDatabase))),
-            child,
-            allowExisting)
-        }
-
-      case p: LogicalPlan if p.resolved => p
-
-      case p @ CreateTableAsSelect(desc, child, allowExisting) =>
-        val (dbName, tblName) = processDatabaseAndTableName(desc.database, desc.name)
-
-        if (hive.convertCTAS) {
-          if (desc.specifiedDatabase.isDefined) {
-            throw new AnalysisException(
-              "Cannot specify database name in a CTAS statement " +
-              "when spark.sql.hive.convertCTAS is set to true.")
+          val desc = if (table.serde.isEmpty) {
+            // add default serde
+            table.copy(
+              serde = Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe"))
+          } else {
+            table
           }
 
-          val mode = if (allowExisting) SaveMode.Ignore else SaveMode.ErrorIfExists
-          CreateTableUsingAsSelect(
-            tblName,
-            conf.defaultDataSourceName,
-            temporary = false,
-            mode,
-            options = Map.empty[String, String],
-            child
-          )
-        } else {
+          val (dbName, tblName) =
+            processDatabaseAndTableName(
+              desc.specifiedDatabase.getOrElse(client.currentDatabase), desc.name)
+
           execution.CreateTableAsSelect(
-            desc,
+            desc.copy(
+              specifiedDatabase = Some(dbName),
+              name = tblName),
             child,
             allowExisting)
         }
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
index 04d40bbb2bced1e54e71512394a52579d300862b..2cbb5ca4d2e0cb5e551c2e8ef45bfce496df7525 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
@@ -22,14 +22,15 @@ import java.sql.Date
 import scala.collection.mutable.ArrayBuffer
 
 import org.apache.hadoop.hive.conf.HiveConf
-import org.apache.hadoop.hive.ql.Context
+import org.apache.hadoop.hive.serde.serdeConstants
+import org.apache.hadoop.hive.ql.{ErrorMsg, Context}
 import org.apache.hadoop.hive.ql.exec.{FunctionRegistry, FunctionInfo}
 import org.apache.hadoop.hive.ql.lib.Node
-import org.apache.hadoop.hive.ql.metadata.Table
 import org.apache.hadoop.hive.ql.parse._
 import org.apache.hadoop.hive.ql.plan.PlanUtils
-import org.apache.spark.sql.AnalysisException
+import org.apache.hadoop.hive.ql.session.SessionState
 
+import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans._
@@ -62,7 +63,13 @@ case class CreateTableAsSelect(
     allowExisting: Boolean) extends UnaryNode with Command {
 
   override def output: Seq[Attribute] = Seq.empty[Attribute]
-  override lazy val resolved: Boolean = tableDesc.specifiedDatabase.isDefined && childrenResolved
+  override lazy val resolved: Boolean =
+    tableDesc.specifiedDatabase.isDefined &&
+    tableDesc.schema.size > 0 &&
+    tableDesc.serde.isDefined &&
+    tableDesc.inputFormat.isDefined &&
+    tableDesc.outputFormat.isDefined &&
+    childrenResolved
 }
 
 /** Provides a mapping from HiveQL statements to catalyst logical plans and expression trees. */
@@ -240,12 +247,23 @@ private[hive] object HiveQl {
      * Otherwise, there will be Null pointer exception,
      * when retrieving properties form HiveConf.
      */
-    val hContext = new Context(new HiveConf())
+    val hContext = new Context(hiveConf)
     val node = ParseUtils.findRootNonNullToken((new ParseDriver).parse(sql, hContext))
     hContext.clear()
     node
   }
 
+  /**
+   * Returns the HiveConf
+   */
+  private[this] def hiveConf(): HiveConf = {
+    val ss = SessionState.get() // SessionState is lazy initializaion, it can be null here
+    if (ss == null) {
+      new HiveConf()
+    } else {
+      ss.getConf
+    }
+  }
 
   /** Returns a LogicalPlan for a given HiveQL string. */
   def parseSql(sql: String): LogicalPlan = hqlParser.parse(sql)
@@ -476,8 +494,8 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C
       DropTable(tableName, ifExists.nonEmpty)
     // Support "ANALYZE TABLE tableNmae COMPUTE STATISTICS noscan"
     case Token("TOK_ANALYZE",
-            Token("TOK_TAB", Token("TOK_TABNAME", tableNameParts) :: partitionSpec) ::
-            isNoscan) =>
+           Token("TOK_TAB", Token("TOK_TABNAME", tableNameParts) :: partitionSpec) ::
+           isNoscan) =>
       // Reference:
       // https://cwiki.apache.org/confluence/display/Hive/StatsDev#StatsDev-ExistingTables
       if (partitionSpec.nonEmpty) {
@@ -547,6 +565,7 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C
       val (
           Some(tableNameParts) ::
           _ /* likeTable */ ::
+          externalTable ::
           Some(query) ::
           allowExisting +:
           ignores) =
@@ -554,6 +573,7 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C
           Seq(
             "TOK_TABNAME",
             "TOK_LIKETABLE",
+            "EXTERNAL",
             "TOK_QUERY",
             "TOK_IFNOTEXISTS",
             "TOK_TABLECOMMENT",
@@ -576,43 +596,153 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C
           children)
       val (db, tableName) = extractDbNameTableName(tableNameParts)
 
-      var tableDesc =
-        HiveTable(
-          specifiedDatabase = db,
-          name = tableName,
-          schema = Seq.empty,
-          partitionColumns = Seq.empty,
-          properties = Map.empty,
-          serdeProperties = Map.empty,
-          tableType = ManagedTable,
-          location = None,
-          inputFormat = None,
-          outputFormat = None,
-          serde = None)
-
-      // TODO: Handle all the cases here...
-      children.foreach {
-        case Token("TOK_TBLRCFILE", Nil) =>
-          import org.apache.hadoop.hive.ql.io.{RCFileInputFormat, RCFileOutputFormat}
+      // TODO add bucket support
+      var tableDesc: HiveTable = HiveTable(
+        specifiedDatabase = db,
+        name = tableName,
+        schema = Seq.empty[HiveColumn],
+        partitionColumns = Seq.empty[HiveColumn],
+        properties = Map[String, String](),
+        serdeProperties = Map[String, String](),
+        tableType = if (externalTable.isDefined) ExternalTable else ManagedTable,
+        location = None,
+        inputFormat = None,
+        outputFormat = None,
+        serde = None,
+        viewText = None)
+
+      // default storage type abbriviation (e.g. RCFile, ORC, PARQUET etc.)
+      val defaultStorageType = hiveConf.getVar(HiveConf.ConfVars.HIVEDEFAULTFILEFORMAT)
+      // handle the default format for the storage type abbriviation
+      tableDesc = if ("SequenceFile".equalsIgnoreCase(defaultStorageType)) {
+          tableDesc.copy(
+            inputFormat = Option("org.apache.hadoop.mapred.SequenceFileInputFormat"),
+            outputFormat = Option("org.apache.hadoop.mapred.SequenceFileOutputFormat"))
+        } else if ("RCFile".equalsIgnoreCase(defaultStorageType)) {
+          tableDesc.copy(
+            inputFormat = Option("org.apache.hadoop.hive.ql.io.RCFileInputFormat"),
+            outputFormat = Option("org.apache.hadoop.hive.ql.io.RCFileOutputFormat"),
+            serde = Option(hiveConf.getVar(HiveConf.ConfVars.HIVEDEFAULTRCFILESERDE)))
+        } else if ("ORC".equalsIgnoreCase(defaultStorageType)) {
+          tableDesc.copy(
+            inputFormat = Option("org.apache.hadoop.hive.ql.io.orc.OrcInputFormat"),
+            outputFormat = Option("org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat"),
+            serde = Option("org.apache.hadoop.hive.ql.io.orc.OrcSerde"))
+        } else if ("PARQUET".equalsIgnoreCase(defaultStorageType)) {
+          tableDesc.copy(
+            inputFormat =
+              Option("org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat"),
+            outputFormat =
+              Option("org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat"),
+            serde =
+              Option("org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe"))
+        } else {
+          tableDesc.copy(
+            inputFormat =
+              Option("org.apache.hadoop.mapred.TextInputFormat"),
+            outputFormat =
+              Option("org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat"))
+        }
+
+      children.collect {
+        case list @ Token("TOK_TABCOLLIST", _) =>
+          val cols = BaseSemanticAnalyzer.getColumns(list, true)
+          if (cols != null) {
+            tableDesc = tableDesc.copy(
+              schema = cols.map { field =>
+                HiveColumn(field.getName, field.getType, field.getComment)
+              })
+          }
+        case Token("TOK_TABLECOMMENT", child :: Nil) =>
+          val comment = BaseSemanticAnalyzer.unescapeSQLString(child.getText)
+          // TODO support the sql text
+          tableDesc = tableDesc.copy(viewText = Option(comment))
+        case Token("TOK_TABLEPARTCOLS", list @ Token("TOK_TABCOLLIST", _) :: Nil) =>
+          val cols = BaseSemanticAnalyzer.getColumns(list(0), false)
+          if (cols != null) {
+            tableDesc = tableDesc.copy(
+              partitionColumns = cols.map { field =>
+                HiveColumn(field.getName, field.getType, field.getComment)
+              })
+          }
+        case Token("TOK_TABLEROWFORMAT", Token("TOK_SERDEPROPS", child :: Nil) :: Nil)=>
+          val serdeParams = new java.util.HashMap[String, String]()
+          child match {
+            case Token("TOK_TABLEROWFORMATFIELD", rowChild1 :: rowChild2) =>
+              val fieldDelim = BaseSemanticAnalyzer.unescapeSQLString (rowChild1.getText())
+              serdeParams.put(serdeConstants.FIELD_DELIM, fieldDelim)
+              serdeParams.put(serdeConstants.SERIALIZATION_FORMAT, fieldDelim)
+              if (rowChild2.length > 1) {
+                val fieldEscape = BaseSemanticAnalyzer.unescapeSQLString (rowChild2(0).getText)
+                serdeParams.put(serdeConstants.ESCAPE_CHAR, fieldEscape)
+              }
+            case Token("TOK_TABLEROWFORMATCOLLITEMS", rowChild :: Nil) =>
+              val collItemDelim = BaseSemanticAnalyzer.unescapeSQLString(rowChild.getText)
+              serdeParams.put(serdeConstants.COLLECTION_DELIM, collItemDelim)
+            case Token("TOK_TABLEROWFORMATMAPKEYS", rowChild :: Nil) =>
+              val mapKeyDelim = BaseSemanticAnalyzer.unescapeSQLString(rowChild.getText)
+              serdeParams.put(serdeConstants.MAPKEY_DELIM, mapKeyDelim)
+            case Token("TOK_TABLEROWFORMATLINES", rowChild :: Nil) =>
+              val lineDelim = BaseSemanticAnalyzer.unescapeSQLString(rowChild.getText)
+              if (!(lineDelim == "\n") && !(lineDelim == "10")) {
+                throw new AnalysisException(
+                  SemanticAnalyzer.generateErrorMessage(
+                    rowChild,
+                    ErrorMsg.LINES_TERMINATED_BY_NON_NEWLINE.getMsg))
+              }
+              serdeParams.put(serdeConstants.LINE_DELIM, lineDelim)
+            case Token("TOK_TABLEROWFORMATNULL", rowChild :: Nil) =>
+              val nullFormat = BaseSemanticAnalyzer.unescapeSQLString(rowChild.getText)
+              // TODO support the nullFormat
+            case _ => assert(false)
+          }
+          tableDesc = tableDesc.copy(
+            serdeProperties = tableDesc.serdeProperties ++ serdeParams)
+        case Token("TOK_TABLELOCATION", child :: Nil) =>
+          var location = BaseSemanticAnalyzer.unescapeSQLString(child.getText)
+          location = EximUtil.relativeToAbsolutePath(hiveConf, location)
+          tableDesc = tableDesc.copy(location = Option(location))
+        case Token("TOK_TABLESERIALIZER", child :: Nil) =>
           tableDesc = tableDesc.copy(
-            outputFormat = Option(classOf[RCFileOutputFormat].getName),
-            inputFormat = Option(classOf[RCFileInputFormat[_, _]].getName))
+            serde = Option(BaseSemanticAnalyzer.unescapeSQLString(child.getChild(0).getText)))
+          if (child.getChildCount == 2) {
+            val serdeParams = new java.util.HashMap[String, String]()
+            BaseSemanticAnalyzer.readProps(
+              (child.getChild(1).getChild(0)).asInstanceOf[ASTNode], serdeParams)
+            tableDesc = tableDesc.copy(serdeProperties = tableDesc.serdeProperties ++ serdeParams)
+          }
+        case Token("TOK_FILEFORMAT_GENERIC", child :: Nil) =>
+          throw new SemanticException(
+            "Unrecognized file format in STORED AS clause:${child.getText}")
 
+        case Token("TOK_TBLRCFILE", Nil) =>
+          tableDesc = tableDesc.copy(
+            inputFormat = Option("org.apache.hadoop.hive.ql.io.RCFileInputFormat"),
+            outputFormat = Option("org.apache.hadoop.hive.ql.io.RCFileOutputFormat"))
           if (tableDesc.serde.isEmpty) {
             tableDesc = tableDesc.copy(
               serde = Option("org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe"))
           }
+
         case Token("TOK_TBLORCFILE", Nil) =>
           tableDesc = tableDesc.copy(
             inputFormat = Option("org.apache.hadoop.hive.ql.io.orc.OrcInputFormat"),
-            outputFormat = Option("org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat"),
-            serde = Option("org.apache.hadoop.hive.ql.io.orc.OrcSerde"))
+            outputFormat = Option("org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat"))
+          if (tableDesc.serde.isEmpty) {
+            tableDesc = tableDesc.copy(
+              serde = Option("org.apache.hadoop.hive.ql.io.orc.OrcSerde"))
+          }
 
         case Token("TOK_TBLPARQUETFILE", Nil) =>
           tableDesc = tableDesc.copy(
-            inputFormat = Option("org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat"),
-            outputFormat = Option("org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat"),
-            serde = Option("org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe"))
+            inputFormat =
+              Option("org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat"),
+            outputFormat =
+              Option("org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat"))
+          if (tableDesc.serde.isEmpty) {
+            tableDesc = tableDesc.copy(
+              serde = Option("org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe"))
+          }
 
         case Token("TOK_TABLESERIALIZER",
                Token("TOK_SERDENAME", Token(serdeName, Nil) :: otherProps) :: Nil) =>
@@ -627,13 +757,20 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C
 
         case Token("TOK_TABLEPROPERTIES", list :: Nil) =>
           tableDesc = tableDesc.copy(properties = tableDesc.properties ++ getProperties(list))
-
-        case _ =>
+        case list @ Token("TOK_TABLEFILEFORMAT", _) =>
+          tableDesc = tableDesc.copy(
+            inputFormat =
+              Option(BaseSemanticAnalyzer.unescapeSQLString(list.getChild(0).getText)),
+            outputFormat =
+              Option(BaseSemanticAnalyzer.unescapeSQLString(list.getChild(1).getText)))
+        case Token("TOK_STORAGEHANDLER", _) =>
+          throw new AnalysisException(ErrorMsg.CREATE_NON_NATIVE_AS.getMsg())
+        case _ => // Unsupport features
       }
 
       CreateTableAsSelect(tableDesc, nodeToPlan(query), allowExisting != None)
 
-    // If its not a "CREATE TABLE AS" like above then just pass it back to hive as a native command.
+    // If its not a "CTAS" like above then take it as a native command
     case Token("TOK_CREATETABLE", _) => NativePlaceholder
 
     // Support "TRUNCATE TABLE table_name [PARTITION partition_spec]"
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala
index 6bca9d0179fe3fb7df9134f8105ffd1f73e2cf6f..99aa0f1ded3f8a5d7ab5bc1d4b6e3186e7c3994a 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala
@@ -225,6 +225,12 @@ private[hive] class ClientWrapper(
       table.partitionColumns.map(c => new FieldSchema(c.name, c.hiveType, c.comment)))
     table.properties.foreach { case (k, v) => qlTable.setProperty(k, v) }
     table.serdeProperties.foreach { case (k, v) => qlTable.setSerdeParam(k, v) }
+
+    // set owner
+    qlTable.setOwner(conf.getUser)
+    // set create time
+    qlTable.setCreateTime((System.currentTimeMillis() / 1000).asInstanceOf[Int])
+
     version match {
       case hive.v12 =>
         table.location.map(new URI(_)).foreach(u => qlTable.call[URI, Unit]("setDataLocation", u))
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala
index 91e6ac40322046c1ed1979f9db2813fe032c7bb0..7d3ec12c4eb0527425313abd6d4539bb8ab29a04 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala
@@ -17,10 +17,8 @@
 
 package org.apache.spark.sql.hive.execution
 
-import org.apache.hadoop.hive.ql.plan.CreateTableDesc
-
 import org.apache.spark.annotation.Experimental
-import org.apache.spark.sql.SQLContext
+import org.apache.spark.sql.{AnalysisException, SQLContext}
 import org.apache.spark.sql.catalyst.expressions.Row
 import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan}
 import org.apache.spark.sql.execution.RunnableCommand
@@ -29,13 +27,10 @@ import org.apache.spark.sql.hive.{HiveContext, MetastoreRelation, HiveMetastoreT
 
 /**
  * Create table and insert the query result into it.
- * @param database the database name of the new relation
- * @param tableName the table name of the new relation
+ * @param tableDesc the Table Describe, which may contains serde, storage handler etc.
  * @param query the query whose result will be insert into the new relation
  * @param allowExisting allow continue working if it's already exists, otherwise
  *                      raise exception
- * @param desc the CreateTableDesc, which may contains serde, storage handler etc.
-
  */
 private[hive]
 case class CreateTableAsSelect(
@@ -80,8 +75,7 @@ case class CreateTableAsSelect(
       if (allowExisting) {
         // table already exists, will do nothing, to keep consistent with Hive
       } else {
-        throw
-          new org.apache.hadoop.hive.metastore.api.AlreadyExistsException(s"$database.$tableName")
+        throw new AnalysisException(s"$database.$tableName already exists.")
       }
     } else {
       hiveContext.executePlan(InsertIntoTable(metastoreRelation, Map(), query, true, false)).toRdd
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveQlSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveQlSuite.scala
new file mode 100644
index 0000000000000000000000000000000000000000..941a2941649b8862fc8498ca2ac608f6eeb17e8e
--- /dev/null
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveQlSuite.scala
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import org.apache.hadoop.hive.conf.HiveConf
+import org.apache.hadoop.hive.ql.session.SessionState
+import org.apache.hadoop.hive.serde.serdeConstants
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.hive.client.{ManagedTable, HiveColumn, ExternalTable, HiveTable}
+import org.scalatest.{BeforeAndAfterAll, FunSuite}
+
+
+class HiveQlSuite extends FunSuite with BeforeAndAfterAll {
+  override def beforeAll() {
+    if (SessionState.get() == null) {
+      SessionState.start(new HiveConf())
+    }
+  }
+
+  private def extractTableDesc(sql: String): (HiveTable, Boolean) = {
+    HiveQl.createPlan(sql).collect {
+      case CreateTableAsSelect(desc, child, allowExisting) => (desc, allowExisting)
+    }.head
+  }
+
+  test("Test CTAS #1") {
+    val s1 =
+      """CREATE EXTERNAL TABLE IF NOT EXISTS mydb.page_view
+        |(viewTime INT,
+        |userid BIGINT,
+        |page_url STRING,
+        |referrer_url STRING,
+        |ip STRING COMMENT 'IP Address of the User',
+        |country STRING COMMENT 'country of origination')
+        |COMMENT 'This is the staging page view table'
+        |PARTITIONED BY (dt STRING COMMENT 'date type', hour STRING COMMENT 'hour of the day')
+        |ROW FORMAT DELIMITED FIELDS TERMINATED BY '\054' STORED AS RCFILE
+        |LOCATION '/user/external/page_view'
+        |TBLPROPERTIES ('p1'='v1', 'p2'='v2')
+        |AS SELECT * FROM src""".stripMargin
+
+    val (desc, exists) = extractTableDesc(s1)
+    assert(exists == true)
+    assert(desc.specifiedDatabase == Some("mydb"))
+    assert(desc.name == "page_view")
+    assert(desc.tableType == ExternalTable)
+    assert(desc.location == Some("/user/external/page_view"))
+    assert(desc.schema ==
+      HiveColumn("viewtime", "int", null) ::
+        HiveColumn("userid", "bigint", null) ::
+        HiveColumn("page_url", "string", null) ::
+        HiveColumn("referrer_url", "string", null) ::
+        HiveColumn("ip", "string", "IP Address of the User") ::
+        HiveColumn("country", "string", "country of origination") :: Nil)
+    // TODO will be SQLText
+    assert(desc.viewText == Option("This is the staging page view table"))
+    assert(desc.partitionColumns ==
+      HiveColumn("dt", "string", "date type") ::
+        HiveColumn("hour", "string", "hour of the day") :: Nil)
+    assert(desc.serdeProperties ==
+      Map((serdeConstants.SERIALIZATION_FORMAT, "\054"), (serdeConstants.FIELD_DELIM, "\054")))
+    assert(desc.inputFormat == Option("org.apache.hadoop.hive.ql.io.RCFileInputFormat"))
+    assert(desc.outputFormat == Option("org.apache.hadoop.hive.ql.io.RCFileOutputFormat"))
+    assert(desc.serde == Option("org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe"))
+    assert(desc.properties == Map(("p1", "v1"), ("p2", "v2")))
+  }
+
+  test("Test CTAS #2") {
+    val s2 =
+      """CREATE EXTERNAL TABLE IF NOT EXISTS mydb.page_view
+        |(viewTime INT,
+        |userid BIGINT,
+        |page_url STRING,
+        |referrer_url STRING,
+        |ip STRING COMMENT 'IP Address of the User',
+        |country STRING COMMENT 'country of origination')
+        |COMMENT 'This is the staging page view table'
+        |PARTITIONED BY (dt STRING COMMENT 'date type', hour STRING COMMENT 'hour of the day')
+        |ROW FORMAT SERDE 'parquet.hive.serde.ParquetHiveSerDe'
+        | STORED AS
+        | INPUTFORMAT 'parquet.hive.DeprecatedParquetInputFormat'
+        | OUTPUTFORMAT 'parquet.hive.DeprecatedParquetOutputFormat'
+        |LOCATION '/user/external/page_view'
+        |TBLPROPERTIES ('p1'='v1', 'p2'='v2')
+        |AS SELECT * FROM src""".stripMargin
+
+    val (desc, exists) = extractTableDesc(s2)
+    assert(exists == true)
+    assert(desc.specifiedDatabase == Some("mydb"))
+    assert(desc.name == "page_view")
+    assert(desc.tableType == ExternalTable)
+    assert(desc.location == Some("/user/external/page_view"))
+    assert(desc.schema ==
+      HiveColumn("viewtime", "int", null) ::
+        HiveColumn("userid", "bigint", null) ::
+        HiveColumn("page_url", "string", null) ::
+        HiveColumn("referrer_url", "string", null) ::
+        HiveColumn("ip", "string", "IP Address of the User") ::
+        HiveColumn("country", "string", "country of origination") :: Nil)
+    // TODO will be SQLText
+    assert(desc.viewText == Option("This is the staging page view table"))
+    assert(desc.partitionColumns ==
+      HiveColumn("dt", "string", "date type") ::
+        HiveColumn("hour", "string", "hour of the day") :: Nil)
+    assert(desc.serdeProperties == Map())
+    assert(desc.inputFormat == Option("parquet.hive.DeprecatedParquetInputFormat"))
+    assert(desc.outputFormat == Option("parquet.hive.DeprecatedParquetOutputFormat"))
+    assert(desc.serde == Option("parquet.hive.serde.ParquetHiveSerDe"))
+    assert(desc.properties == Map(("p1", "v1"), ("p2", "v2")))
+  }
+
+  test("Test CTAS #3") {
+    val s3 = """CREATE TABLE page_view AS SELECT * FROM src"""
+    val (desc, exists) = extractTableDesc(s3)
+    assert(exists == false)
+    assert(desc.specifiedDatabase == None)
+    assert(desc.name == "page_view")
+    assert(desc.tableType == ManagedTable)
+    assert(desc.location == None)
+    assert(desc.schema == Seq.empty[HiveColumn])
+    assert(desc.viewText == None) // TODO will be SQLText
+    assert(desc.serdeProperties == Map())
+    assert(desc.inputFormat == Option("org.apache.hadoop.mapred.TextInputFormat"))
+    assert(desc.outputFormat == Option("org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat"))
+    assert(desc.serde.isEmpty)
+    assert(desc.properties == Map())
+  }
+
+  test("Test CTAS #4") {
+    val s4 =
+      """CREATE TABLE page_view
+        |STORED BY 'storage.handler.class.name' AS SELECT * FROM src""".stripMargin
+    intercept[AnalysisException] {
+      extractTableDesc(s4)
+    }
+  }
+
+  test("Test CTAS #5") {
+    val s5 = """CREATE TABLE ctas2
+               | ROW FORMAT SERDE "org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe"
+               | WITH SERDEPROPERTIES("serde_p1"="p1","serde_p2"="p2")
+               | STORED AS RCFile
+               | TBLPROPERTIES("tbl_p1"="p11", "tbl_p2"="p22")
+               | AS
+               |   SELECT key, value
+               |   FROM src
+               |   ORDER BY key, value""".stripMargin
+    val (desc, exists) = extractTableDesc(s5)
+    assert(exists == false)
+    assert(desc.specifiedDatabase == None)
+    assert(desc.name == "ctas2")
+    assert(desc.tableType == ManagedTable)
+    assert(desc.location == None)
+    assert(desc.schema == Seq.empty[HiveColumn])
+    assert(desc.viewText == None) // TODO will be SQLText
+    assert(desc.serdeProperties == Map(("serde_p1" -> "p1"), ("serde_p2" -> "p2")))
+    assert(desc.inputFormat == Option("org.apache.hadoop.hive.ql.io.RCFileInputFormat"))
+    assert(desc.outputFormat == Option("org.apache.hadoop.hive.ql.io.RCFileOutputFormat"))
+    assert(desc.serde == Option("org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe"))
+    assert(desc.properties == Map(("tbl_p1" -> "p11"), ("tbl_p2" -> "p22")))
+  }
+}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index 1353802604402d155ded68cf65f1aa721bcf406f..0d739dead4c731f27f37e4b6d5462d7614345b36 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -201,7 +201,7 @@ class SQLQuerySuite extends QueryTest {
     var message = intercept[AnalysisException] {
       sql("CREATE TABLE ctas1 AS SELECT key k, value FROM src ORDER BY k, value")
     }.getMessage
-    assert(message.contains("Table ctas1 already exists"))
+    assert(message.contains("ctas1 already exists"))
     checkRelation("ctas1", true)
     sql("DROP TABLE ctas1")
 
@@ -314,7 +314,7 @@ class SQLQuerySuite extends QueryTest {
           SELECT key, value
           FROM src
           ORDER BY key, value""").collect().toSeq)
-    intercept[org.apache.hadoop.hive.metastore.api.AlreadyExistsException] {
+    intercept[AnalysisException] {
       sql(
         """CREATE TABLE ctas4 AS
           | SELECT key, value FROM src ORDER BY key, value""".stripMargin).collect()