diff --git a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
index 5bb505bf09f17d51702b34f7d86095b590412873..dd149a919fe5569d2fcefc7bf18f2e009c496e42 100644
--- a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
+++ b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
@@ -225,6 +225,15 @@ class TaskMetrics private[spark] () extends Serializable {
   }
 
   private[spark] def accumulators(): Seq[AccumulatorV2[_, _]] = internalAccums ++ externalAccums
+
+  /**
+   * Returns the first registered accumulator whose name matches the given name, if any.
+   */
+  private[spark] def lookForAccumulatorByName(name: String): Option[AccumulatorV2[_, _]] = {
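+    // Linear scan over both internal and external accumulators registered for this task.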
+    accumulators.find { acc =>
+      acc.name.contains(name)
+    }
+  }
 }
 
 
diff --git a/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala b/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala
index a9167ce6edf90f5935cd43957d32d8c1f1c0e4fd..d130a37db5b5d9315de765f3e1ab03a8265f7bff 100644
--- a/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala
+++ b/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala
@@ -23,6 +23,8 @@ import java.util.ArrayList
 import java.util.concurrent.ConcurrentHashMap
 import java.util.concurrent.atomic.AtomicLong
 
+import scala.collection.JavaConverters._
+
 import org.apache.spark.{InternalAccumulator, SparkContext, TaskContext}
 import org.apache.spark.scheduler.AccumulableInfo
 
@@ -257,6 +259,16 @@ private[spark] object AccumulatorContext {
     originals.clear()
   }
 
+  /**
+   * Returns the first registered accumulator whose name matches the given name, if any.
+   */
+  private[spark] def lookForAccumulatorByName(name: String): Option[AccumulatorV2[_, _]] = {
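+    // `originals` holds weak references to registered accumulators; skip any whose
+    // referents have already been garbage collected.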
+    originals.values().asScala.flatMap { ref => Option(ref.get) }.find { acc =>
+      acc.name.contains(name)
+    }
+  }
+
   // Identifier for distinguishing SQL metrics from other accumulators
   private[spark] val SQL_ACCUM_IDENTIFIER = "sql"
 }
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java
index dfe696764796f17a08c16d9c643b8c6d63f3ad04..06cd9ea2d242c074b6dafc4712fb9f043f6d219f 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java
@@ -31,6 +31,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import scala.Option;
+
 import static org.apache.parquet.filter2.compat.RowGroupFilter.filterRowGroups;
 import static org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER;
 import static org.apache.parquet.format.converter.ParquetMetadataConverter.range;
@@ -59,8 +61,12 @@ import org.apache.parquet.hadoop.metadata.ParquetMetadata;
 import org.apache.parquet.hadoop.util.ConfigurationUtil;
 import org.apache.parquet.schema.MessageType;
 import org.apache.parquet.schema.Types;
+import org.apache.spark.TaskContext;
+import org.apache.spark.TaskContext$;
 import org.apache.spark.sql.types.StructType;
 import org.apache.spark.sql.types.StructType$;
+import org.apache.spark.util.AccumulatorV2;
+import org.apache.spark.util.LongAccumulator;
 
 /**
  * Base class for custom RecordReaders for Parquet that directly materialize to `T`.
@@ -145,6 +151,18 @@ public abstract class SpecificParquetRecordReaderBase<T> extends RecordReader<Vo
     for (BlockMetaData block : blocks) {
       this.totalRowCount += block.getRowCount();
     }
+
+    // For test purposes only. If an accumulator named "numRowGroups" is registered with the
+    // current task, add the number of row groups this reader will actually read, so tests can
+    // verify whether row groups were filtered out.
+    TaskContext taskContext = TaskContext$.MODULE$.get();
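+    // TaskContext.get() returns null when this reader runs outside of a Spark task.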
+    if (taskContext != null) {
+      Option<AccumulatorV2<?, ?>> accu = taskContext.taskMetrics()
+        .lookForAccumulatorByName("numRowGroups");
+      if (accu.isDefined()) {
+        ((LongAccumulator) accu.get()).add((long) blocks.size());
+      }
+    }
   }
 
   /**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index 612a295c0e3138ff6900a45dc30eaef28cf129e5..7794f31331a86be2b87ad5f3637f3f4d51a5896a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -46,6 +46,7 @@ import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjectio
 import org.apache.spark.sql.catalyst.parser.LegacyTypeStringParser
 import org.apache.spark.sql.execution.command.CreateDataSourceTableUtils
 import org.apache.spark.sql.execution.datasources._
+import org.apache.spark.sql.execution.metric.SQLMetric
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types._
@@ -357,6 +358,11 @@ class ParquetFileFormat
       val hadoopAttemptContext =
         new TaskAttemptContextImpl(broadcastedHadoopConf.value.value, attemptId)
 
+      // Try to push down filters when filter push-down is enabled.
+      // Note: this push-down happens at the row-group level, not for individual records.
+      if (pushed.isDefined) {
+        ParquetInputFormat.setFilterPredicate(hadoopAttemptContext.getConfiguration, pushed.get)
+      }
       val parquetReader = if (enableVectorizedReader) {
         val vectorizedReader = new VectorizedParquetRecordReader()
         vectorizedReader.initialize(split, hadoopAttemptContext)
@@ -563,87 +569,7 @@ private[parquet] class ParquetOutputWriter(
   override def close(): Unit = recordWriter.close(context)
 }
 
-
 object ParquetFileFormat extends Logging {
-  /**
-   * If parquet's block size (row group size) setting is larger than the min split size,
-   * we use parquet's block size setting as the min split size. Otherwise, we will create
-   * tasks processing nothing (because a split does not cover the starting point of a
-   * parquet block). See https://issues.apache.org/jira/browse/SPARK-10143 for more information.
-   */
-  private def overrideMinSplitSize(parquetBlockSize: Long, conf: Configuration): Unit = {
-    val minSplitSize =
-      math.max(
-        conf.getLong("mapred.min.split.size", 0L),
-        conf.getLong("mapreduce.input.fileinputformat.split.minsize", 0L))
-    if (parquetBlockSize > minSplitSize) {
-      val message =
-        s"Parquet's block size (row group size) is larger than " +
-          s"mapred.min.split.size/mapreduce.input.fileinputformat.split.minsize. Setting " +
-          s"mapred.min.split.size and mapreduce.input.fileinputformat.split.minsize to " +
-          s"$parquetBlockSize."
-      logDebug(message)
-      conf.set("mapred.min.split.size", parquetBlockSize.toString)
-      conf.set("mapreduce.input.fileinputformat.split.minsize", parquetBlockSize.toString)
-    }
-  }
-
-  /** This closure sets various Parquet configurations at both driver side and executor side. */
-  private[parquet] def initializeLocalJobFunc(
-      requiredColumns: Array[String],
-      filters: Array[Filter],
-      dataSchema: StructType,
-      parquetBlockSize: Long,
-      useMetadataCache: Boolean,
-      parquetFilterPushDown: Boolean,
-      assumeBinaryIsString: Boolean,
-      assumeInt96IsTimestamp: Boolean)(job: Job): Unit = {
-    val conf = job.getConfiguration
-    conf.set(ParquetInputFormat.READ_SUPPORT_CLASS, classOf[ParquetReadSupport].getName)
-
-    // Try to push down filters when filter push-down is enabled.
-    if (parquetFilterPushDown) {
-      filters
-        // Collects all converted Parquet filter predicates. Notice that not all predicates can be
-        // converted (`ParquetFilters.createFilter` returns an `Option`). That's why a `flatMap`
-        // is used here.
-        .flatMap(ParquetFilters.createFilter(dataSchema, _))
-        .reduceOption(FilterApi.and)
-        .foreach(ParquetInputFormat.setFilterPredicate(conf, _))
-    }
-
-    conf.set(ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA, {
-      val requestedSchema = StructType(requiredColumns.map(dataSchema(_)))
-      ParquetSchemaConverter.checkFieldNames(requestedSchema).json
-    })
-
-    conf.set(
-      ParquetWriteSupport.SPARK_ROW_SCHEMA,
-      ParquetSchemaConverter.checkFieldNames(dataSchema).json)
-
-    // Tell FilteringParquetRowInputFormat whether it's okay to cache Parquet and FS metadata
-    conf.setBoolean(SQLConf.PARQUET_CACHE_METADATA.key, useMetadataCache)
-
-    // Sets flags for `CatalystSchemaConverter`
-    conf.setBoolean(SQLConf.PARQUET_BINARY_AS_STRING.key, assumeBinaryIsString)
-    conf.setBoolean(SQLConf.PARQUET_INT96_AS_TIMESTAMP.key, assumeInt96IsTimestamp)
-
-    overrideMinSplitSize(parquetBlockSize, conf)
-  }
-
-  /** This closure sets input paths at the driver side. */
-  private[parquet] def initializeDriverSideJobFunc(
-      inputFiles: Array[FileStatus],
-      parquetBlockSize: Long)(job: Job): Unit = {
-    // We side the input paths at the driver side.
-    logInfo(s"Reading Parquet file(s) from ${inputFiles.map(_.getPath).mkString(", ")}")
-    if (inputFiles.nonEmpty) {
-      FileInputFormat.setInputPaths(job, inputFiles.map(_.getPath): _*)
-    }
-
-    overrideMinSplitSize(parquetBlockSize, job.getConfiguration)
-  }
-
   private[parquet] def readSchema(
       footers: Seq[Footer], sparkSession: SparkSession): Option[StructType] = {
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
index d846b27ffed03dc6940c2cf7f1593f73220f9bed..4246b54c21f0c9c3c5902cff2e62649fb76108ac 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
@@ -32,6 +32,7 @@ import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.types._
+import org.apache.spark.util.{AccumulatorContext, LongAccumulator}
 
 /**
  * A test suite that tests Parquet filter2 API based filter pushdown optimization.
@@ -368,73 +369,75 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
 
   test("SPARK-11103: Filter applied on merged Parquet schema with new column fails") {
     import testImplicits._
-
-    withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> "true",
-      SQLConf.PARQUET_SCHEMA_MERGING_ENABLED.key -> "true") {
-      withTempPath { dir =>
-        val pathOne = s"${dir.getCanonicalPath}/table1"
-        (1 to 3).map(i => (i, i.toString)).toDF("a", "b").write.parquet(pathOne)
-        val pathTwo = s"${dir.getCanonicalPath}/table2"
-        (1 to 3).map(i => (i, i.toString)).toDF("c", "b").write.parquet(pathTwo)
-
-        // If the "c = 1" filter gets pushed down, this query will throw an exception which
-        // Parquet emits. This is a Parquet issue (PARQUET-389).
-        val df = spark.read.parquet(pathOne, pathTwo).filter("c = 1").selectExpr("c", "b", "a")
-        checkAnswer(
-          df,
-          Row(1, "1", null))
-
-        // The fields "a" and "c" only exist in one Parquet file.
-        assert(df.schema("a").metadata.getBoolean(StructType.metadataKeyForOptionalField))
-        assert(df.schema("c").metadata.getBoolean(StructType.metadataKeyForOptionalField))
-
-        val pathThree = s"${dir.getCanonicalPath}/table3"
-        df.write.parquet(pathThree)
-
-        // We will remove the temporary metadata when writing Parquet file.
-        val schema = spark.read.parquet(pathThree).schema
-        assert(schema.forall(!_.metadata.contains(StructType.metadataKeyForOptionalField)))
-
-        val pathFour = s"${dir.getCanonicalPath}/table4"
-        val dfStruct = sparkContext.parallelize(Seq((1, 1))).toDF("a", "b")
-        dfStruct.select(struct("a").as("s")).write.parquet(pathFour)
-
-        val pathFive = s"${dir.getCanonicalPath}/table5"
-        val dfStruct2 = sparkContext.parallelize(Seq((1, 1))).toDF("c", "b")
-        dfStruct2.select(struct("c").as("s")).write.parquet(pathFive)
-
-        // If the "s.c = 1" filter gets pushed down, this query will throw an exception which
-        // Parquet emits.
-        val dfStruct3 = spark.read.parquet(pathFour, pathFive).filter("s.c = 1")
-          .selectExpr("s")
-        checkAnswer(dfStruct3, Row(Row(null, 1)))
-
-        // The fields "s.a" and "s.c" only exist in one Parquet file.
-        val field = dfStruct3.schema("s").dataType.asInstanceOf[StructType]
-        assert(field("a").metadata.getBoolean(StructType.metadataKeyForOptionalField))
-        assert(field("c").metadata.getBoolean(StructType.metadataKeyForOptionalField))
-
-        val pathSix = s"${dir.getCanonicalPath}/table6"
-        dfStruct3.write.parquet(pathSix)
-
-        // We will remove the temporary metadata when writing Parquet file.
-        val forPathSix = spark.read.parquet(pathSix).schema
-        assert(forPathSix.forall(!_.metadata.contains(StructType.metadataKeyForOptionalField)))
-
-        // sanity test: make sure optional metadata field is not wrongly set.
-        val pathSeven = s"${dir.getCanonicalPath}/table7"
-        (1 to 3).map(i => (i, i.toString)).toDF("a", "b").write.parquet(pathSeven)
-        val pathEight = s"${dir.getCanonicalPath}/table8"
-        (4 to 6).map(i => (i, i.toString)).toDF("a", "b").write.parquet(pathEight)
-
-        val df2 = spark.read.parquet(pathSeven, pathEight).filter("a = 1").selectExpr("a", "b")
-        checkAnswer(
-          df2,
-          Row(1, "1"))
-
-        // The fields "a" and "b" exist in both two Parquet files. No metadata is set.
-        assert(!df2.schema("a").metadata.contains(StructType.metadataKeyForOptionalField))
-        assert(!df2.schema("b").metadata.contains(StructType.metadataKeyForOptionalField))
+    Seq("true", "false").map { vectorized =>
+      withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> "true",
+        SQLConf.PARQUET_SCHEMA_MERGING_ENABLED.key -> "true",
+        SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> vectorized) {
+        withTempPath { dir =>
+          val pathOne = s"${dir.getCanonicalPath}/table1"
+          (1 to 3).map(i => (i, i.toString)).toDF("a", "b").write.parquet(pathOne)
+          val pathTwo = s"${dir.getCanonicalPath}/table2"
+          (1 to 3).map(i => (i, i.toString)).toDF("c", "b").write.parquet(pathTwo)
+
+          // If the "c = 1" filter gets pushed down, this query will throw an exception which
+          // Parquet emits. This is a Parquet issue (PARQUET-389).
+          val df = spark.read.parquet(pathOne, pathTwo).filter("c = 1").selectExpr("c", "b", "a")
+          checkAnswer(
+            df,
+            Row(1, "1", null))
+
+          // The fields "a" and "c" only exist in one Parquet file.
+          assert(df.schema("a").metadata.getBoolean(StructType.metadataKeyForOptionalField))
+          assert(df.schema("c").metadata.getBoolean(StructType.metadataKeyForOptionalField))
+
+          val pathThree = s"${dir.getCanonicalPath}/table3"
+          df.write.parquet(pathThree)
+
+          // We will remove the temporary metadata when writing out the Parquet file.
+          val schema = spark.read.parquet(pathThree).schema
+          assert(schema.forall(!_.metadata.contains(StructType.metadataKeyForOptionalField)))
+
+          val pathFour = s"${dir.getCanonicalPath}/table4"
+          val dfStruct = sparkContext.parallelize(Seq((1, 1))).toDF("a", "b")
+          dfStruct.select(struct("a").as("s")).write.parquet(pathFour)
+
+          val pathFive = s"${dir.getCanonicalPath}/table5"
+          val dfStruct2 = sparkContext.parallelize(Seq((1, 1))).toDF("c", "b")
+          dfStruct2.select(struct("c").as("s")).write.parquet(pathFive)
+
+          // If the "s.c = 1" filter gets pushed down, this query will throw an exception which
+          // Parquet emits.
+          val dfStruct3 = spark.read.parquet(pathFour, pathFive).filter("s.c = 1")
+            .selectExpr("s")
+          checkAnswer(dfStruct3, Row(Row(null, 1)))
+
+          // The fields "s.a" and "s.c" only exist in one Parquet file.
+          val field = dfStruct3.schema("s").dataType.asInstanceOf[StructType]
+          assert(field("a").metadata.getBoolean(StructType.metadataKeyForOptionalField))
+          assert(field("c").metadata.getBoolean(StructType.metadataKeyForOptionalField))
+
+          val pathSix = s"${dir.getCanonicalPath}/table6"
+          dfStruct3.write.parquet(pathSix)
+
+          // We will remove the temporary metadata when writing out the Parquet file.
+          val forPathSix = spark.read.parquet(pathSix).schema
+          assert(forPathSix.forall(!_.metadata.contains(StructType.metadataKeyForOptionalField)))
+
+          // Sanity check: make sure the optional metadata field is not wrongly set.
+          val pathSeven = s"${dir.getCanonicalPath}/table7"
+          (1 to 3).map(i => (i, i.toString)).toDF("a", "b").write.parquet(pathSeven)
+          val pathEight = s"${dir.getCanonicalPath}/table8"
+          (4 to 6).map(i => (i, i.toString)).toDF("a", "b").write.parquet(pathEight)
+
+          val df2 = spark.read.parquet(pathSeven, pathEight).filter("a = 1").selectExpr("a", "b")
+          checkAnswer(
+            df2,
+            Row(1, "1"))
+
+          // The fields "a" and "b" exist in both Parquet files. No metadata is set.
+          assert(!df2.schema("a").metadata.contains(StructType.metadataKeyForOptionalField))
+          assert(!df2.schema("b").metadata.contains(StructType.metadataKeyForOptionalField))
+        }
       }
     }
   }
@@ -527,4 +530,32 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
       assert(df.filter("_1 IS NOT NULL").count() === 4)
     }
   }
+
+  test("Fiters should be pushed down for vectorized Parquet reader at row group level") {
+    import testImplicits._
+
+    withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "true",
+        SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "false") {
+      withTempPath { dir =>
+        val path = s"${dir.getCanonicalPath}/table"
+        (1 to 1024).map(i => (101, i)).toDF("a", "b").write.parquet(path)
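+        // Every value of column "a" is 101, so the filter "a < 100" matches no rows and,
+        // with push-down enabled, every row group can be skipped.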
+
+        Seq(("true", (x: Long) => x == 0), ("false", (x: Long) => x > 0)).map { case (push, func) =>
+          withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> push) {
+            val accu = new LongAccumulator
+            accu.register(sparkContext, Some("numRowGroups"))
+
+            val df = spark.read.parquet(path).filter("a < 100")
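+            // Referencing the accumulator inside a task closure registers it with each task's
+            // TaskMetrics, so the Parquet reader can find it by name on the executor side.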
+            df.foreachPartition(_.foreach(v => accu.add(0)))
+            df.collect()
+
+            val numRowGroups = AccumulatorContext.lookForAccumulatorByName("numRowGroups")
+            assert(numRowGroups.isDefined)
+            assert(func(numRowGroups.get.asInstanceOf[LongAccumulator].value))
+            AccumulatorContext.remove(accu.id)
+          }
+        }
+      }
+    }
+  }
 }