diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/ApproximatePercentile.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/ApproximatePercentile.scala
index db062f1a543fea46550b7a8145629aeac1c2902b..1ec2e4a9e9319f2a8f62ced8326936eea72a75df 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/ApproximatePercentile.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/ApproximatePercentile.scala
@@ -245,7 +245,8 @@ object ApproximatePercentile {
         val result = new Array[Double](percentages.length)
         var i = 0
         while (i < percentages.length) {
-          result(i) = summaries.query(percentages(i))
+          // Since summaries.count != 0, the query here never returns None.
+          result(i) = summaries.query(percentages(i)).get
           i += 1
         }
         result
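
The `.get` above is safe only because `ApproximatePercentile` reaches this loop when `summaries.count != 0`. A minimal sketch of the invariant the call site relies on (the helper name is illustrative, not part of the patch, and it assumes the summary has already been compressed):

```scala
import org.apache.spark.sql.catalyst.util.QuantileSummaries

// Hypothetical helper illustrating why the unconditional .get is safe.
def queryPercentiles(
    summaries: QuantileSummaries,
    percentages: Array[Double]): Array[Double] = {
  require(summaries.count != 0, "summary must contain at least one value")
  // query(p) returns Some(_) whenever the compressed summary holds at
  // least one sample, so .get cannot throw under the guard above.
  percentages.map(p => summaries.query(p).get)
}
```
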
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/QuantileSummaries.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/QuantileSummaries.scala
index 04f4ff2a92247aa2a92ddea507ff0a76b4d11b8e..af543b04ba780d55fec14c146d1eb7c84037c858 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/QuantileSummaries.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/QuantileSummaries.scala
@@ -176,17 +176,19 @@ class QuantileSummaries(
    * @param quantile the target quantile
-   * @return
+   * @return the approximate quantile if the summary is nonempty, None otherwise
    */
-  def query(quantile: Double): Double = {
+  def query(quantile: Double): Option[Double] = {
     require(quantile >= 0 && quantile <= 1.0, "quantile should be in the range [0.0, 1.0]")
     require(headSampled.isEmpty,
       "Cannot operate on an uncompressed summary, call compress() first")
 
+    if (sampled.isEmpty) return None
+
     if (quantile <= relativeError) {
-      return sampled.head.value
+      return Some(sampled.head.value)
     }
 
     if (quantile >= 1 - relativeError) {
-      return sampled.last.value
+      return Some(sampled.last.value)
     }
 
     // Target rank
@@ -200,11 +202,11 @@ class QuantileSummaries(
       minRank += curSample.g
       val maxRank = minRank + curSample.delta
       if (maxRank - targetError <= rank && rank <= minRank + targetError) {
-        return curSample.value
+        return Some(curSample.value)
       }
       i += 1
     }
-    sampled.last.value
+    Some(sampled.last.value)
   }
 }
 
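Before this change, calling `query` on an empty summary threw a `NoSuchElementException` from `sampled.head`, which callers had to catch. Returning `Option[Double]` makes the empty case explicit in the type. A small usage sketch (the constructor arguments follow this file's class definition; the values are illustrative):

```scala
import org.apache.spark.sql.catalyst.util.QuantileSummaries

val empty = new QuantileSummaries(
  QuantileSummaries.defaultCompressThreshold, relativeError = 0.01)
assert(empty.query(0.5).isEmpty)  // no samples: None instead of an exception

val filled = (1 to 100)
  .foldLeft(empty)((s, x) => s.insert(x.toDouble))
  .compress()  // query can only run on a compressed summary
assert(filled.query(0.0).contains(1.0) && filled.query(1.0).contains(100.0))
```
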
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/QuantileSummariesSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/QuantileSummariesSuite.scala
index 5e90970b1bb2eb617c48e929ceed77db2886a19b..df579d5ec1ddff604abd40ea28acee0cf0fa1908 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/QuantileSummariesSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/QuantileSummariesSuite.scala
@@ -55,15 +55,19 @@ class QuantileSummariesSuite extends SparkFunSuite {
   }
 
   private def checkQuantile(quant: Double, data: Seq[Double], summary: QuantileSummaries): Unit = {
-    val approx = summary.query(quant)
-    // The rank of the approximation.
-    val rank = data.count(_ < approx) // has to be <, not <= to be exact
-    val lower = math.floor((quant - summary.relativeError) * data.size)
-    val upper = math.ceil((quant + summary.relativeError) * data.size)
-    val msg =
-      s"$rank not in [$lower $upper], requested quantile: $quant, approx returned: $approx"
-    assert(rank >= lower, msg)
-    assert(rank <= upper, msg)
+    if (data.nonEmpty) {
+      val approx = summary.query(quant).get
+      // The rank of the approximation.
+      val rank = data.count(_ < approx) // has to be <, not <= to be exact
+      val lower = math.floor((quant - summary.relativeError) * data.size)
+      val upper = math.ceil((quant + summary.relativeError) * data.size)
+      val msg =
+        s"$rank not in [$lower $upper], requested quantile: $quant, approx returned: $approx"
+      assert(rank >= lower, msg)
+      assert(rank <= upper, msg)
+    } else {
+      assert(summary.query(quant).isEmpty)
+    }
   }
 
   for {
@@ -74,9 +78,9 @@ class QuantileSummariesSuite extends SparkFunSuite {
 
     test(s"Extremas with epsi=$epsi and seq=$seq_name, compression=$compression") {
       val s = buildSummary(data, epsi, compression)
-      val min_approx = s.query(0.0)
+      val min_approx = s.query(0.0).get
       assert(min_approx == data.min, s"Did not return the min: min=${data.min}, got $min_approx")
-      val max_approx = s.query(1.0)
+      val max_approx = s.query(1.0).get
       assert(max_approx == data.max, s"Did not return the max: max=${data.max}, got $max_approx")
     }
 
@@ -100,6 +104,18 @@ class QuantileSummariesSuite extends SparkFunSuite {
       checkQuantile(0.1, data, s)
       checkQuantile(0.001, data, s)
     }
+
+    test(s"Tests on empty data with epsi=$epsi and seq=$seq_name, compression=$compression") {
+      val emptyData = Seq.empty[Double]
+      val s = buildSummary(emptyData, epsi, compression)
+      assert(s.count == 0, s"Found count=${s.count} but data size=0")
+      assert(s.sampled.isEmpty, "sampled should be empty for an empty QuantileSummaries")
+      checkQuantile(0.9999, emptyData, s)
+      checkQuantile(0.9, emptyData, s)
+      checkQuantile(0.5, emptyData, s)
+      checkQuantile(0.1, emptyData, s)
+      checkQuantile(0.001, emptyData, s)
+    }
   }
 
   // Tests for merging procedure
@@ -118,9 +134,9 @@ class QuantileSummariesSuite extends SparkFunSuite {
       val s1 = buildSummary(data1, epsi, compression)
       val s2 = buildSummary(data2, epsi, compression)
       val s = s1.merge(s2)
-      val min_approx = s.query(0.0)
+      val min_approx = s.query(0.0).get
       assert(min_approx == data.min, s"Did not return the min: min=${data.min}, got $min_approx")
-      val max_approx = s.query(1.0)
+      val max_approx = s.query(1.0).get
       assert(max_approx == data.max, s"Did not return the max: max=${data.max}, got $max_approx")
       checkQuantile(0.9999, data, s)
       checkQuantile(0.9, data, s)
@@ -137,9 +153,9 @@ class QuantileSummariesSuite extends SparkFunSuite {
       val s1 = buildSummary(data11, epsi, compression)
       val s2 = buildSummary(data12, epsi, compression)
       val s = s1.merge(s2)
-      val min_approx = s.query(0.0)
+      val min_approx = s.query(0.0).get
       assert(min_approx == data.min, s"Did not return the min: min=${data.min}, got $min_approx")
-      val max_approx = s.query(1.0)
+      val max_approx = s.query(1.0).get
       assert(max_approx == data.max, s"Did not return the max: max=${data.max}, got $max_approx")
       checkQuantile(0.9999, data, s)
       checkQuantile(0.9, data, s)
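
`checkQuantile` asserts the Greenwald-Khanna guarantee: for a requested quantile `quant` with relative error `epsi` over `n` values, the rank of the returned value lies in `[floor((quant - epsi) * n), ceil((quant + epsi) * n)]`, and the merge tests check that this still holds after combining two summaries. A sketch of the merge path those tests exercise (the sizes and epsilon are illustrative):

```scala
import org.apache.spark.sql.catalyst.util.QuantileSummaries

def build(xs: Seq[Double]): QuantileSummaries =
  xs.foldLeft(new QuantileSummaries(10000, 0.01))((s, x) => s.insert(x)).compress()

val s1 = build((1 to 50).map(_.toDouble))
val s2 = build((51 to 100).map(_.toDouble))
val merged = s1.merge(s2)
assert(merged.query(0.0).contains(1.0))    // exact min survives the merge
assert(merged.query(1.0).contains(100.0))  // exact max survives the merge
```
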
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameStatFunctions.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameStatFunctions.scala
index bdcdf0c61ff36343211c83bf394fb28bca4e2281..c856d3099f6eefcec3fad1fbeaf2096b7e8d837a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameStatFunctions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameStatFunctions.scala
@@ -64,7 +64,7 @@ final class DataFrameStatFunctions private[sql](df: DataFrame) {
    * @return the approximate quantiles at the given probabilities
    *
    * @note null and NaN values will be removed from the numerical column before calculation. If
-   *   the dataframe is empty or all rows contain null or NaN, null is returned.
+   *   the dataframe is empty or the column only contains null or NaN, an empty array is returned.
    *
    * @since 2.0.0
    */
@@ -72,8 +72,7 @@ final class DataFrameStatFunctions private[sql](df: DataFrame) {
       col: String,
       probabilities: Array[Double],
       relativeError: Double): Array[Double] = {
-    val res = approxQuantile(Array(col), probabilities, relativeError)
-    Option(res).map(_.head).orNull
+    approxQuantile(Array(col), probabilities, relativeError).head
   }
 
   /**
@@ -89,8 +88,8 @@ final class DataFrameStatFunctions private[sql](df: DataFrame) {
    *   Note that values greater than 1 are accepted but give the same result as 1.
    * @return the approximate quantiles at the given probabilities of each column
    *
-   * @note Rows containing any null or NaN values will be removed before calculation. If
-   *   the dataframe is empty or all rows contain null or NaN, null is returned.
+   * @note null and NaN values will be removed from the numerical columns before calculation.
+   *   For columns only containing null or NaN values, an empty array is returned.
    *
    * @since 2.2.0
    */
@@ -98,13 +97,11 @@ final class DataFrameStatFunctions private[sql](df: DataFrame) {
       cols: Array[String],
       probabilities: Array[Double],
       relativeError: Double): Array[Array[Double]] = {
-    // TODO: Update NaN/null handling to keep consistent with the single-column version
-    try {
-      StatFunctions.multipleApproxQuantiles(df.select(cols.map(col): _*).na.drop(), cols,
-        probabilities, relativeError).map(_.toArray).toArray
-    } catch {
-      case e: NoSuchElementException => null
-    }
+    StatFunctions.multipleApproxQuantiles(
+      df.select(cols.map(col): _*),
+      cols,
+      probabilities,
+      relativeError).map(_.toArray).toArray
   }
 
 
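The user-visible effect of this file's change: `approxQuantile` on an empty or all-null/NaN input now returns an empty array rather than `null`, and the multi-column overload no longer drops whole rows via `na.drop()`. A hedged end-to-end sketch, assuming a `SparkSession` in scope named `spark` (the column name is illustrative):

```scala
import spark.implicits._

val df = Seq.empty[Double].toDF("x")
val qs = df.stat.approxQuantile("x", Array(0.25, 0.5, 0.75), 0.01)
assert(qs.isEmpty)  // previously this call returned null
```
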
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala
index c3d8859cb7a92d41945dd6b2ef5920e24c6892f7..1debad03c93fa97562d152f0215be1e0a1c72931 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala
@@ -54,6 +54,9 @@ object StatFunctions extends Logging {
    *   Note that values greater than 1 are accepted but give the same result as 1.
    *
    * @return for each column, returns the requested approximations
+   *
+   * @note null and NaN values will be removed from the numerical columns before calculation.
+   *   For a column only containing null or NaN values, an empty array is returned.
    */
   def multipleApproxQuantiles(
       df: DataFrame,
@@ -78,7 +81,10 @@ object StatFunctions extends Logging {
     def apply(summaries: Array[QuantileSummaries], row: Row): Array[QuantileSummaries] = {
       var i = 0
       while (i < summaries.length) {
-        summaries(i) = summaries(i).insert(row.getDouble(i))
+        if (!row.isNullAt(i)) {
+          val v = row.getDouble(i)
+          if (!v.isNaN) summaries(i) = summaries(i).insert(v)
+        }
         i += 1
       }
       summaries
@@ -91,7 +97,7 @@ object StatFunctions extends Logging {
     }
     val summaries = df.select(columns: _*).rdd.aggregate(emptySummaries)(apply, merge)
 
-    summaries.map { summary => probabilities.map(summary.query) }
+    summaries.map { summary => probabilities.flatMap(summary.query) }
   }
 
   /** Calculate the Pearson Correlation Coefficient for the given columns */
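
Two small changes carry the new semantics: the per-cell guard skips null and NaN values column by column, and `flatMap` over the probabilities turns an all-`None` column into an empty array. A standalone illustration of the `flatMap` step (plain Scala, not Spark code):

```scala
// flatMap drops None results, so a summary that saw no valid values
// yields an empty array for any set of requested probabilities.
def queryAll(query: Double => Option[Double], probs: Array[Double]): Array[Double] =
  probs.flatMap(p => query(p))

assert(queryAll(_ => None, Array(0.5, 0.8)).isEmpty)
assert(queryAll(p => Some(p * 10), Array(0.5, 0.8)).sameElements(Array(5.0, 8.0)))
```
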
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameStatSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameStatSuite.scala
index d0910e618a040ae203589d81d1e2e3659c53d38b..97890a035a62fbc4b79338e34e0f208706c36e4e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameStatSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameStatSuite.scala
@@ -171,15 +171,6 @@ class DataFrameStatSuite extends QueryTest with SharedSQLContext {
       df.stat.approxQuantile(Array("singles", "doubles"), Array(q1, q2), -1.0)
     }
     assert(e2.getMessage.contains("Relative Error must be non-negative"))
-
-    // return null if the dataset is empty
-    val res1 = df.selectExpr("*").limit(0)
-      .stat.approxQuantile("singles", Array(q1, q2), epsilons.head)
-    assert(res1 === null)
-
-    val res2 = df.selectExpr("*").limit(0)
-      .stat.approxQuantile(Array("singles", "doubles"), Array(q1, q2), epsilons.head)
-    assert(res2 === null)
   }
 
   test("approximate quantile 2: test relativeError greater than 1 return the same result as 1") {
@@ -214,20 +205,48 @@ class DataFrameStatSuite extends QueryTest with SharedSQLContext {
     val q1 = 0.5
     val q2 = 0.8
     val epsilon = 0.1
-    val rows = spark.sparkContext.parallelize(Seq(Row(Double.NaN, 1.0), Row(1.0, 1.0),
-      Row(-1.0, Double.NaN), Row(Double.NaN, Double.NaN), Row(null, null), Row(null, 1.0),
-      Row(-1.0, null), Row(Double.NaN, null)))
+    val rows = spark.sparkContext.parallelize(Seq(Row(Double.NaN, 1.0, Double.NaN),
+      Row(1.0, -1.0, null), Row(-1.0, Double.NaN, null), Row(Double.NaN, Double.NaN, null),
+      Row(null, null, Double.NaN), Row(null, 1.0, null), Row(-1.0, null, Double.NaN),
+      Row(Double.NaN, null, null)))
     val schema = StructType(Seq(StructField("input1", DoubleType, nullable = true),
-      StructField("input2", DoubleType, nullable = true)))
+      StructField("input2", DoubleType, nullable = true),
+      StructField("input3", DoubleType, nullable = true)))
     val dfNaN = spark.createDataFrame(rows, schema)
-    val resNaN = dfNaN.stat.approxQuantile("input1", Array(q1, q2), epsilon)
-    assert(resNaN.count(_.isNaN) === 0)
-    assert(resNaN.count(_ == null) === 0)
 
-    val resNaN2 = dfNaN.stat.approxQuantile(Array("input1", "input2"),
+    val resNaN1 = dfNaN.stat.approxQuantile("input1", Array(q1, q2), epsilon)
+    assert(resNaN1.count(_.isNaN) === 0)
+    assert(resNaN1.count(_ == null) === 0)
+
+    val resNaN2 = dfNaN.stat.approxQuantile("input2", Array(q1, q2), epsilon)
+    assert(resNaN2.count(_.isNaN) === 0)
+    assert(resNaN2.count(_ == null) === 0)
+
+    val resNaN3 = dfNaN.stat.approxQuantile("input3", Array(q1, q2), epsilon)
+    assert(resNaN3.isEmpty)
+
+    val resNaNAll = dfNaN.stat.approxQuantile(Array("input1", "input2", "input3"),
       Array(q1, q2), epsilon)
-    assert(resNaN2.flatten.count(_.isNaN) === 0)
-    assert(resNaN2.flatten.count(_ == null) === 0)
+    assert(resNaNAll.flatten.count(_.isNaN) === 0)
+    assert(resNaNAll.flatten.count(_ == null) === 0)
+
+    assert(resNaN1(0) === resNaNAll(0)(0))
+    assert(resNaN1(1) === resNaNAll(0)(1))
+    assert(resNaN2(0) === resNaNAll(1)(0))
+    assert(resNaN2(1) === resNaNAll(1)(1))
+
+    // return empty array for columns only containing null or NaN values
+    assert(resNaNAll(2).isEmpty)
+
+    // return empty array if the dataset is empty
+    val res1 = dfNaN.selectExpr("*").limit(0)
+      .stat.approxQuantile("input1", Array(q1, q2), epsilon)
+    assert(res1.isEmpty)
+
+    val res2 = dfNaN.selectExpr("*").limit(0)
+      .stat.approxQuantile(Array("input1", "input2"), Array(q1, q2), epsilon)
+    assert(res2(0).isEmpty)
+    assert(res2(1).isEmpty)
   }
 
   test("crosstab") {