diff --git a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
index b0e71c7e7c7d1de378dd7fdfcee0e3a88e6ca635..2dd3cfab177cdec626e2cb57a2390959b466a028 100644
--- a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
+++ b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
@@ -90,6 +90,9 @@ statement
         identifierCommentList? (COMMENT STRING)?
         (PARTITIONED ON identifierList)?
         (TBLPROPERTIES tablePropertyList)? AS query                    #createView
+    | CREATE (OR REPLACE)? TEMPORARY VIEW
+        tableIdentifier ('(' colTypeList ')')? tableProvider
+        (OPTIONS tablePropertyList)?                                   #createTempViewUsing
     | ALTER VIEW tableIdentifier AS? query                             #alterViewQuery
     | CREATE TEMPORARY? FUNCTION qualifiedName AS className=STRING
         (USING resource (',' resource)*)?                              #createFunction
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index c68c8f80f87ad7544b0d92b9add274c869bbd88f..dc742220512bbd60e106f846701a717fd06c34a8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -31,7 +31,7 @@ import org.apache.spark.sql.catalyst.parser._
 import org.apache.spark.sql.catalyst.parser.SqlBaseParser._
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, OneRowRelation, ScriptInputOutputSchema}
 import org.apache.spark.sql.execution.command._
-import org.apache.spark.sql.execution.datasources._
+import org.apache.spark.sql.execution.datasources.{CreateTempViewUsing, _}
 import org.apache.spark.sql.internal.{HiveSerDe, SQLConf, VariableSubstitution}
 import org.apache.spark.sql.types.DataType
 
@@ -346,6 +346,19 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
     }
   }
 
+  /**
+   * Creates a [[CreateTempViewUsing]] logical plan.
+   */
+  override def visitCreateTempViewUsing(
+      ctx: CreateTempViewUsingContext): LogicalPlan = withOrigin(ctx) {
+    CreateTempViewUsing(
+      tableIdent = visitTableIdentifier(ctx.tableIdentifier()),
+      userSpecifiedSchema = Option(ctx.colTypeList()).map(createStructType),
+      replace = ctx.REPLACE != null,
+      provider = ctx.tableProvider.qualifiedName.getText,
+      options = Option(ctx.tablePropertyList).map(visitPropertyKeyValues).getOrElse(Map.empty))
+  }
+
   /**
    * Create a [[LoadDataCommand]] command.
    *
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index b20897e2d6599578a7668ef998f33a5eabfae011..a36fe78a499c616efc630c5a09768c8eaa1f0187 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -376,9 +376,12 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
   object DDLStrategy extends Strategy {
     def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
       case c: CreateTableUsing if c.temporary && !c.allowExisting =>
+        logWarning(
+          s"CREATE TEMPORARY TABLE ${c.tableIdent.identifier} USING... is deprecated, " +
+            s"please use CREATE TEMPORARY VIEW viewName USING... instead")
         ExecutedCommandExec(
-          CreateTempTableUsing(
-            c.tableIdent, c.userSpecifiedSchema, c.provider, c.options)) :: Nil
+          CreateTempViewUsing(
+            c.tableIdent, c.userSpecifiedSchema, replace = true, c.provider, c.options)) :: Nil
 
       case c: CreateTableUsing if !c.temporary =>
         val cmd =
@@ -409,6 +412,8 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
             c.child)
         ExecutedCommandExec(cmd) :: Nil
 
+      case c: CreateTempViewUsing =>
+        ExecutedCommandExec(c) :: Nil
       case _ => Nil
     }
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
index bf272e3c0659d3cba509753f4d25288ffc968c3d..aa42eae986be069ca24505189b9637ece9c1d6be 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
@@ -64,9 +64,10 @@ case class CreateTableUsingAsSelect(
   override def output: Seq[Attribute] = Seq.empty[Attribute]
 }
 
-case class CreateTempTableUsing(
+case class CreateTempViewUsing(
     tableIdent: TableIdentifier,
     userSpecifiedSchema: Option[StructType],
+    replace: Boolean,
     provider: String,
     options: Map[String, String]) extends RunnableCommand {
 
@@ -84,7 +85,7 @@ case class CreateTempTableUsing(
     sparkSession.sessionState.catalog.createTempView(
       tableIdent.table,
       Dataset.ofRows(sparkSession, LogicalRelation(dataSource.resolveRelation())).logicalPlan,
-      overrideIfExists = true)
+      replace)
 
     Seq.empty[Row]
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index 741ea673e97e76d8fedf511601026c76c25a3be1..a7e6893caaeb1781a8785ba9c4f0bfbaa9ee0f6d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -25,7 +25,7 @@ import org.scalatest.BeforeAndAfterEach
 import org.apache.spark.internal.config._
 import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.analysis.{DatabaseAlreadyExistsException, NoSuchPartitionException, NoSuchTableException}
+import org.apache.spark.sql.catalyst.analysis.{DatabaseAlreadyExistsException, NoSuchPartitionException, NoSuchTableException, TempTableAlreadyExistsException}
 import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogStorageFormat}
 import org.apache.spark.sql.catalyst.catalog.{CatalogColumn, CatalogTable, CatalogTableType}
 import org.apache.spark.sql.catalyst.catalog.{CatalogTablePartition, SessionCatalog}
@@ -422,6 +422,25 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
     }
   }
 
+  test("create temporary view using") {
+    val csvFile = Thread.currentThread().getContextClassLoader.getResource("cars.csv").toString()
+    withView("testview") {
+      sql(s"CREATE OR REPLACE TEMPORARY VIEW testview (c1: String, c2: String)  USING " +
+        "org.apache.spark.sql.execution.datasources.csv.CSVFileFormat  " +
+        s"OPTIONS (PATH '$csvFile')")
+
+      checkAnswer(
+        sql("select c1, c2 from testview order by c1 limit 1"),
+          Row("1997", "Ford") :: Nil)
+
+      // Fails if creating a new view with the same name
+      intercept[TempTableAlreadyExistsException] {
+        sql(s"CREATE TEMPORARY VIEW testview USING " +
+          s"org.apache.spark.sql.execution.datasources.csv.CSVFileFormat OPTIONS (PATH '$csvFile')")
+      }
+    }
+  }
+
   test("alter table: rename") {
     val catalog = spark.sessionState.catalog
     val tableIdent1 = TableIdentifier("tab1", Some("dbx"))