diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py
index bbf465aca8d4da8c50c13eae3c2f6d30d0086e44..177fc196e08349705cf450bf1eb8544ca45424ab 100644
--- a/python/pyspark/sql/functions.py
+++ b/python/pyspark/sql/functions.py
@@ -18,6 +18,7 @@
 """
 A collections of builtin functions
 """
+import math
 import sys
 
 if sys.version < "3":
@@ -143,7 +144,7 @@ _binary_mathfunctions = {
     'atan2': 'Returns the angle theta from the conversion of rectangular coordinates (x, y) to' +
              'polar coordinates (r, theta).',
     'hypot': 'Computes `sqrt(a^2^ + b^2^)` without intermediate overflow or underflow.',
-    'pow': 'Returns the value of the first argument raised to the power of the second argument.'
+    'pow': 'Returns the value of the first argument raised to the power of the second argument.',
 }
 
 _window_functions = {
@@ -403,6 +404,21 @@ def when(condition, value):
     return Column(jc)
 
 
+@since(1.4)
+def log(col, base=math.e):
+    """Returns the first argument-based logarithm of the second argument.
+
+    >>> df.select(log(df.age, 10.0).alias('ten')).map(lambda l: str(l.ten)[:7]).collect()
+    ['0.30102', '0.69897']
+
+    >>> df.select(log(df.age).alias('e')).map(lambda l: str(l.e)[:7]).collect()
+    ['0.69314', '1.60943']
+    """
+    sc = SparkContext._active_spark_context
+    jc = sc._jvm.functions.log(base, _to_java_column(col))
+    return Column(jc)
+
+
 @since(1.4)
 def lag(col, count=1, default=None):
     """
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
index 97b123ec2f6d9147ae84b754554deff306eb3d66..13b2bb05f528021b8928ee56b3b99c31e45ec56b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
@@ -112,6 +112,7 @@ object FunctionRegistry {
     expression[Expm1]("expm1"),
     expression[Floor]("floor"),
     expression[Hypot]("hypot"),
+    expression[Logarithm]("log"),
     expression[Log]("ln"),
     expression[Log10]("log10"),
     expression[Log1p]("log1p"),
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/math.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/math.scala
index 42c596b5b31ab89a0335d1757cbc4ca27be36bf2..67cb0b508ca9efc5f32a25cfd4ff4aa4d3c936f4 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/math.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/math.scala
@@ -255,3 +255,23 @@ case class Pow(left: Expression, right: Expression)
       """
   }
 }
+
+case class Logarithm(left: Expression, right: Expression)
+  extends BinaryMathExpression((c1, c2) => math.log(c2) / math.log(c1), "LOG") {
+  def this(child: Expression) = {
+    this(EulerNumber(), child)
+  }
+
+  override def genCode(ctx: CodeGenContext, ev: GeneratedExpressionCode): String = {
+    val logCode = if (left.isInstanceOf[EulerNumber]) {
+      defineCodeGen(ctx, ev, (c1, c2) => s"java.lang.Math.log($c2)")
+    } else {
+      defineCodeGen(ctx, ev, (c1, c2) => s"java.lang.Math.log($c2) / java.lang.Math.log($c1)")
+    }
+    logCode + s"""
+      if (Double.valueOf(${ev.primitive}).isNaN()) {
+        ${ev.isNull} = true;
+      }
+    """
+  }
+}
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/MathFunctionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/MathFunctionsSuite.scala
index 864c954ee82cb752dde615998648357660decda5..0050ad3fe8302debb7651eed4099bef6c32da88e 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/MathFunctionsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/MathFunctionsSuite.scala
@@ -204,4 +204,22 @@ class MathFunctionsSuite extends SparkFunSuite with ExpressionEvalHelper {
     testBinary(Atan2, math.atan2)
   }
 
+  test("binary log") {
+    val f = (c1: Double, c2: Double) => math.log(c2) / math.log(c1)
+    val domain = (1 to 20).map(v => (v * 0.1, v * 0.2))
+
+    domain.foreach { case (v1, v2) =>
+      checkEvaluation(Logarithm(Literal(v1), Literal(v2)), f(v1 + 0.0, v2 + 0.0), EmptyRow)
+      checkEvaluation(Logarithm(Literal(v2), Literal(v1)), f(v2 + 0.0, v1 + 0.0), EmptyRow)
+      checkEvaluation(new Logarithm(Literal(v1)), f(math.E, v1 + 0.0), EmptyRow)
+    }
+    checkEvaluation(
+      Logarithm(Literal.create(null, DoubleType), Literal(1.0)),
+      null,
+      create_row(null))
+    checkEvaluation(
+      Logarithm(Literal(1.0), Literal.create(null, DoubleType)),
+      null,
+      create_row(null))
+  }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
index c5b77724aae17336698ccbc0b3e603a8968936a7..dff0932c450a84d2b1c5b4084fa38f26c93a269c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
@@ -1083,6 +1083,22 @@ object functions {
    */
   def log(columnName: String): Column = log(Column(columnName))
 
+  /**
+   * Returns the first argument-base logarithm of the second argument.
+   *
+   * @group math_funcs
+   * @since 1.4.0
+   */
+  def log(base: Double, a: Column): Column = Logarithm(lit(base).expr, a.expr)
+
+  /**
+   * Returns the first argument-base logarithm of the second argument.
+   *
+   * @group math_funcs
+   * @since 1.4.0
+   */
+  def log(base: Double, columnName: String): Column = log(base, Column(columnName))
+
   /**
    * Computes the logarithm of the given value in base 10.
    *
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/MathExpressionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/MathExpressionsSuite.scala
index e2daaf6b730c5a326ca9a6563289458c2ab52d06..7c9c121b956bbd5bafbaedfd1e11c77d8fae1d87 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/MathExpressionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/MathExpressionsSuite.scala
@@ -236,6 +236,19 @@ class MathExpressionsSuite extends QueryTest {
     testOneToOneNonNegativeMathFunction(log1p, math.log1p)
   }
 
+  test("binary log") {
+    val df = Seq[(Integer, Integer)]((123, null)).toDF("a", "b")
+    checkAnswer(
+      df.select(org.apache.spark.sql.functions.log("a"),
+        org.apache.spark.sql.functions.log(2.0, "a"),
+        org.apache.spark.sql.functions.log("b")),
+      Row(math.log(123), math.log(123) / math.log(2), null))
+
+    checkAnswer(
+      df.selectExpr("log(a)", "log(2.0, a)", "log(b)"),
+      Row(math.log(123), math.log(123) / math.log(2), null))
+  }
+
   test("abs") {
     val input =
       Seq[(java.lang.Double, java.lang.Double)]((null, null), (0.0, 0.0), (1.5, 1.5), (-2.5, 2.5))