diff --git a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
index 29f554451ed4ae408e45c360e046f43c6fb9f594..ef9f88a9026c92df4456f682e63e28e8f22eaed6 100644
--- a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
+++ b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
@@ -126,7 +126,8 @@ statement
         tableIdentifier ('(' colTypeList ')')? tableProvider
         (OPTIONS tablePropertyList)?                                   #createTempViewUsing
     | ALTER VIEW tableIdentifier AS? query                             #alterViewQuery
-    | CREATE TEMPORARY? FUNCTION qualifiedName AS className=STRING
+    | CREATE (OR REPLACE)? TEMPORARY? FUNCTION (IF NOT EXISTS)?
+        qualifiedName AS className=STRING
         (USING resource (',' resource)*)?                              #createFunction
     | DROP TEMPORARY? FUNCTION (IF EXISTS)? qualifiedName              #dropFunction
     | EXPLAIN (LOGICAL | FORMATTED | EXTENDED | CODEGEN | COST)?
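With this grammar change, the statements below now parse. Note that the grammar itself accepts any combination of the optional clauses; invalid combinations (e.g. IF NOT EXISTS together with OR REPLACE) are rejected later by CreateFunctionCommand. A minimal sketch, assuming a SparkSession named `spark` and placeholder class names and paths:

```scala
// All three statements are accepted by the extended #createFunction rule:
spark.sql("CREATE OR REPLACE FUNCTION mydb.simple_udf AS 'com.example.SimpleUDF' " +
  "USING JAR '/path/to/udf.jar'")
spark.sql("CREATE FUNCTION IF NOT EXISTS mydb.simple_udf AS 'com.example.SimpleUDF'")
spark.sql("CREATE OR REPLACE TEMPORARY FUNCTION simple_udf AS 'com.example.SimpleUDF'")
```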
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala
index 0254b6bb6d136efd92dc821a18c090f182877e40..6000d483db209d83073530aa811fb16be04a9400 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala
@@ -332,6 +332,15 @@ abstract class ExternalCatalog
 
   protected def doDropFunction(db: String, funcName: String): Unit
 
+  final def alterFunction(db: String, funcDefinition: CatalogFunction): Unit = {
+    val name = funcDefinition.identifier.funcName
+    postToAll(AlterFunctionPreEvent(db, name))
+    doAlterFunction(db, funcDefinition)
+    postToAll(AlterFunctionEvent(db, name))
+  }
+
+  protected def doAlterFunction(db: String, funcDefinition: CatalogFunction): Unit
+
   final def renameFunction(db: String, oldName: String, newName: String): Unit = {
     postToAll(RenameFunctionPreEvent(db, oldName, newName))
     doRenameFunction(db, oldName, newName)
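The new `alterFunction` follows the template-method pattern used by the surrounding methods: a `final` wrapper brackets the protected `doAlterFunction` hook with pre/post events. A minimal sketch of the resulting contract (database, function, and class names are illustrative):

```scala
val fn = CatalogFunction(
  identifier = FunctionIdentifier("my_func", Some("db1")),
  className = "com.example.MyFunc",
  resources = Seq.empty)

// On any concrete ExternalCatalog, this call:
externalCatalog.alterFunction("db1", fn)
// 1. posts AlterFunctionPreEvent("db1", "my_func") to all listeners,
// 2. delegates to the subclass's doAlterFunction("db1", fn),
// 3. posts AlterFunctionEvent("db1", "my_func").
```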
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
index 747190faa3c8c7068bec89f4e082831298e99c85..d253c72a627397016d6600412e2ab9e59354ee8d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
@@ -590,6 +590,12 @@ class InMemoryCatalog(
     catalog(db).functions.remove(funcName)
   }
 
+  override protected def doAlterFunction(db: String, func: CatalogFunction): Unit = synchronized {
+    requireDbExists(db)
+    requireFunctionExists(db, func.identifier.funcName)
+    catalog(db).functions.put(func.identifier.funcName, func)
+  }
+
   override protected def doRenameFunction(
       db: String,
       oldName: String,
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index a86604e4353ab361a687fd9cd0be1bbef17212f9..c40d5f6031a21eae5193609e5f1053083e43f5d0 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -1055,6 +1055,29 @@ class SessionCatalog(
     }
   }
 
+  /**
+   * Overwrite a metastore function in the database specified in `funcDefinition`.
+   * If no database is specified, assume the function is in the current database.
+   */
+  def alterFunction(funcDefinition: CatalogFunction): Unit = {
+    val db = formatDatabaseName(funcDefinition.identifier.database.getOrElse(getCurrentDatabase))
+    requireDbExists(db)
+    val identifier = FunctionIdentifier(funcDefinition.identifier.funcName, Some(db))
+    val newFuncDefinition = funcDefinition.copy(identifier = identifier)
+    if (functionExists(identifier)) {
+      if (functionRegistry.functionExists(identifier)) {
+        // If we have loaded this function into the FunctionRegistry,
+        // also drop it from there.
+        // For a permanent function, because we loaded it to the FunctionRegistry
+        // when it's first used, we also need to drop it from the FunctionRegistry.
+        functionRegistry.dropFunction(identifier)
+      }
+      externalCatalog.alterFunction(db, newFuncDefinition)
+    } else {
+      throw new NoSuchFunctionException(db = db, func = identifier.toString)
+    }
+  }
+
   /**
    * Retrieve the metadata of a metastore function.
    *
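A short usage sketch of the new method (assuming a `SessionCatalog` named `sessionCatalog` with an existing permanent function `db1.f`; all names are illustrative):

```scala
val updated = CatalogFunction(
  identifier = FunctionIdentifier("f", Some("db1")),
  className = "com.example.NewImpl",
  resources = Seq.empty)

// Drops any cached entry for db1.f from the FunctionRegistry (so the next
// lookup loads the new class) and overwrites the metastore definition:
sessionCatalog.alterFunction(updated)

// Altering a function that does not exist fails fast with NoSuchFunctionException:
// sessionCatalog.alterFunction(
//   updated.copy(identifier = FunctionIdentifier("missing", Some("db1"))))
```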
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/events.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/events.scala
index 459973a13bb10e3fada91e22c7e0e703b15990e8..742a51e640383bc638b7a1468b14176d62ca56f3 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/events.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/events.scala
@@ -139,6 +139,16 @@ case class DropFunctionPreEvent(database: String, name: String) extends Function
  */
 case class DropFunctionEvent(database: String, name: String) extends FunctionEvent
 
+/**
+ * Event fired before a function is altered.
+ */
+case class AlterFunctionPreEvent(database: String, name: String) extends FunctionEvent
+
+/**
+ * Event fired after a function has been altered.
+ */
+case class AlterFunctionEvent(database: String, name: String) extends FunctionEvent
+
 /**
  * Event fired before a function is renamed.
  */
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogEventSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogEventSuite.scala
index 2539ea615ff9283cc229b1258ab5ad99ace7d8ac..087c26f23f3833f8b9b6bbfe9fa89a5a20d806b2 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogEventSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogEventSuite.scala
@@ -176,6 +176,15 @@ class ExternalCatalogEventSuite extends SparkFunSuite {
     }
     checkEvents(RenameFunctionPreEvent("db5", "fn7", "fn4") :: Nil)
 
+    // ALTER
+    val alteredFunctionDefinition = CatalogFunction(
+      identifier = FunctionIdentifier("fn4", Some("db5")),
+      className = "org.apache.spark.AlterFunction",
+      resources = Seq.empty)
+    catalog.alterFunction("db5", alteredFunctionDefinition)
+    checkEvents(
+      AlterFunctionPreEvent("db5", "fn4") :: AlterFunctionEvent("db5", "fn4") :: Nil)
+
     // DROP
     intercept[AnalysisException] {
       catalog.dropFunction("db5", "fn7")
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
index c22d55fc96a6514f11397a365ef0470bccf988a4..66e895a4690c120d2fa85451727846187efbe9d3 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
@@ -752,6 +752,14 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
     }
   }
 
+  test("alter function") {
+    val catalog = newBasicCatalog()
+    assert(catalog.getFunction("db2", "func1").className == funcClass)
+    val myNewFunc = catalog.getFunction("db2", "func1").copy(className = newFuncClass)
+    catalog.alterFunction("db2", myNewFunc)
+    assert(catalog.getFunction("db2", "func1").className == newFuncClass)
+  }
+
   test("list functions") {
     val catalog = newBasicCatalog()
     catalog.createFunction("db2", newFunc("func2"))
@@ -916,6 +924,7 @@ abstract class CatalogTestUtils {
   lazy val partWithEmptyValue =
     CatalogTablePartition(Map("a" -> "3", "b" -> ""), storageFormat)
   lazy val funcClass = "org.apache.spark.myFunc"
+  lazy val newFuncClass = "org.apache.spark.myNewFunc"
 
   /**
    * Creates a basic catalog, with the following structure:
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index 2b79eb5eac0f102869c448bfc7ceab9129b35031..2f8e416e7df1bea4d5c9f8a3fb7d73ca5c1482fe 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -687,8 +687,8 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) {
    *
    * For example:
    * {{{
-   *   CREATE [TEMPORARY] FUNCTION [db_name.]function_name AS class_name
-   *    [USING JAR|FILE|ARCHIVE 'file_uri' [, JAR|FILE|ARCHIVE 'file_uri']];
+   *   CREATE [OR REPLACE] [TEMPORARY] FUNCTION [IF NOT EXISTS] [db_name.]function_name
+   *   AS class_name [USING JAR|FILE|ARCHIVE 'file_uri' [, JAR|FILE|ARCHIVE 'file_uri']];
    * }}}
    */
   override def visitCreateFunction(ctx: CreateFunctionContext): LogicalPlan = withOrigin(ctx) {
@@ -709,7 +709,9 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) {
       functionIdentifier.funcName,
       string(ctx.className),
       resources,
-      ctx.TEMPORARY != null)
+      ctx.TEMPORARY != null,
+      ctx.EXISTS != null,
+      ctx.REPLACE != null)
   }
 
   /**
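The two new flags come straight from the ANTLR context: `ctx.EXISTS` is non-null only when the optional `(IF NOT EXISTS)?` alternative matched, and `ctx.REPLACE` only when `(OR REPLACE)?` did. A sketch of the plan produced for one input (field names as declared on CreateFunctionCommand):

```scala
// Input: CREATE OR REPLACE FUNCTION mydb.f AS 'com.example.F'
CreateFunctionCommand(
  databaseName = Some("mydb"),
  functionName = "f",
  className = "com.example.F",
  resources = Seq.empty,
  isTemp = false,      // ctx.TEMPORARY == null
  ifNotExists = false, // ctx.EXISTS == null
  replace = true)      // ctx.REPLACE != null
```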
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala
index a91ad413f4d1bc9e9a8d67d9e5d5d12543ec07f4..4f92ffee687aae53f7c9a9a9f5adea780c8e657b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala
@@ -31,13 +31,13 @@ import org.apache.spark.sql.types.{StringType, StructField, StructType}
  * The DDL command that creates a function.
  * To create a temporary function, the syntax of using this command in SQL is:
  * {{{
- *    CREATE TEMPORARY FUNCTION functionName
+ *    CREATE [OR REPLACE] TEMPORARY FUNCTION functionName
  *    AS className [USING JAR\FILE 'uri' [, JAR|FILE 'uri']]
  * }}}
  *
  * To create a permanent function, the syntax in SQL is:
  * {{{
- *    CREATE FUNCTION [databaseName.]functionName
+ *    CREATE [OR REPLACE] FUNCTION [IF NOT EXISTS] [databaseName.]functionName
  *    AS className [USING JAR\FILE 'uri' [, JAR|FILE 'uri']]
  * }}}
  */
@@ -46,26 +46,46 @@ case class CreateFunctionCommand(
     functionName: String,
     className: String,
     resources: Seq[FunctionResource],
-    isTemp: Boolean)
+    isTemp: Boolean,
+    ifNotExists: Boolean,
+    replace: Boolean)
   extends RunnableCommand {
 
+  if (ifNotExists && replace) {
+    throw new AnalysisException("CREATE FUNCTION with both IF NOT EXISTS and REPLACE" +
+      " is not allowed.")
+  }
+
+  // Disallow defining a temporary function with `IF NOT EXISTS`
+  if (ifNotExists && isTemp) {
+    throw new AnalysisException(
+      "It is not allowed to define a TEMPORARY function with IF NOT EXISTS.")
+  }
+
+  // Temporary function names should not contain database prefix like "database.function"
+  if (databaseName.isDefined && isTemp) {
+    throw new AnalysisException(s"Specifying a database in CREATE TEMPORARY FUNCTION " +
+      s"is not allowed: '${databaseName.get}'")
+  }
+
   override def run(sparkSession: SparkSession): Seq[Row] = {
     val catalog = sparkSession.sessionState.catalog
     val func = CatalogFunction(FunctionIdentifier(functionName, databaseName), className, resources)
     if (isTemp) {
-      if (databaseName.isDefined) {
-        throw new AnalysisException(s"Specifying a database in CREATE TEMPORARY FUNCTION " +
-          s"is not allowed: '${databaseName.get}'")
-      }
       // We first load resources and then put the builder in the function registry.
       catalog.loadFunctionResources(resources)
-      catalog.registerFunction(func, overrideIfExists = false)
+      catalog.registerFunction(func, overrideIfExists = replace)
     } else {
-      // For a permanent, we will store the metadata into underlying external catalog.
-      // This function will be loaded into the FunctionRegistry when a query uses it.
-      // We do not load it into FunctionRegistry right now.
-      // TODO: should we also parse "IF NOT EXISTS"?
-      catalog.createFunction(func, ignoreIfExists = false)
+      // Handles `CREATE OR REPLACE FUNCTION ... AS ... USING ...`
+      if (replace && catalog.functionExists(func.identifier)) {
+        // Alter the function in the metastore.
+        catalog.alterFunction(func)
+      } else {
+        // For a permanent function, we store the metadata in the underlying external catalog.
+        // The function will be loaded into the FunctionRegistry when a query first uses it;
+        // we do not load it into the FunctionRegistry right now.
+        catalog.createFunction(func, ignoreIfExists = ifNotExists)
+      }
     }
     Seq.empty[Row]
   }
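Putting the pieces together, a hedged end-to-end sketch of how the command dispatches for each flag combination (assumes a SparkSession named `spark`; class names and paths are placeholders):

```scala
// Permanent function: metadata is stored in the external catalog.
spark.sql("CREATE FUNCTION mydb.f AS 'com.example.F'")

// OR REPLACE on an existing function: functionExists is true, so the
// metastore entry is altered in place via SessionCatalog.alterFunction,
// which also drops any cached FunctionRegistry entry.
spark.sql("CREATE OR REPLACE FUNCTION mydb.f AS 'com.example.G'")

// IF NOT EXISTS: createFunction runs with ignoreIfExists = true, a no-op here.
spark.sql("CREATE FUNCTION IF NOT EXISTS mydb.f AS 'com.example.H'")

// Temporary function: replace maps to overrideIfExists on the registry.
spark.sql("CREATE OR REPLACE TEMPORARY FUNCTION t AS 'com.example.T'")
```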
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
index 8a6bc62fec96cb1a8ae5c96c55230461bba1b23c..5643c58d9f847474f5b7d1ed9090e33f6056e65e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
@@ -181,8 +181,29 @@ class DDLCommandSuite extends PlanTest {
         |'com.matthewrathbone.example.SimpleUDFExample' USING ARCHIVE '/path/to/archive',
         |FILE '/path/to/file'
       """.stripMargin
+    val sql3 =
+      """
+        |CREATE OR REPLACE TEMPORARY FUNCTION helloworld3 as
+        |'com.matthewrathbone.example.SimpleUDFExample' USING JAR '/path/to/jar1',
+        |JAR '/path/to/jar2'
+      """.stripMargin
+    val sql4 =
+      """
+        |CREATE OR REPLACE FUNCTION hello.world1 as
+        |'com.matthewrathbone.example.SimpleUDFExample' USING ARCHIVE '/path/to/archive',
+        |FILE '/path/to/file'
+      """.stripMargin
+    val sql5 =
+      """
+        |CREATE FUNCTION IF NOT EXISTS hello.world2 as
+        |'com.matthewrathbone.example.SimpleUDFExample' USING ARCHIVE '/path/to/archive',
+        |FILE '/path/to/file'
+      """.stripMargin
     val parsed1 = parser.parsePlan(sql1)
     val parsed2 = parser.parsePlan(sql2)
+    val parsed3 = parser.parsePlan(sql3)
+    val parsed4 = parser.parsePlan(sql4)
+    val parsed5 = parser.parsePlan(sql5)
     val expected1 = CreateFunctionCommand(
       None,
       "helloworld",
@@ -190,7 +211,7 @@ class DDLCommandSuite extends PlanTest {
       Seq(
         FunctionResource(FunctionResourceType.fromString("jar"), "/path/to/jar1"),
         FunctionResource(FunctionResourceType.fromString("jar"), "/path/to/jar2")),
-      isTemp = true)
+      isTemp = true, ifNotExists = false, replace = false)
     val expected2 = CreateFunctionCommand(
       Some("hello"),
       "world",
@@ -198,9 +219,36 @@ class DDLCommandSuite extends PlanTest {
       Seq(
         FunctionResource(FunctionResourceType.fromString("archive"), "/path/to/archive"),
         FunctionResource(FunctionResourceType.fromString("file"), "/path/to/file")),
-      isTemp = false)
+      isTemp = false, ifNotExists = false, replace = false)
+    val expected3 = CreateFunctionCommand(
+      None,
+      "helloworld3",
+      "com.matthewrathbone.example.SimpleUDFExample",
+      Seq(
+        FunctionResource(FunctionResourceType.fromString("jar"), "/path/to/jar1"),
+        FunctionResource(FunctionResourceType.fromString("jar"), "/path/to/jar2")),
+      isTemp = true, ifNotExists = false, replace = true)
+    val expected4 = CreateFunctionCommand(
+      Some("hello"),
+      "world1",
+      "com.matthewrathbone.example.SimpleUDFExample",
+      Seq(
+        FunctionResource(FunctionResourceType.fromString("archive"), "/path/to/archive"),
+        FunctionResource(FunctionResourceType.fromString("file"), "/path/to/file")),
+      isTemp = false, ifNotExists = false, replace = true)
+    val expected5 = CreateFunctionCommand(
+      Some("hello"),
+      "world2",
+      "com.matthewrathbone.example.SimpleUDFExample",
+      Seq(
+        FunctionResource(FunctionResourceType.fromString("archive"), "/path/to/archive"),
+        FunctionResource(FunctionResourceType.fromString("file"), "/path/to/file")),
+      isTemp = false, ifNotExists = true, replace = false)
     comparePlans(parsed1, expected1)
     comparePlans(parsed2, expected2)
+    comparePlans(parsed3, expected3)
+    comparePlans(parsed4, expected4)
+    comparePlans(parsed5, expected5)
   }
 
   test("drop function") {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index e4dd077715d0f666656c4c681c4c46ed10c2360c..5c40d8bb4b1efeba6376672cabc68ecb623a43df 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -2270,6 +2270,57 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
     }
   }
 
+  test("create temporary function with if not exists") {
+    withUserDefinedFunction("func1" -> true) {
+      val sql1 =
+        """
+          |CREATE TEMPORARY FUNCTION IF NOT EXISTS func1 as
+          |'com.matthewrathbone.example.SimpleUDFExample' USING JAR '/path/to/jar1',
+          |JAR '/path/to/jar2'
+        """.stripMargin
+      val e = intercept[AnalysisException] {
+        sql(sql1)
+      }.getMessage
+      assert(e.contains("It is not allowed to define a TEMPORARY function with IF NOT EXISTS"))
+    }
+  }
+
+  test("create function with both if not exists and replace") {
+    withUserDefinedFunction("func1" -> false) {
+      val sql1 =
+        """
+          |CREATE OR REPLACE FUNCTION IF NOT EXISTS func1 as
+          |'com.matthewrathbone.example.SimpleUDFExample' USING JAR '/path/to/jar1',
+          |JAR '/path/to/jar2'
+        """.stripMargin
+      val e = intercept[AnalysisException] {
+        sql(sql1)
+      }.getMessage
+      assert(e.contains("CREATE FUNCTION with both IF NOT EXISTS and REPLACE is not allowed"))
+    }
+  }
+
+  test("create temporary function by specifying a database") {
+    val dbName = "mydb"
+    withDatabase(dbName) {
+      sql(s"CREATE DATABASE $dbName")
+      sql(s"USE $dbName")
+      withUserDefinedFunction("func1" -> true) {
+        val sql1 =
+          s"""
+             |CREATE TEMPORARY FUNCTION $dbName.func1 as
+             |'com.matthewrathbone.example.SimpleUDFExample' USING JAR '/path/to/jar1',
+             |JAR '/path/to/jar2'
+            """.stripMargin
+        val e = intercept[AnalysisException] {
+          sql(sql1)
+        }.getMessage
+        assert(e.contains(s"Specifying a database in CREATE TEMPORARY FUNCTION " +
+          s"is not allowed: '$dbName'"))
+      }
+    }
+  }
+
   Seq(true, false).foreach { caseSensitive =>
     test(s"alter table add columns with existing column name - caseSensitive $caseSensitive") {
       withSQLConf(SQLConf.CASE_SENSITIVE.key -> s"$caseSensitive") {
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index 2a17849fa8a34a15988f203e7ee1660ca3d88943..306b38048e3a5fe27f5d43302c2fb4c13d2e0866 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -1132,6 +1132,15 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
     client.dropFunction(db, name)
   }
 
+  override protected def doAlterFunction(
+      db: String, funcDefinition: CatalogFunction): Unit = withClient {
+    requireDbExists(db)
+    val functionName = funcDefinition.identifier.funcName.toLowerCase(Locale.ROOT)
+    requireFunctionExists(db, functionName)
+    val functionIdentifier = funcDefinition.identifier.copy(funcName = functionName)
+    client.alterFunction(db, funcDefinition.copy(identifier = functionIdentifier))
+  }
+
   override protected def doRenameFunction(
       db: String,
       oldName: String,
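As in the neighboring function operations, the name is lower-cased with `Locale.ROOT` before the existence check and the client call, so behavior does not depend on the JVM's default locale. A one-line illustration:

```scala
import java.util.Locale

// With a Turkish default locale, "ID".toLowerCase would yield a dotless "ı";
// Locale.ROOT guarantees the ASCII lower-casing the metastore expects.
assert("MyFunctionID".toLowerCase(Locale.ROOT) == "myfunctionid")
```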