diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
index 89f4a19add1c65597f0f94718cb961775f4c719f..ee04cb579deb656ae80be5fddf8c2b0b39e29361 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
@@ -111,6 +111,7 @@ class SqlParser extends AbstractSparkSQLParser with DataTypeParser {
   protected val UPPER = Keyword("UPPER")
   protected val WHEN = Keyword("WHEN")
   protected val WHERE = Keyword("WHERE")
+  protected val WITH = Keyword("WITH")
 
   protected def assignAliases(exprs: Seq[Expression]): Seq[NamedExpression] = {
     exprs.zipWithIndex.map {
@@ -127,6 +128,7 @@ class SqlParser extends AbstractSparkSQLParser with DataTypeParser {
       | UNION ~ DISTINCT.? ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Distinct(Union(q1, q2)) }
       )
     | insert
+    | cte
     )
 
   protected lazy val select: Parser[LogicalPlan] =
@@ -156,6 +158,11 @@ class SqlParser extends AbstractSparkSQLParser with DataTypeParser {
       case o ~ r ~ s => InsertIntoTable(r, Map.empty[String, Option[String]], s, o)
     }
 
+  protected lazy val cte: Parser[LogicalPlan] =
+    WITH ~> rep1sep(ident ~ ( AS ~ "(" ~> start <~ ")"), ",") ~ start ^^ {
+      case r ~ s => With(s, r.map({case n ~ s => (n, Subquery(n, s))}).toMap)
+    }
+
   protected lazy val projection: Parser[Expression] =
     expression ~ (AS.? ~> ident.?) ^^ {
       case e ~ a => a.fold(e)(Alias(e, _)())
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 524c73c31bbe19374455e9e537260ec4efdc96f4..b83f18abdd2396101620da6905e7dd4c01c3c7ab 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -169,21 +169,36 @@ class Analyzer(
    * Replaces [[UnresolvedRelation]]s with concrete relations from the catalog.
    */
   object ResolveRelations extends Rule[LogicalPlan] {
-    def getTable(u: UnresolvedRelation): LogicalPlan = {
+    def getTable(u: UnresolvedRelation, cteRelations: Map[String, LogicalPlan]) = {
       try {
-        catalog.lookupRelation(u.tableIdentifier, u.alias)
+        // In hive, if there is same table name in database and CTE definition,
+        // hive will use the table in database, not the CTE one.
+        // Taking into account the reasonableness and the implementation complexity,
+        // here use the CTE definition first, check table name only and ignore database name
+        cteRelations.get(u.tableIdentifier.last)
+          .map(relation => u.alias.map(Subquery(_, relation)).getOrElse(relation))
+          .getOrElse(catalog.lookupRelation(u.tableIdentifier, u.alias))
       } catch {
         case _: NoSuchTableException =>
           u.failAnalysis(s"no such table ${u.tableName}")
       }
     }
 
-    def apply(plan: LogicalPlan): LogicalPlan = plan transform {
-      case i @ InsertIntoTable(u: UnresolvedRelation, _, _, _) =>
-        i.copy(
-          table = EliminateSubQueries(getTable(u)))
-      case u: UnresolvedRelation =>
-        getTable(u)
+    def apply(plan: LogicalPlan): LogicalPlan = {
+      val (realPlan, cteRelations) = plan match {
+        // TODO allow subquery to define CTE
+        // Add cte table to a temp relation map,drop `with` plan and keep its child
+        case With(child, relations) => (child, relations)
+        case other => (other, Map.empty[String, LogicalPlan])
+      }
+
+      realPlan transform {
+        case i@InsertIntoTable(u: UnresolvedRelation, _, _, _) =>
+          i.copy(
+            table = EliminateSubQueries(getTable(u, cteRelations)))
+        case u: UnresolvedRelation =>
+          getTable(u, cteRelations)
+      }
     }
   }
 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
index 8633e06093cf3439cd1ac81122db35e0b8a07559..3bd5aa5964221b469506065eca25612251fcf6f8 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
@@ -147,6 +147,18 @@ case class CreateTableAsSelect[T](
   override lazy val resolved: Boolean = databaseName != None && childrenResolved
 }
 
+/**
+ * A container for holding named common table expressions (CTEs) and a query plan.
+ * This operator will be removed during analysis and the relations will be substituted into child.
+ * @param child The final query of this CTE.
+ * @param cteRelations Queries that this CTE defined,
+ *                     key is the alias of the CTE definition,
+ *                     value is the CTE definition.
+ */
+case class With(child: LogicalPlan, cteRelations: Map[String, Subquery]) extends UnaryNode {
+  override def output = child.output
+}
+
 case class WriteToFile(
     path: String,
     child: LogicalPlan) extends UnaryNode {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index 1392b4819131b2ef7863109189f842b7fc114027..fb8fc6dbd1e1ef16abee56280c47ef28db6244b7 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -407,6 +407,20 @@ class SQLQuerySuite extends QueryTest with BeforeAndAfterAll {
       mapData.collect().take(1).map(Row.fromTuple).toSeq)
   }
 
+  test("CTE feature") {
+    checkAnswer(
+      sql("with q1 as (select * from testData limit 10) select * from q1"),
+      testData.take(10).toSeq)
+
+    checkAnswer(
+      sql("""
+        |with q1 as (select * from testData where key= '5'),
+        |q2 as (select * from testData where key = '4')
+        |select * from q1 union all select * from q2""".stripMargin),
+      Row(5, "5") :: Row(4, "4") :: Nil)
+
+  }
+
   test("date row") {
     checkAnswer(sql(
       """select cast("2015-01-28" as date) from testData limit 1"""),
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
index 0bdaf5f7ef8ef11f6399533329fee7dc9df9d174..2fb2e7c4a537030d461762bbf4e4ce748b419141 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
@@ -576,11 +576,23 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C
     case Token("TOK_QUERY", queryArgs)
         if Seq("TOK_FROM", "TOK_INSERT").contains(queryArgs.head.getText) =>
 
-      val (fromClause: Option[ASTNode], insertClauses) = queryArgs match {
-        case Token("TOK_FROM", args: Seq[ASTNode]) :: insertClauses =>
-          (Some(args.head), insertClauses)
-        case Token("TOK_INSERT", _) :: Nil => (None, queryArgs)
-      }
+      val (fromClause: Option[ASTNode], insertClauses, cteRelations) =
+        queryArgs match {
+          case Token("TOK_FROM", args: Seq[ASTNode]) :: insertClauses =>
+            // check if has CTE
+            insertClauses.last match {
+              case Token("TOK_CTE", cteClauses) =>
+                val cteRelations = cteClauses.map(node => {
+                  val relation = nodeToRelation(node).asInstanceOf[Subquery]
+                  (relation.alias, relation)
+                }).toMap
+                (Some(args.head), insertClauses.init, Some(cteRelations))
+
+              case _ => (Some(args.head), insertClauses, None)
+            }
+
+          case Token("TOK_INSERT", _) :: Nil => (None, queryArgs, None)
+        }
 
       // Return one query for each insert clause.
       val queries = insertClauses.map { case Token("TOK_INSERT", singleInsert) =>
@@ -794,7 +806,10 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C
       }
 
       // If there are multiple INSERTS just UNION them together into on query.
-      queries.reduceLeft(Union)
+      val query = queries.reduceLeft(Union)
+
+      // return With plan if there is CTE
+      cteRelations.map(With(query, _)).getOrElse(query)
 
     case Token("TOK_UNION", left :: right :: Nil) => Union(nodeToPlan(left), nodeToPlan(right))
 
diff --git a/sql/hive/src/test/resources/golden/CTE feature #1-0-eedabbfe6ba8799f7b7782fb47a82768 b/sql/hive/src/test/resources/golden/CTE feature #1-0-eedabbfe6ba8799f7b7782fb47a82768
new file mode 100644
index 0000000000000000000000000000000000000000..f6ba75da254caa70f552693c14cbbec11e637ad3
--- /dev/null
+++ b/sql/hive/src/test/resources/golden/CTE feature #1-0-eedabbfe6ba8799f7b7782fb47a82768	
@@ -0,0 +1,3 @@
+5
+5
+5
diff --git a/sql/hive/src/test/resources/golden/CTE feature #2-0-aa03d104251f97e36bc52279cb9931c9 b/sql/hive/src/test/resources/golden/CTE feature #2-0-aa03d104251f97e36bc52279cb9931c9
new file mode 100644
index 0000000000000000000000000000000000000000..ca7b591095e280358ce1924279226e89dd16dbc3
--- /dev/null
+++ b/sql/hive/src/test/resources/golden/CTE feature #2-0-aa03d104251f97e36bc52279cb9931c9	
@@ -0,0 +1,4 @@
+val_4
+val_5
+val_5
+val_5
diff --git a/sql/hive/src/test/resources/golden/CTE feature #3-0-b5d4bf3c0ee92b2fda0ca24f422383f2 b/sql/hive/src/test/resources/golden/CTE feature #3-0-b5d4bf3c0ee92b2fda0ca24f422383f2
new file mode 100644
index 0000000000000000000000000000000000000000..b8626c4cff2849624fb67f87cd0ad72b163671ad
--- /dev/null
+++ b/sql/hive/src/test/resources/golden/CTE feature #3-0-b5d4bf3c0ee92b2fda0ca24f422383f2	
@@ -0,0 +1 @@
+4
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
index af781a502e9f3be3f0c56e84bdefb608a6fe128d..1222fbabd8b33689319c108b75f1838bf2844267 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
@@ -542,6 +542,21 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter {
   createQueryTest("select null from table",
     "SELECT null FROM src LIMIT 1")
 
+  createQueryTest("CTE feature #1",
+    "with q1 as (select key from src) select * from q1 where key = 5")
+
+  createQueryTest("CTE feature #2",
+    """with q1 as (select * from src where key= 5),
+      |q2 as (select * from src s2 where key = 4)
+      |select value from q1 union all select value from q2
+    """.stripMargin)
+
+  createQueryTest("CTE feature #3",
+    """with q1 as (select key from src)
+      |from q1
+      |select * where key = 4
+    """.stripMargin)
+
   test("predicates contains an empty AttributeSet() references") {
     sql(
       """