diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
index 39875d7f216b2ddfe01a30da2e375821c15b70b5..a7816e327526f63ae91e045555a98e4144a0c01e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
@@ -111,6 +111,7 @@ object FunctionRegistry {
     expression[Log10]("log10"),
     expression[Log1p]("log1p"),
     expression[Pi]("pi"),
+    expression[Log2]("log2"),
     expression[Pow]("pow"),
     expression[Rint]("rint"),
     expression[Signum]("signum"),
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/math.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/math.scala
index e1d8c9a0cdb5a01244f831a7ce18b055706f5d1b..97e960b8d6422a7db5f84cb321e99a14f8a2e165 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/math.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/math.scala
@@ -161,6 +161,23 @@ case class Floor(child: Expression) extends UnaryMathExpression(math.floor, "FLO
 
 case class Log(child: Expression) extends UnaryMathExpression(math.log, "LOG")
 
+case class Log2(child: Expression)
+  extends UnaryMathExpression((x: Double) => math.log(x) / math.log(2), "LOG2") {
+  override def genCode(ctx: CodeGenContext, ev: GeneratedExpressionCode): String = {
+    val eval = child.gen(ctx)
+    eval.code + s"""
+      boolean ${ev.isNull} = ${eval.isNull};
+      ${ctx.javaType(dataType)} ${ev.primitive} = ${ctx.defaultValue(dataType)};
+      if (!${ev.isNull}) {
+        ${ev.primitive} = java.lang.Math.log(${eval.primitive}) / java.lang.Math.log(2);
+        if (Double.valueOf(${ev.primitive}).isNaN()) {
+          ${ev.isNull} = true;
+        }
+      }
+    """
+  }
+}
+
 case class Log10(child: Expression) extends UnaryMathExpression(math.log10, "LOG10")
 
 case class Log1p(child: Expression) extends UnaryMathExpression(math.log1p, "LOG1P")
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/MathFunctionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/MathFunctionsSuite.scala
index 1fe69059d39dac744b6cd0cc6c11bd9076eea644..864c954ee82cb752dde615998648357660decda5 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/MathFunctionsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/MathFunctionsSuite.scala
@@ -185,6 +185,12 @@ class MathFunctionsSuite extends SparkFunSuite with ExpressionEvalHelper {
     testUnary(Log1p, math.log1p, (-10 to -2).map(_ * 1.0), expectNull = true)
   }
 
+  test("log2") {
+    def f: (Double) => Double = (x: Double) => math.log(x) / math.log(2)
+    testUnary(Log2, f, (0 to 20).map(_ * 0.1))
+    testUnary(Log2, f, (-5 to -1).map(_ * 1.0), expectNull = true)
+  }
+
   test("pow") {
     testBinary(Pow, math.pow, (-5 to 5).map(v => (v * 1.0, v * 1.0)))
     testBinary(Pow, math.pow, Seq((-1.0, 0.9), (-2.2, 1.7), (-2.2, -1.7)), expectNull = true)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
index 083f6b6bceee8a011363c52516431c973a639ebf..c5b77724aae17336698ccbc0b3e603a8968936a7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
@@ -1084,7 +1084,7 @@ object functions {
   def log(columnName: String): Column = log(Column(columnName))
 
   /**
-   * Computes the logarithm of the given value in Base 10.
+   * Computes the logarithm of the given value in base 10.
    *
    * @group math_funcs
    * @since 1.4.0
@@ -1092,7 +1092,7 @@ object functions {
   def log10(e: Column): Column = Log10(e.expr)
 
   /**
-   * Computes the logarithm of the given value in Base 10.
+   * Computes the logarithm of the given value in base 10.
    *
    * @group math_funcs
    * @since 1.4.0
@@ -1124,6 +1124,22 @@ object functions {
    */
   def pi(): Column = Pi()
 
+  /**
+   * Computes the logarithm of the given column in base 2.
+   *
+   * @group math_funcs
+   * @since 1.5.0
+   */
+  def log2(expr: Column): Column = Log2(expr.expr)
+
+  /**
+   * Computes the logarithm of the given value in base 2.
+   *
+   * @group math_funcs
+   * @since 1.5.0
+   */
+  def log2(columnName: String): Column = log2(Column(columnName))
+
   /**
    * Returns the value of the first argument raised to the power of the second argument.
    *
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala
index 171a2151e67aef07d4dd245eace80dea6ff31b28..659b64c185f43be0bfefda94cd04e8640216fc60 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala
@@ -128,5 +128,17 @@ class DataFrameFunctionsSuite extends QueryTest {
       })
   }
 
+  test("log2 functions test") {
+    val df = Seq((1, 2)).toDF("a", "b")
+    checkAnswer(
+      df.select(log2("b") + log2("a")),
+      Row(1))
 
+    checkAnswer(
+      ctx.sql("SELECT LOG2(8)"),
+      Row(3))
+    checkAnswer(
+      ctx.sql("SELECT LOG2(null)"),
+      Row(null))
+  }
 }