diff --git a/core/src/main/scala/org/apache/spark/util/Benchmark.scala b/core/src/main/scala/org/apache/spark/util/Benchmark.scala
new file mode 100644
index 0000000000000000000000000000000000000000..457a1a05a1bf581e4c01e54a9893a0bb4c7f628f
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/util/Benchmark.scala
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util
+
+import scala.collection.mutable
+
+import org.apache.commons.lang3.SystemUtils
+
+/**
+ * Utility class to benchmark components. An example of how to use this is:
+ * {{{
+ *   val benchmark = new Benchmark("My Benchmark", valuesPerIteration)
+ *   benchmark.addCase("V1") { iter => ... }
+ *   benchmark.addCase("V2") { iter => ... }
+ *   benchmark.run()
+ * }}}
+ * This outputs the average time and the average rate (millions of values per second)
+ * for each registered case.
+ *
+ * Each benchmark function takes one argument: the number of the iteration being run.
+ *
+ * If `outputPerIteration` is true, the timing for each individual run is printed to stdout.
+ */
+private[spark] class Benchmark(
+    name: String,
+    valuesPerIteration: Long,
+    iters: Int = 5,
+    outputPerIteration: Boolean = false) {
+  val benchmarks = mutable.ArrayBuffer.empty[Benchmark.Case]
+
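+  /**
+   * Registers a case to benchmark; `f` is passed the current iteration number
+   * (iteration 0 is the untimed warm-up run).
+   */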
+  def addCase(name: String)(f: Int => Unit): Unit = {
+    benchmarks += Benchmark.Case(name, f)
+  }
+
+  /**
+   * Runs the benchmark and prints the results to stdout. The output should be copied into
+   * a comment next to the benchmark: although results vary from machine to machine, they
+   * provide a useful baseline.
+   */
+  def run(): Unit = {
+    require(benchmarks.nonEmpty)
+    // scalastyle:off
+    println("Running benchmark: " + name)
+
+    val results = benchmarks.map { c =>
+      println("  Running case: " + c.name)
+      Benchmark.measure(valuesPerIteration, iters, outputPerIteration)(c.fn)
+    }
+    println()
+
+    val firstRate = results.head.avgRate
+    // Results are processor-specific, so include the processor name in the output.
+    println(Benchmark.getProcessorName())
+    printf("%-24s %16s %16s %14s\n", name + ":", "Avg Time(ms)", "Avg Rate(M/s)", "Relative Rate")
+    println("-------------------------------------------------------------------------")
+    results.zip(benchmarks).foreach { case (result, benchmark) =>
+      printf("%-24s %16s %16s %14s\n",
+        benchmark.name,
+        "%10.2f" format result.avgMs,
+        "%10.2f" format result.avgRate,
+        "%6.2f X" format (result.avgRate / firstRate))
+    }
+    println()
+    // scalastyle:on
+  }
+}
+
+private[spark] object Benchmark {
+  case class Case(name: String, fn: Int => Unit)
+  case class Result(avgMs: Double, avgRate: Double)
+
+  /**
+   * Returns a human-readable description of the processor; how it is obtained depends on
+   * the OS. The result looks something like "Intel(R) Core(TM) i7-4870HQ CPU @ 2.50GHz".
+   */
+  def getProcessorName(): String = {
+    if (SystemUtils.IS_OS_MAC_OSX) {
+      Utils.executeAndGetOutput(Seq("/usr/sbin/sysctl", "-n", "machdep.cpu.brand_string"))
+    } else if (SystemUtils.IS_OS_LINUX) {
+      Utils.executeAndGetOutput(Seq("/usr/bin/grep", "-m", "1", "\"model name\"", "/proc/cpuinfo"))
+    } else {
+      System.getenv("PROCESSOR_IDENTIFIER")
+    }
+  }
+
+  /**
+   * Runs a single function `f` for `iters` timed iterations (plus one untimed warm-up run),
+   * returning the average time per iteration and the average rate in millions of values
+   * per second.
+   */
+  def measure(num: Long, iters: Int, outputPerIteration: Boolean)(f: Int => Unit): Result = {
+    var totalTime = 0L
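+    // Run iters + 1 times; iteration 0 is a warm-up and is excluded from the timing.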
+    for (i <- 0 until iters + 1) {
+      val start = System.nanoTime()
+
+      f(i)
+
+      val end = System.nanoTime()
+      if (i != 0) totalTime += end - start
+
+      if (outputPerIteration) {
+        // scalastyle:off
+        println(s"Iteration $i took ${(end - start) / 1000} microseconds")
+        // scalastyle:on
+      }
+    }
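+    // totalTime is in nanoseconds: convert to average ms per iteration, and to values per
+    // microsecond (equivalently, millions of values per second) for the rate.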
+    Result(totalTime.toDouble / 1000000 / iters, num * iters / (totalTime.toDouble / 1000))
+  }
+}
+
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadBenchmark.scala
new file mode 100644
index 0000000000000000000000000000000000000000..cab6abde6da2308317615428de9110bff32f05a2
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadBenchmark.scala
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.datasources.parquet
+
+import java.io.File
+
+import scala.collection.JavaConverters._
+import scala.util.Try
+
+import org.apache.spark.{SparkConf, SparkContext}
+import org.apache.spark.sql.{SQLConf, SQLContext}
+import org.apache.spark.util.{Benchmark, Utils}
+
+/**
+ * Benchmark to measure Parquet read performance.
+ * To run this:
+ * {{{
+ *   spark-submit --class <this class> --jars <spark sql test jar>
+ * }}}
+ */
+object ParquetReadBenchmark {
+  val conf = new SparkConf()
+  conf.set("spark.sql.parquet.compression.codec", "snappy")
+  val sc = new SparkContext("local[1]", "test-sql-context", conf)
+  val sqlContext = new SQLContext(sc)
+
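+  /** Runs `f` with a fresh temporary path that is deleted recursively afterwards. */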
+  def withTempPath(f: File => Unit): Unit = {
+    val path = Utils.createTempDir()
+    path.delete()
+    try f(path) finally Utils.deleteRecursively(path)
+  }
+
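+  /** Runs `f`, then drops the given temporary tables. */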
+  def withTempTable(tableNames: String*)(f: => Unit): Unit = {
+    try f finally tableNames.foreach(sqlContext.dropTempTable)
+  }
+
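+  /** Runs `f` with the given SQL conf settings, restoring the original values afterwards. */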
+  def withSQLConf(pairs: (String, String)*)(f: => Unit): Unit = {
+    val (keys, values) = pairs.unzip
+    val currentValues = keys.map(key => Try(sqlContext.conf.getConfString(key)).toOption)
+    (keys, values).zipped.foreach(sqlContext.conf.setConfString)
+    try f finally {
+      keys.zip(currentValues).foreach {
+        case (key, Some(value)) => sqlContext.conf.setConfString(key, value)
+        case (key, None) => sqlContext.conf.unsetConf(key)
+      }
+    }
+  }
+
+  def intScanBenchmark(values: Int): Unit = {
+    withTempPath { dir =>
+      sqlContext.range(values).write.parquet(dir.getCanonicalPath)
+      withTempTable("tempTable") {
+        sqlContext.read.parquet(dir.getCanonicalPath).registerTempTable("tempTable")
+        val benchmark = new Benchmark("Single Int Column Scan", values)
+
+        benchmark.addCase("SQL Parquet Reader") { iter =>
+          sqlContext.sql("select sum(id) from tempTable").collect()
+        }
+
+        benchmark.addCase("SQL Parquet MR") { iter =>
+          withSQLConf(SQLConf.PARQUET_UNSAFE_ROW_RECORD_READER_ENABLED.key -> "false") {
+            sqlContext.sql("select sum(id) from tempTable").collect()
+          }
+        }
+
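+        // Drive the parquet record reader directly, bypassing Spark's SQL execution.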
+        val files = SpecificParquetRecordReaderBase.listDirectory(dir).toArray
+        benchmark.addCase("ParquetReader") { num =>
+          var sum = 0L
+          files.map(_.asInstanceOf[String]).foreach { p =>
+            val reader = new UnsafeRowParquetRecordReader
+            reader.initialize(p, ("id" :: Nil).asJava)
+
+            while (reader.nextKeyValue()) {
+              val record = reader.getCurrentValue
+              if (!record.isNullAt(0)) sum += record.getInt(0)
+            }
+            reader.close()
+          }
+        }
+
+        /*
+          Intel(R) Core(TM) i7-4870HQ CPU @ 2.50GHz
+          Single Int Column Scan:      Avg Time(ms)    Avg Rate(M/s)  Relative Rate
+          -------------------------------------------------------------------------
+          SQL Parquet Reader                 1910.0            13.72         1.00 X
+          SQL Parquet MR                     2330.0            11.25         0.82 X
+          ParquetReader                      1252.6            20.93         1.52 X
+        */
+        benchmark.run()
+      }
+    }
+  }
+
+  def intStringScanBenchmark(values: Int): Unit = {
+    withTempPath { dir =>
+      withTempTable("t1", "tempTable") {
+        sqlContext.range(values).registerTempTable("t1")
+        sqlContext.sql("select id as c1, cast(id as STRING) as c2 from t1")
+            .write.parquet(dir.getCanonicalPath)
+        sqlContext.read.parquet(dir.getCanonicalPath).registerTempTable("tempTable")
+
+        val benchmark = new Benchmark("Int and String Scan", values)
+
+        benchmark.addCase("SQL Parquet Reader") { iter =>
+          sqlContext.sql("select sum(c1), sum(length(c2)) from tempTable").collect
+        }
+
+        benchmark.addCase("SQL Parquet MR") { iter =>
+          withSQLConf(SQLConf.PARQUET_UNSAFE_ROW_RECORD_READER_ENABLED.key -> "false") {
+            sqlContext.sql("select sum(c1), sum(length(c2)) from tempTable").collect
+          }
+        }
+
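+        // As above, drive the parquet record reader directly, bypassing Spark's SQL execution.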
+        val files = SpecificParquetRecordReaderBase.listDirectory(dir).toArray
+        benchmark.addCase("ParquetReader") { num =>
+          var sum1 = 0L
+          var sum2 = 0L
+          files.map(_.asInstanceOf[String]).foreach { p =>
+            val reader = new UnsafeRowParquetRecordReader
+            reader.initialize(p, null)  // a null column list reads all columns
+            while (reader.nextKeyValue()) {
+              val record = reader.getCurrentValue
+              if (!record.isNullAt(0)) sum1 += record.getInt(0)
+              if (!record.isNullAt(1)) sum2 += record.getUTF8String(1).numBytes()
+            }
+            reader.close()
+          }
+        }
+
+        /*
+          Intel(R) Core(TM) i7-4870HQ CPU @ 2.50GHz
+          Int and String Scan:         Avg Time(ms)    Avg Rate(M/s)  Relative Rate
+          -------------------------------------------------------------------------
+          SQL Parquet Reader                 2245.6             7.00         1.00 X
+          SQL Parquet MR                     2914.2             5.40         0.77 X
+          ParquetReader                      1544.6            10.18         1.45 X
+        */
+        benchmark.run()
+      }
+    }
+  }
+
+  def main(args: Array[String]): Unit = {
+    intScanBenchmark(1024 * 1024 * 15)
+    intStringScanBenchmark(1024 * 1024 * 10)
+  }
+}