diff --git a/core/src/main/scala/org/apache/spark/ui/PagedTable.scala b/core/src/main/scala/org/apache/spark/ui/PagedTable.scala
index 17d7b39c2d951f165db74da8c33f5e1e0fbe11c8..6e2375477a688040d6c1e137ad2e93cfa932c502 100644
--- a/core/src/main/scala/org/apache/spark/ui/PagedTable.scala
+++ b/core/src/main/scala/org/apache/spark/ui/PagedTable.scala
@@ -159,9 +159,9 @@ private[ui] trait PagedTable[T] {
       // "goButtonJsFuncName"
       val formJs =
         s"""$$(function(){
-          |  $$( "#form-task-page" ).submit(function(event) {
-          |    var page = $$("#form-task-page-no").val()
-          |    var pageSize = $$("#form-task-page-size").val()
+          |  $$( "#form-$tableId-page" ).submit(function(event) {
+          |    var page = $$("#form-$tableId-page-no").val()
+          |    var pageSize = $$("#form-$tableId-page-size").val()
           |    pageSize = pageSize ? pageSize: 100;
           |    if (page != "") {
           |      ${goButtonJsFuncName}(page, pageSize);
@@ -173,12 +173,14 @@ private[ui] trait PagedTable[T] {
 
       <div>
         <div>
-          <form id="form-task-page" class="form-inline pull-right" style="margin-bottom: 0px;">
+          <form id={s"form-$tableId-page"}
+                class="form-inline pull-right" style="margin-bottom: 0px;">
             <label>{totalPages} Pages. Jump to</label>
-            <input type="text" id="form-task-page-no" value={page.toString} class="span1" />
+            <input type="text" id={s"form-$tableId-page-no"} value={page.toString} class="span1" />
             <label>. Show </label>
-            <input type="text" id="form-task-page-size" value={pageSize.toString} class="span1" />
-            <label>tasks in a page.</label>
+            <input type="text"
+                   id={s"form-$tableId-page-size"} value={pageSize.toString} class="span1" />
+            <label>items in a page.</label>
             <button type="submit" class="btn">Go</button>
           </form>
         </div>
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
index 3954c3d1ef8942b8613041619f186d0a6ce5f17a..0c94204df6530b5e66ec0c9f339b34c95a674a5b 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
@@ -988,8 +988,7 @@ private[ui] class TaskDataSource(
       shuffleRead,
       shuffleWrite,
       bytesSpilled,
-      errorMessage.getOrElse("")
-    )
+      errorMessage.getOrElse(""))
   }
 
   /**
@@ -1197,7 +1196,7 @@ private[ui] class TaskPagedTable(
   private val displayPeakExecutionMemory =
     conf.getOption("spark.sql.unsafe.enabled").exists(_.toBoolean)
 
-  override def tableId: String = ""
+  override def tableId: String = "task-table"
 
   override def tableCssClass: String = "table table-bordered table-condensed table-striped"
 
@@ -1212,8 +1211,7 @@ private[ui] class TaskPagedTable(
     currentTime,
     pageSize,
     sortColumn,
-    desc
-  )
+    desc)
 
   override def pageLink(page: Int): String = {
     val encodedSortColumn = URLEncoder.encode(sortColumn, "UTF-8")
@@ -1277,7 +1275,7 @@ private[ui] class TaskPagedTable(
         Seq(("Errors", ""))
 
     if (!taskHeadersAndCssClasses.map(_._1).contains(sortColumn)) {
-      new IllegalArgumentException(s"Unknown column: $sortColumn")
+      throw new IllegalArgumentException(s"Unknown column: $sortColumn")
     }
 
     val headerRow: Seq[Node] = {
diff --git a/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala b/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala
index 36943978ff5946d5eb018007d7cda790ed647b8c..fd6cc3ed759b38e307e547cc5ba671434abb71da 100644
--- a/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala
@@ -17,12 +17,13 @@
 
 package org.apache.spark.ui.storage
 
+import java.net.URLEncoder
 import javax.servlet.http.HttpServletRequest
 
-import scala.xml.Node
+import scala.xml.{Node, Unparsed}
 
 import org.apache.spark.status.api.v1.{AllRDDResource, RDDDataDistribution, RDDPartitionInfo}
-import org.apache.spark.ui.{UIUtils, WebUIPage}
+import org.apache.spark.ui.{PagedDataSource, PagedTable, UIUtils, WebUIPage}
 import org.apache.spark.util.Utils
 
 /** Page showing storage details for a given RDD */
@@ -32,6 +33,17 @@ private[ui] class RDDPage(parent: StorageTab) extends WebUIPage("rdd") {
   def render(request: HttpServletRequest): Seq[Node] = {
     val parameterId = request.getParameter("id")
     require(parameterId != null && parameterId.nonEmpty, "Missing id parameter")
+
+    val parameterBlockPage = request.getParameter("block.page")
+    val parameterBlockSortColumn = request.getParameter("block.sort")
+    val parameterBlockSortDesc = request.getParameter("block.desc")
+    val parameterBlockPageSize = request.getParameter("block.pageSize")
+
+    val blockPage = Option(parameterBlockPage).map(_.toInt).getOrElse(1)
+    val blockSortColumn = Option(parameterBlockSortColumn).getOrElse("Block Name")
+    val blockSortDesc = Option(parameterBlockSortDesc).map(_.toBoolean).getOrElse(false)
+    val blockPageSize = Option(parameterBlockPageSize).map(_.toInt).getOrElse(100)
+
     val rddId = parameterId.toInt
     val rddStorageInfo = AllRDDResource.getRDDStorageInfo(rddId, listener, includeDetails = true)
       .getOrElse {
@@ -44,8 +56,34 @@ private[ui] class RDDPage(parent: StorageTab) extends WebUIPage("rdd") {
       rddStorageInfo.dataDistribution.get, id = Some("rdd-storage-by-worker-table"))
 
     // Block table
-    val blockTable = UIUtils.listingTable(blockHeader, blockRow, rddStorageInfo.partitions.get,
-      id = Some("rdd-storage-by-block-table"))
+    val (blockTable, blockTableHTML) = try {
+      val _blockTable = new BlockPagedTable(
+        UIUtils.prependBaseUri(parent.basePath) + s"/storage/rdd/?id=${rddId}",
+        rddStorageInfo.partitions.get,
+        blockPageSize,
+        blockSortColumn,
+        blockSortDesc)
+      (_blockTable, _blockTable.table(blockPage))
+    } catch {
+      case e @ (_ : IllegalArgumentException | _ : IndexOutOfBoundsException) =>
+        (null, <div class="alert alert-error">{e.getMessage}</div>)
+    }
+
+    val jsForScrollingDownToBlockTable =
+      <script>
+        {
+          Unparsed {
+            """
+              |$(function() {
+              |  if (/.*&block.sort=.*$/.test(location.search)) {
+              |    var topOffset = $("#blocks-section").offset().top;
+              |    $("html,body").animate({scrollTop: topOffset}, 200);
+              |  }
+              |});
+            """.stripMargin
+          }
+        }
+      </script>
 
     val content =
       <div class="row-fluid">
@@ -85,11 +123,11 @@ private[ui] class RDDPage(parent: StorageTab) extends WebUIPage("rdd") {
         </div>
       </div>
 
-      <div class="row-fluid">
-        <div class="span12">
-          <h4> {rddStorageInfo.partitions.map(_.size).getOrElse(0)} Partitions </h4>
-          {blockTable}
-        </div>
+      <div>
+        <h4 id="blocks-section">
+          {rddStorageInfo.partitions.map(_.size).getOrElse(0)} Partitions
+        </h4>
+        {blockTableHTML ++ jsForScrollingDownToBlockTable}
       </div>;
 
     UIUtils.headerSparkPage("RDD Storage Info for " + rddStorageInfo.name, content, parent)
@@ -101,14 +139,6 @@ private[ui] class RDDPage(parent: StorageTab) extends WebUIPage("rdd") {
     "Memory Usage",
     "Disk Usage")
 
-  /** Header fields for the block table */
-  private def blockHeader = Seq(
-    "Block Name",
-    "Storage Level",
-    "Size in Memory",
-    "Size on Disk",
-    "Executors")
-
   /** Render an HTML row representing a worker */
   private def workerRow(worker: RDDDataDistribution): Seq[Node] = {
     <tr>
@@ -120,23 +150,157 @@ private[ui] class RDDPage(parent: StorageTab) extends WebUIPage("rdd") {
       <td>{Utils.bytesToString(worker.diskUsed)}</td>
     </tr>
   }
+}
+
+private[ui] case class BlockTableRowData(
+    blockName: String,
+    storageLevel: String,
+    memoryUsed: Long,
+    diskUsed: Long,
+    executors: String)
+
+private[ui] class BlockDataSource(
+    rddPartitions: Seq[RDDPartitionInfo],
+    pageSize: Int,
+    sortColumn: String,
+    desc: Boolean) extends PagedDataSource[BlockTableRowData](pageSize) {
+
+  private val data = rddPartitions.map(blockRow).sorted(ordering(sortColumn, desc))
+
+  override def dataSize: Int = data.size
+
+  override def sliceData(from: Int, to: Int): Seq[BlockTableRowData] = {
+    data.slice(from, to)
+  }
+
+  private def blockRow(rddPartition: RDDPartitionInfo): BlockTableRowData = {
+    BlockTableRowData(
+      rddPartition.blockName,
+      rddPartition.storageLevel,
+      rddPartition.memoryUsed,
+      rddPartition.diskUsed,
+      rddPartition.executors.mkString(" "))
+  }
+
+  /**
+   * Return Ordering according to sortColumn and desc
+   */
+  private def ordering(sortColumn: String, desc: Boolean): Ordering[BlockTableRowData] = {
+    val ordering = sortColumn match {
+      case "Block Name" => new Ordering[BlockTableRowData] {
+        override def compare(x: BlockTableRowData, y: BlockTableRowData): Int =
+          Ordering.String.compare(x.blockName, y.blockName)
+      }
+      case "Storage Level" => new Ordering[BlockTableRowData] {
+        override def compare(x: BlockTableRowData, y: BlockTableRowData): Int =
+          Ordering.String.compare(x.storageLevel, y.storageLevel)
+      }
+      case "Size in Memory" => new Ordering[BlockTableRowData] {
+        override def compare(x: BlockTableRowData, y: BlockTableRowData): Int =
+          Ordering.Long.compare(x.memoryUsed, y.memoryUsed)
+      }
+      case "Size on Disk" => new Ordering[BlockTableRowData] {
+        override def compare(x: BlockTableRowData, y: BlockTableRowData): Int =
+          Ordering.Long.compare(x.diskUsed, y.diskUsed)
+      }
+      case "Executors" => new Ordering[BlockTableRowData] {
+        override def compare(x: BlockTableRowData, y: BlockTableRowData): Int =
+          Ordering.String.compare(x.executors, y.executors)
+      }
+      case unknownColumn => throw new IllegalArgumentException(s"Unknown column: $unknownColumn")
+    }
+    if (desc) {
+      ordering.reverse
+    } else {
+      ordering
+    }
+  }
+}
+
+private[ui] class BlockPagedTable(
+    basePath: String,
+    rddPartitions: Seq[RDDPartitionInfo],
+    pageSize: Int,
+    sortColumn: String,
+    desc: Boolean) extends PagedTable[BlockTableRowData] {
+
+  override def tableId: String = "rdd-storage-by-block-table"
+
+  override def tableCssClass: String = "table table-bordered table-condensed table-striped"
+
+  override val dataSource: BlockDataSource = new BlockDataSource(
+    rddPartitions,
+    pageSize,
+    sortColumn,
+    desc)
+
+  override def pageLink(page: Int): String = {
+    val encodedSortColumn = URLEncoder.encode(sortColumn, "UTF-8")
+    s"${basePath}&block.page=$page&block.sort=${encodedSortColumn}&block.desc=${desc}" +
+      s"&block.pageSize=${pageSize}"
+  }
+
+  override def goButtonJavascriptFunction: (String, String) = {
+    val jsFuncName = "goToBlockPage"
+    val encodedSortColumn = URLEncoder.encode(sortColumn, "UTF-8")
+    val jsFunc = s"""
+      |currentBlockPageSize = ${pageSize}
+      |function goToBlockPage(page, pageSize) {
+      |  // Set page to 1 if the page size changes
+      |  page = pageSize == currentBlockPageSize ? page : 1;
+      |  var url = "${basePath}&block.sort=${encodedSortColumn}&block.desc=${desc}" +
+      |    "&block.page=" + page + "&block.pageSize=" + pageSize;
+      |  window.location.href = url;
+      |}
+     """.stripMargin
+    (jsFuncName, jsFunc)
+  }
 
-  /** Render an HTML row representing a block */
-  private def blockRow(row: RDDPartitionInfo): Seq[Node] = {
+  override def headers: Seq[Node] = {
+    val blockHeaders = Seq(
+      "Block Name",
+      "Storage Level",
+      "Size in Memory",
+      "Size on Disk",
+      "Executors")
+
+    if (!blockHeaders.contains(sortColumn)) {
+      throw new IllegalArgumentException(s"Unknown column: $sortColumn")
+    }
+
+    val headerRow: Seq[Node] = {
+      blockHeaders.map { header =>
+        if (header == sortColumn) {
+          val headerLink =
+            s"$basePath&block.sort=${URLEncoder.encode(header, "UTF-8")}&block.desc=${!desc}" +
+              s"&block.pageSize=${pageSize}"
+          val js = Unparsed(s"window.location.href='${headerLink}'")
+          val arrow = if (desc) "&#x25BE;" else "&#x25B4;" // UP or DOWN
+          <th onclick={js} style="cursor: pointer;">
+            {header}
+            <span>&nbsp;{Unparsed(arrow)}</span>
+          </th>
+        } else {
+          val headerLink =
+            s"$basePath&block.sort=${URLEncoder.encode(header, "UTF-8")}" +
+              s"&block.pageSize=${pageSize}"
+          val js = Unparsed(s"window.location.href='${headerLink}'")
+          <th onclick={js} style="cursor: pointer;">
+            {header}
+          </th>
+        }
+      }
+    }
+    <thead>{headerRow}</thead>
+  }
+
+  override def row(block: BlockTableRowData): Seq[Node] = {
     <tr>
-      <td>{row.blockName}</td>
-      <td>
-        {row.storageLevel}
-      </td>
-      <td sorttable_customkey={row.memoryUsed.toString}>
-        {Utils.bytesToString(row.memoryUsed)}
-      </td>
-      <td sorttable_customkey={row.diskUsed.toString}>
-        {Utils.bytesToString(row.diskUsed)}
-      </td>
-      <td>
-        {row.executors.map(l => <span>{l}<br/></span>)}
-      </td>
+      <td>{block.blockName}</td>
+      <td>{block.storageLevel}</td>
+      <td>{Utils.bytesToString(block.memoryUsed)}</td>
+      <td>{Utils.bytesToString(block.diskUsed)}</td>
+      <td>{block.executors}</td>
     </tr>
   }
 }