diff --git a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
index 273ad9289169df48b43a6ef00d8169201369e219..ee27d69ab3f9e4c341d02cb7c9631402c055fc10 100644
--- a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
+++ b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
@@ -84,7 +84,7 @@ statement
     | ALTER TABLE tableIdentifier partitionSpec? SET locationSpec      #setTableLocation
     | DROP TABLE (IF EXISTS)? tableIdentifier PURGE?                   #dropTable
     | DROP VIEW (IF EXISTS)? tableIdentifier                           #dropTable
-    | CREATE (OR REPLACE)? VIEW (IF NOT EXISTS)? tableIdentifier
+    | CREATE (OR REPLACE)? TEMPORARY? VIEW (IF NOT EXISTS)? tableIdentifier
         identifierCommentList? (COMMENT STRING)?
         (PARTITIONED ON identifierList)?
         (TBLPROPERTIES tablePropertyList)? AS query                    #createView
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index 60388df59646490c8240f3dd397b444e23965210..146e036bb4843eef7d2ad129d9bf7c5889295957 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -935,7 +935,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
    *
    * For example:
    * {{{
-   *   CREATE VIEW [IF NOT EXISTS] [db_name.]view_name
+   *   CREATE [TEMPORARY] VIEW [IF NOT EXISTS] [db_name.]view_name
    *   [(column_name [COMMENT column_comment], ...) ]
    *   [COMMENT view_comment]
    *   [TBLPROPERTIES (property_name = property_value, ...)]
@@ -958,7 +958,8 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
         ctx.query,
         Option(ctx.tablePropertyList).map(visitTablePropertyList).getOrElse(Map.empty),
         ctx.EXISTS != null,
-        ctx.REPLACE != null
+        ctx.REPLACE != null,
+        ctx.TEMPORARY != null
       )
     }
   }
@@ -975,7 +976,8 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
       ctx.query,
       Map.empty,
       allowExist = false,
-      replace = true)
+      replace = true,
+      isTemporary = false)
   }
 
   /**
@@ -989,7 +991,8 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
       query: QueryContext,
       properties: Map[String, String],
       allowExist: Boolean,
-      replace: Boolean): LogicalPlan = {
+      replace: Boolean,
+      isTemporary: Boolean): LogicalPlan = {
     val sql = Option(source(query))
     val tableDesc = CatalogTable(
       identifier = visitTableIdentifier(name),
@@ -1000,7 +1003,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
       viewOriginalText = sql,
       viewText = sql,
       comment = comment)
-    CreateViewCommand(tableDesc, plan(query), allowExist, replace, command(ctx))
+    CreateViewCommand(tableDesc, plan(query), allowExist, replace, isTemporary, command(ctx))
   }
 
   /**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
index 0f656ef53e3959421a0604c6df462398eacb0b6b..70ce5c84290102428c1b3f1722b340ad73a68f86 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.command
 import scala.util.control.NonFatal
 
 import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
-import org.apache.spark.sql.catalyst.SQLBuilder
+import org.apache.spark.sql.catalyst.{SQLBuilder, TableIdentifier}
 import org.apache.spark.sql.catalyst.catalog.{CatalogColumn, CatalogTable, CatalogTableType}
 import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute}
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
@@ -37,6 +37,10 @@ import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
  *                already exists, throws analysis exception.
  * @param replace if true, and if the view already exists, updates it; if false, and if the view
  *                already exists, throws analysis exception.
+ * @param isTemporary if true, the view is created as a temporary view. Temporary views are dropped
+ *                 at the end of current Spark session. Existing permanent relations with the same
+ *                 name are not visible to the current session while the temporary view exists,
+ *                 unless they are specified with full qualified table name with database prefix.
  * @param sql the original sql
  */
 case class CreateViewCommand(
@@ -44,6 +48,7 @@ case class CreateViewCommand(
     child: LogicalPlan,
     allowExisting: Boolean,
     replace: Boolean,
+    isTemporary: Boolean,
     sql: String)
   extends RunnableCommand {
 
@@ -55,12 +60,23 @@ case class CreateViewCommand(
   require(tableDesc.tableType == CatalogTableType.VIEW)
   require(tableDesc.viewText.isDefined)
 
-  private val tableIdentifier = tableDesc.identifier
-
   if (allowExisting && replace) {
     throw new AnalysisException("CREATE VIEW with both IF NOT EXISTS and REPLACE is not allowed.")
   }
 
+  // Disallows 'CREATE TEMPORARY VIEW IF NOT EXISTS' to be consistent with 'CREATE TEMPORARY TABLE'
+  if (allowExisting && isTemporary) {
+    throw new AnalysisException(
+      "It is not allowed to define a TEMPORARY view with IF NOT EXISTS.")
+  }
+
+  // Temporary view names should NOT contain database prefix like "database.table"
+  if (isTemporary && tableDesc.identifier.database.isDefined) {
+    val database = tableDesc.identifier.database.get
+    throw new AnalysisException(
+      s"It is not allowed to add database prefix ${database} for the TEMPORARY view name.")
+  }
+
   override def run(sparkSession: SparkSession): Seq[Row] = {
     // If the plan cannot be analyzed, throw an exception and don't proceed.
     val qe = sparkSession.executePlan(child)
@@ -70,29 +86,59 @@ case class CreateViewCommand(
     require(tableDesc.schema == Nil || tableDesc.schema.length == analyzedPlan.output.length)
     val sessionState = sparkSession.sessionState
 
-    if (sessionState.catalog.tableExists(tableIdentifier)) {
-      if (allowExisting) {
-        // Handles `CREATE VIEW IF NOT EXISTS v0 AS SELECT ...`. Does nothing when the target view
-        // already exists.
-      } else if (replace) {
-        // Handles `CREATE OR REPLACE VIEW v0 AS SELECT ...`
-        sessionState.catalog.alterTable(prepareTable(sparkSession, analyzedPlan))
+    if (isTemporary) {
+      createTemporaryView(tableDesc.identifier, sparkSession, analyzedPlan)
+    } else {
+      // Adds default database for permanent table if it doesn't exist, so that tableExists()
+      // only check permanent tables.
+      val database = tableDesc.identifier.database.getOrElse(
+        sessionState.catalog.getCurrentDatabase)
+      val tableIdentifier = tableDesc.identifier.copy(database = Option(database))
+
+      if (sessionState.catalog.tableExists(tableIdentifier)) {
+        if (allowExisting) {
+          // Handles `CREATE VIEW IF NOT EXISTS v0 AS SELECT ...`. Does nothing when the target view
+          // already exists.
+        } else if (replace) {
+          // Handles `CREATE OR REPLACE VIEW v0 AS SELECT ...`
+          sessionState.catalog.alterTable(prepareTable(sparkSession, analyzedPlan))
+        } else {
+          // Handles `CREATE VIEW v0 AS SELECT ...`. Throws exception when the target view already
+          // exists.
+          throw new AnalysisException(
+            s"View $tableIdentifier already exists. If you want to update the view definition, " +
+              "please use ALTER VIEW AS or CREATE OR REPLACE VIEW AS")
+        }
       } else {
-        // Handles `CREATE VIEW v0 AS SELECT ...`. Throws exception when the target view already
-        // exists.
-        throw new AnalysisException(s"View $tableIdentifier already exists. " +
-          "If you want to update the view definition, please use ALTER VIEW AS or " +
-          "CREATE OR REPLACE VIEW AS")
+        // Create the view if it doesn't exist.
+        sessionState.catalog.createTable(
+          prepareTable(sparkSession, analyzedPlan), ignoreIfExists = false)
       }
-    } else {
-      // Create the view if it doesn't exist.
-      sessionState.catalog.createTable(
-        prepareTable(sparkSession, analyzedPlan), ignoreIfExists = false)
     }
-
     Seq.empty[Row]
   }
 
+  private def createTemporaryView(
+      table: TableIdentifier, sparkSession: SparkSession, analyzedPlan: LogicalPlan): Unit = {
+
+    val sessionState = sparkSession.sessionState
+    val catalog = sessionState.catalog
+
+    // Projects column names to alias names
+    val logicalPlan = {
+      if (tableDesc.schema.isEmpty) {
+        analyzedPlan
+      } else {
+        val projectList = analyzedPlan.output.zip(tableDesc.schema).map {
+          case (attr, col) => Alias(attr, col.name)()
+        }
+        sparkSession.executePlan(Project(projectList, analyzedPlan)).analyzed
+      }
+    }
+
+    catalog.createTempTable(table.table, logicalPlan, replace)
+  }
+
   /**
    * Returns a [[CatalogTable]] that can be used to save in the catalog. This comment canonicalize
    * SQL based on the analyzed plan, and also creates the proper schema for the view.
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala
index c4ebc604dc13c9af1bea2560662981175456c43d..3d74235dc52bc8a2f36e1d6bbcc44a0ad40e7a0d 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala
@@ -39,7 +39,7 @@ class HiveDDLCommandSuite extends PlanTest {
     parser.parsePlan(sql).collect {
       case CreateTable(desc, allowExisting) => (desc, allowExisting)
       case CreateTableAsSelectLogicalPlan(desc, _, allowExisting) => (desc, allowExisting)
-      case CreateViewCommand(desc, _, allowExisting, _, _) => (desc, allowExisting)
+      case CreateViewCommand(desc, _, allowExisting, _, _, _) => (desc, allowExisting)
     }.head
   }
 
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLViewSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLViewSuite.scala
index 51848470502af5b729f2f9cd174109f7aebce140..72f9fba13d4bb84b03a1d300e1877d2d59f88c6a 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLViewSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLViewSuite.scala
@@ -37,11 +37,21 @@ class SQLViewSuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
     sqlContext.sql(s"DROP TABLE IF EXISTS jt")
   }
 
-  test("nested views") {
-    withView("jtv1", "jtv2") {
-      sql("CREATE VIEW jtv1 AS SELECT * FROM jt WHERE id > 3").collect()
-      sql("CREATE VIEW jtv2 AS SELECT * FROM jtv1 WHERE id < 6").collect()
+  test("nested views (interleaved with temporary views)") {
+    withView("jtv1", "jtv2", "jtv3", "temp_jtv1", "temp_jtv2", "temp_jtv3") {
+      sql("CREATE VIEW jtv1 AS SELECT * FROM jt WHERE id > 3")
+      sql("CREATE VIEW jtv2 AS SELECT * FROM jtv1 WHERE id < 6")
       checkAnswer(sql("select count(*) FROM jtv2"), Row(2))
+
+      // Checks temporary views
+      sql("CREATE TEMPORARY VIEW temp_jtv1 AS SELECT * FROM jt WHERE id > 3")
+      sql("CREATE TEMPORARY VIEW temp_jtv2 AS SELECT * FROM temp_jtv1 WHERE id < 6")
+      checkAnswer(sql("select count(*) FROM temp_jtv2"), Row(2))
+
+      // Checks interleaved temporary view and normal view
+      sql("CREATE TEMPORARY VIEW temp_jtv3 AS SELECT * FROM jt WHERE id > 3")
+      sql("CREATE VIEW jtv3 AS SELECT * FROM temp_jtv3 WHERE id < 6")
+      checkAnswer(sql("select count(*) FROM jtv3"), Row(2))
     }
   }
 
@@ -57,6 +67,33 @@ class SQLViewSuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
     }
   }
 
+  test("error handling: fail if the temp view name contains the database prefix") {
+    // Fully qualified table name like "database.table" is not allowed for temporary view
+    val e = intercept[AnalysisException] {
+      sql("CREATE OR REPLACE TEMPORARY VIEW default.myabcdview AS SELECT * FROM jt")
+    }
+    assert(e.message.contains("It is not allowed to add database prefix"))
+  }
+
+  test("error handling: disallow IF NOT EXISTS for CREATE TEMPORARY VIEW") {
+    val e = intercept[AnalysisException] {
+      sql("CREATE TEMPORARY VIEW IF NOT EXISTS myabcdview AS SELECT * FROM jt")
+    }
+    assert(e.message.contains("It is not allowed to define a TEMPORARY view with IF NOT EXISTS"))
+  }
+
+  test("error handling: fail if the temp view sql itself is invalid") {
+     // A table that does not exist for temporary view
+    intercept[AnalysisException] {
+      sql("CREATE OR REPLACE TEMPORARY VIEW myabcdview AS SELECT * FROM table_not_exist1345")
+    }
+
+    // A column that does not exist, for temporary view
+    intercept[AnalysisException] {
+      sql("CREATE OR REPLACE TEMPORARY VIEW myabcdview AS SELECT random1234 FROM jt")
+    }
+  }
+
   test("correctly parse CREATE VIEW statement") {
     withSQLConf(SQLConf.NATIVE_VIEW.key -> "true") {
       sql(
@@ -69,18 +106,70 @@ class SQLViewSuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
     }
   }
 
+  test("correctly parse CREATE TEMPORARY VIEW statement") {
+    withView("testView") {
+      sql(
+        """CREATE TEMPORARY VIEW
+          |testView (c1 COMMENT 'blabla', c2 COMMENT 'blabla')
+          |TBLPROPERTIES ('a' = 'b')
+          |AS SELECT * FROM jt
+          |""".stripMargin)
+      checkAnswer(sql("SELECT c1, c2 FROM testView ORDER BY c1"), (1 to 9).map(i => Row(i, i)))
+    }
+  }
+
+  test("should NOT allow CREATE TEMPORARY VIEW when TEMPORARY VIEW with same name exists") {
+    withView("testView") {
+      sql("CREATE TEMPORARY VIEW testView AS SELECT id FROM jt")
+
+      val e = intercept[AnalysisException] {
+        sql("CREATE TEMPORARY VIEW testView AS SELECT id FROM jt")
+      }
+
+      assert(e.message.contains("Temporary table") && e.message.contains("already exists"))
+    }
+  }
+
+  test("should allow CREATE TEMPORARY VIEW when a permanent VIEW with same name exists") {
+    withView("testView", "default.testView") {
+      sql("CREATE VIEW testView AS SELECT id FROM jt")
+      sql("CREATE TEMPORARY VIEW testView AS SELECT id FROM jt")
+    }
+  }
+
+  test("should allow CREATE permanent VIEW when a TEMPORARY VIEW with same name exists") {
+    withView("testView", "default.testView") {
+      sql("CREATE TEMPORARY VIEW testView AS SELECT id FROM jt")
+      sql("CREATE VIEW testView AS SELECT id FROM jt")
+    }
+  }
+
   test("correctly handle CREATE VIEW IF NOT EXISTS") {
     withSQLConf(SQLConf.NATIVE_VIEW.key -> "true") {
       withTable("jt2") {
-        sql("CREATE VIEW testView AS SELECT id FROM jt")
+        withView("testView") {
+          sql("CREATE VIEW testView AS SELECT id FROM jt")
 
-        val df = (1 until 10).map(i => i -> i).toDF("i", "j")
-        df.write.format("json").saveAsTable("jt2")
-        sql("CREATE VIEW IF NOT EXISTS testView AS SELECT * FROM jt2")
+          val df = (1 until 10).map(i => i -> i).toDF("i", "j")
+          df.write.format("json").saveAsTable("jt2")
+          sql("CREATE VIEW IF NOT EXISTS testView AS SELECT * FROM jt2")
 
-        // make sure our view doesn't change.
+          // make sure our view doesn't change.
+          checkAnswer(sql("SELECT * FROM testView ORDER BY id"), (1 to 9).map(i => Row(i)))
+        }
+      }
+    }
+  }
+
+  test(s"correctly handle CREATE OR REPLACE TEMPORARY VIEW") {
+    withTable("jt2") {
+      withView("testView") {
+        sql("CREATE OR REPLACE TEMPORARY VIEW testView AS SELECT id FROM jt")
         checkAnswer(sql("SELECT * FROM testView ORDER BY id"), (1 to 9).map(i => Row(i)))
-        sql("DROP VIEW testView")
+
+        sql("CREATE OR REPLACE TEMPORARY VIEW testView AS SELECT id AS i, id AS j FROM jt")
+        // make sure the view has been changed.
+        checkAnswer(sql("SELECT * FROM testView ORDER BY i"), (1 to 9).map(i => Row(i, i)))
       }
     }
   }
@@ -215,5 +304,4 @@ class SQLViewSuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
       }
     }
   }
-
 }