diff --git a/core/src/main/resources/org/apache/spark/ui/static/executorspage-template.html b/core/src/main/resources/org/apache/spark/ui/static/executorspage-template.html
index 4e83d6d564986fc916c1bb8c491e913a6102876b..5c91304e49fd701c0442af662b145c20a1d4c2bf 100644
--- a/core/src/main/resources/org/apache/spark/ui/static/executorspage-template.html
+++ b/core/src/main/resources/org/apache/spark/ui/static/executorspage-template.html
@@ -24,7 +24,15 @@ limitations under the License.
         <th></th>
         <th>RDD Blocks</th>
         <th><span data-toggle="tooltip"
-                  title="Memory used / total available memory for storage of data like RDD partitions cached in memory. ">Storage Memory</span>
+                  title="Memory used / total available memory for storage of data like RDD partitions cached in memory.">Storage Memory</span>
+        </th>
+        <th class="on_heap_memory">
+          <span data-toggle="tooltip"
+                title="Memory used / total available memory for on heap storage of data like RDD partitions cached in memory.">On Heap Storage Memory</span>
+        </th>
+        <th class="off_heap_memory">
+          <span data-toggle="tooltip"
+                title="Memory used / total available memory for off heap storage of data like RDD partitions cached in memory.">Off Heap Storage Memory</span>
         </th>
         <th>Disk Used</th>
         <th>Cores</th>
@@ -73,6 +81,14 @@ limitations under the License.
             <span data-toggle="tooltip" data-placement="top"
                   title="Memory used / total available memory for storage of data like RDD partitions cached in memory.">
               Storage Memory</span></th>
+          <th class="on_heap_memory">
+            <span data-toggle="tooltip" data-placement="top"
+                  title="Memory used / total available memory for on heap storage of data like RDD partitions cached in memory.">
+              On Heap Storage Memory</span></th>
+          <th class="off_heap_memory">
+            <span data-toggle="tooltip"
+                  title="Memory used / total available memory for off heap storage of data like RDD partitions cached in memory.">
+              Off Heap Storage Memory</span></th>
           <th><span data-toggle="tooltip" data-placement="top" title="Disk Used">Disk Used</span></th>
           <th><span data-toggle="tooltip" data-placement="top" title="Cores">Cores</span></th>
           <th><span data-toggle="tooltip" data-placement="top" title="Active Tasks">Active Tasks</span></th>
diff --git a/core/src/main/resources/org/apache/spark/ui/static/executorspage.js b/core/src/main/resources/org/apache/spark/ui/static/executorspage.js
index 7dbfe32de903a03bae6c25c713fb8a0250c73470..930a0698928d1e51d666a8d0271be332e655dd8b 100644
--- a/core/src/main/resources/org/apache/spark/ui/static/executorspage.js
+++ b/core/src/main/resources/org/apache/spark/ui/static/executorspage.js
@@ -190,6 +190,10 @@ $(document).ready(function () {
             var allRDDBlocks = 0;
             var allMemoryUsed = 0;
             var allMaxMemory = 0;
+            var allOnHeapMemoryUsed = 0;
+            var allOnHeapMaxMemory = 0;
+            var allOffHeapMemoryUsed = 0;
+            var allOffHeapMaxMemory = 0;
             var allDiskUsed = 0;
             var allTotalCores = 0;
             var allMaxTasks = 0;
@@ -208,6 +212,10 @@ $(document).ready(function () {
             var activeRDDBlocks = 0;
             var activeMemoryUsed = 0;
             var activeMaxMemory = 0;
+            var activeOnHeapMemoryUsed = 0;
+            var activeOnHeapMaxMemory = 0;
+            var activeOffHeapMemoryUsed = 0;
+            var activeOffHeapMaxMemory = 0;
             var activeDiskUsed = 0;
             var activeTotalCores = 0;
             var activeMaxTasks = 0;
@@ -226,6 +234,10 @@ $(document).ready(function () {
             var deadRDDBlocks = 0;
             var deadMemoryUsed = 0;
             var deadMaxMemory = 0;
+            var deadOnHeapMemoryUsed = 0;
+            var deadOnHeapMaxMemory = 0;
+            var deadOffHeapMemoryUsed = 0;
+            var deadOffHeapMaxMemory = 0;
             var deadDiskUsed = 0;
             var deadTotalCores = 0;
             var deadMaxTasks = 0;
@@ -240,11 +252,22 @@ $(document).ready(function () {
             var deadTotalShuffleWrite = 0;
             var deadTotalBlacklisted = 0;
 
+            response.forEach(function (exec) {
+                exec.onHeapMemoryUsed = exec.hasOwnProperty('onHeapMemoryUsed') ? exec.onHeapMemoryUsed : 0;
+                exec.maxOnHeapMemory = exec.hasOwnProperty('maxOnHeapMemory') ? exec.maxOnHeapMemory : 0;
+                exec.offHeapMemoryUsed = exec.hasOwnProperty('offHeapMemoryUsed') ? exec.offHeapMemoryUsed : 0;
+                exec.maxOffHeapMemory = exec.hasOwnProperty('maxOffHeapMemory') ? exec.maxOffHeapMemory : 0;
+            });
+
             response.forEach(function (exec) {
                 allExecCnt += 1;
                 allRDDBlocks += exec.rddBlocks;
                 allMemoryUsed += exec.memoryUsed;
                 allMaxMemory += exec.maxMemory;
+                allOnHeapMemoryUsed += exec.onHeapMemoryUsed;
+                allOnHeapMaxMemory += exec.maxOnHeapMemory;
+                allOffHeapMemoryUsed += exec.offHeapMemoryUsed;
+                allOffHeapMaxMemory += exec.maxOffHeapMemory;
                 allDiskUsed += exec.diskUsed;
                 allTotalCores += exec.totalCores;
                 allMaxTasks += exec.maxTasks;
@@ -263,6 +286,10 @@ $(document).ready(function () {
                     activeRDDBlocks += exec.rddBlocks;
                     activeMemoryUsed += exec.memoryUsed;
                     activeMaxMemory += exec.maxMemory;
+                    activeOnHeapMemoryUsed += exec.onHeapMemoryUsed;
+                    activeOnHeapMaxMemory += exec.maxOnHeapMemory;
+                    activeOffHeapMemoryUsed += exec.offHeapMemoryUsed;
+                    activeOffHeapMaxMemory += exec.maxOffHeapMemory;
                     activeDiskUsed += exec.diskUsed;
                     activeTotalCores += exec.totalCores;
                     activeMaxTasks += exec.maxTasks;
@@ -281,6 +308,10 @@ $(document).ready(function () {
                     deadRDDBlocks += exec.rddBlocks;
                     deadMemoryUsed += exec.memoryUsed;
                     deadMaxMemory += exec.maxMemory;
+                    deadOnHeapMemoryUsed += exec.onHeapMemoryUsed;
+                    deadOnHeapMaxMemory += exec.maxOnHeapMemory;
+                    deadOffHeapMemoryUsed += exec.offHeapMemoryUsed;
+                    deadOffHeapMaxMemory += exec.maxOffHeapMemory;
                     deadDiskUsed += exec.diskUsed;
                     deadTotalCores += exec.totalCores;
                     deadMaxTasks += exec.maxTasks;
@@ -302,6 +333,10 @@ $(document).ready(function () {
                 "allRDDBlocks": allRDDBlocks,
                 "allMemoryUsed": allMemoryUsed,
                 "allMaxMemory": allMaxMemory,
+                "allOnHeapMemoryUsed": allOnHeapMemoryUsed,
+                "allOnHeapMaxMemory": allOnHeapMaxMemory,
+                "allOffHeapMemoryUsed": allOffHeapMemoryUsed,
+                "allOffHeapMaxMemory": allOffHeapMaxMemory,
                 "allDiskUsed": allDiskUsed,
                 "allTotalCores": allTotalCores,
                 "allMaxTasks": allMaxTasks,
@@ -321,6 +356,10 @@ $(document).ready(function () {
                 "allRDDBlocks": activeRDDBlocks,
                 "allMemoryUsed": activeMemoryUsed,
                 "allMaxMemory": activeMaxMemory,
+                "allOnHeapMemoryUsed": activeOnHeapMemoryUsed,
+                "allOnHeapMaxMemory": activeOnHeapMaxMemory,
+                "allOffHeapMemoryUsed": activeOffHeapMemoryUsed,
+                "allOffHeapMaxMemory": activeOffHeapMaxMemory,
                 "allDiskUsed": activeDiskUsed,
                 "allTotalCores": activeTotalCores,
                 "allMaxTasks": activeMaxTasks,
@@ -340,6 +379,10 @@ $(document).ready(function () {
                 "allRDDBlocks": deadRDDBlocks,
                 "allMemoryUsed": deadMemoryUsed,
                 "allMaxMemory": deadMaxMemory,
+                "allOnHeapMemoryUsed": deadOnHeapMemoryUsed,
+                "allOnHeapMaxMemory": deadOnHeapMaxMemory,
+                "allOffHeapMemoryUsed": deadOffHeapMemoryUsed,
+                "allOffHeapMaxMemory": deadOffHeapMaxMemory,
                 "allDiskUsed": deadDiskUsed,
                 "allTotalCores": deadTotalCores,
                 "allMaxTasks": deadMaxTasks,
@@ -378,7 +421,35 @@ $(document).ready(function () {
                         {data: 'rddBlocks'},
                         {
                             data: function (row, type) {
-                                return type === 'display' ? (formatBytes(row.memoryUsed, type) + ' / ' + formatBytes(row.maxMemory, type)) : row.memoryUsed;
+                                if (type !== 'display')
+                                    return row.memoryUsed;
+                                else
+                                    return (formatBytes(row.memoryUsed, type) + ' / ' +
+                                        formatBytes(row.maxMemory, type));
+                            }
+                        },
+                        {
+                            data: function (row, type) {
+                                if (type !== 'display')
+                                    return row.onHeapMemoryUsed;
+                                else
+                                    return (formatBytes(row.onHeapMemoryUsed, type) + ' / ' +
+                                        formatBytes(row.maxOnHeapMemory, type));
+                            },
+                            "fnCreatedCell": function (nTd, sData, oData, iRow, iCol) {
+                                $(nTd).addClass('on_heap_memory')
+                            }
+                        },
+                        {
+                            data: function (row, type) {
+                                if (type !== 'display')
+                                    return row.offHeapMemoryUsed;
+                                else
+                                    return (formatBytes(row.offHeapMemoryUsed, type) + ' / ' +
+                                        formatBytes(row.maxOffHeapMemory, type));
+                            },
+                            "fnCreatedCell": function (nTd, sData, oData, iRow, iCol) {
+                                $(nTd).addClass('off_heap_memory')
                             }
                         },
                         {data: 'diskUsed', render: formatBytes},
@@ -450,7 +521,35 @@ $(document).ready(function () {
                         {data: 'allRDDBlocks'},
                         {
                             data: function (row, type) {
-                                return type === 'display' ? (formatBytes(row.allMemoryUsed, type) + ' / ' + formatBytes(row.allMaxMemory, type)) : row.allMemoryUsed;
+                                if (type !== 'display')
+                                    return row.allMemoryUsed
+                                else
+                                    return (formatBytes(row.allMemoryUsed, type) + ' / ' +
+                                        formatBytes(row.allMaxMemory, type));
+                            }
+                        },
+                        {
+                            data: function (row, type) {
+                                if (type !== 'display')
+                                    return row.allOnHeapMemoryUsed;
+                                else
+                                    return (formatBytes(row.allOnHeapMemoryUsed, type) + ' / ' +
+                                        formatBytes(row.allOnHeapMaxMemory, type));
+                            },
+                            "fnCreatedCell": function (nTd, sData, oData, iRow, iCol) {
+                                $(nTd).addClass('on_heap_memory')
+                            }
+                        },
+                        {
+                            data: function (row, type) {
+                                if (type !== 'display')
+                                    return row.allOffHeapMemoryUsed;
+                                else
+                                    return (formatBytes(row.allOffHeapMemoryUsed, type) + ' / ' +
+                                        formatBytes(row.allOffHeapMaxMemory, type));
+                            },
+                            "fnCreatedCell": function (nTd, sData, oData, iRow, iCol) {
+                                $(nTd).addClass('off_heap_memory')
                             }
                         },
                         {data: 'allDiskUsed', render: formatBytes},
diff --git a/core/src/main/resources/org/apache/spark/ui/static/webui.css b/core/src/main/resources/org/apache/spark/ui/static/webui.css
index 319a719efaa794c1d5b3cc757965baf213eafadb..935d9b1aec615290ecc96785793c2e70bb1c9468 100644
--- a/core/src/main/resources/org/apache/spark/ui/static/webui.css
+++ b/core/src/main/resources/org/apache/spark/ui/static/webui.css
@@ -205,7 +205,8 @@ span.additional-metric-title {
 /* Hide all additional metrics by default. This is done here rather than using JavaScript to
  * avoid slow page loads for stage pages with large numbers (e.g., thousands) of tasks. */
 .scheduler_delay, .deserialization_time, .fetch_wait_time, .shuffle_read_remote,
-.serialization_time, .getting_result_time, .peak_execution_memory {
+.serialization_time, .getting_result_time, .peak_execution_memory,
+.on_heap_memory, .off_heap_memory {
   display: none;
 }
 
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
index 4331addb44172ba5e2b421e5fabca28084c25062..bc2e53071668671bc3fa9d8ca7304979cbfd1fc4 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
@@ -87,8 +87,13 @@ case class SparkListenerEnvironmentUpdate(environmentDetails: Map[String, Seq[(S
   extends SparkListenerEvent
 
 @DeveloperApi
-case class SparkListenerBlockManagerAdded(time: Long, blockManagerId: BlockManagerId, maxMem: Long)
-  extends SparkListenerEvent
+case class SparkListenerBlockManagerAdded(
+    time: Long,
+    blockManagerId: BlockManagerId,
+    maxMem: Long,
+    maxOnHeapMem: Option[Long] = None,
+    maxOffHeapMem: Option[Long] = None) extends SparkListenerEvent {
+}
 
 @DeveloperApi
 case class SparkListenerBlockManagerRemoved(time: Long, blockManagerId: BlockManagerId)
diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/AllRDDResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/AllRDDResource.scala
index 5c03609e5e5e57c4d935b34b171b7c5d13e6f777..1279b281ad8d839cd0da507bfe8fc534159fcdde 100644
--- a/core/src/main/scala/org/apache/spark/status/api/v1/AllRDDResource.scala
+++ b/core/src/main/scala/org/apache/spark/status/api/v1/AllRDDResource.scala
@@ -70,7 +70,13 @@ private[spark] object AllRDDResource {
           address = status.blockManagerId.hostPort,
           memoryUsed = status.memUsedByRdd(rddId),
           memoryRemaining = status.memRemaining,
-          diskUsed = status.diskUsedByRdd(rddId)
+          diskUsed = status.diskUsedByRdd(rddId),
+          onHeapMemoryUsed = Some(
+            if (!rddInfo.storageLevel.useOffHeap) status.memUsedByRdd(rddId) else 0L),
+          offHeapMemoryUsed = Some(
+            if (rddInfo.storageLevel.useOffHeap) status.memUsedByRdd(rddId) else 0L),
+          onHeapMemoryRemaining = status.onHeapMemRemaining,
+          offHeapMemoryRemaining = status.offHeapMemRemaining
         ) } )
     } else {
       None
diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/api.scala b/core/src/main/scala/org/apache/spark/status/api/v1/api.scala
index 5b9227350edaaf0d923c922fd7afc7c882ec186d..d159b9450ef5cb22145c2ca2f63d1980e190fc3f 100644
--- a/core/src/main/scala/org/apache/spark/status/api/v1/api.scala
+++ b/core/src/main/scala/org/apache/spark/status/api/v1/api.scala
@@ -75,7 +75,11 @@ class ExecutorSummary private[spark](
     val totalShuffleWrite: Long,
     val isBlacklisted: Boolean,
     val maxMemory: Long,
-    val executorLogs: Map[String, String])
+    val executorLogs: Map[String, String],
+    val onHeapMemoryUsed: Option[Long],
+    val offHeapMemoryUsed: Option[Long],
+    val maxOnHeapMemory: Option[Long],
+    val maxOffHeapMemory: Option[Long])
 
 class JobData private[spark](
     val jobId: Int,
@@ -111,7 +115,11 @@ class RDDDataDistribution private[spark](
     val address: String,
     val memoryUsed: Long,
     val memoryRemaining: Long,
-    val diskUsed: Long)
+    val diskUsed: Long,
+    val onHeapMemoryUsed: Option[Long],
+    val offHeapMemoryUsed: Option[Long],
+    val onHeapMemoryRemaining: Option[Long],
+    val offHeapMemoryRemaining: Option[Long])
 
 class RDDPartitionInfo private[spark](
     val blockName: String,
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 46a078b2f9f9360533884fea97653df06b7179e9..63acba65d3c5b7da26373640f9919cfcc7731e72 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -150,8 +150,8 @@ private[spark] class BlockManager(
   // However, since we use this only for reporting and logging, what we actually want here is
   // the absolute maximum value that `maxMemory` can ever possibly reach. We may need
   // to revisit whether reporting this value as the "max" is intuitive to the user.
-  private val maxMemory =
-    memoryManager.maxOnHeapStorageMemory + memoryManager.maxOffHeapStorageMemory
+  private val maxOnHeapMemory = memoryManager.maxOnHeapStorageMemory
+  private val maxOffHeapMemory = memoryManager.maxOffHeapStorageMemory
 
   // Port used by the external shuffle service. In Yarn mode, this may be already be
   // set through the Hadoop configuration as the server is launched in the Yarn NM.
@@ -229,7 +229,8 @@ private[spark] class BlockManager(
 
     val idFromMaster = master.registerBlockManager(
       id,
-      maxMemory,
+      maxOnHeapMemory,
+      maxOffHeapMemory,
       slaveEndpoint)
 
     blockManagerId = if (idFromMaster != null) idFromMaster else id
@@ -307,7 +308,7 @@ private[spark] class BlockManager(
   def reregister(): Unit = {
     // TODO: We might need to rate limit re-registering.
     logInfo(s"BlockManager $blockManagerId re-registering with master")
-    master.registerBlockManager(blockManagerId, maxMemory, slaveEndpoint)
+    master.registerBlockManager(blockManagerId, maxOnHeapMemory, maxOffHeapMemory, slaveEndpoint)
     reportAllBlocks()
   }
 
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
index 3ca690db9e79fb34feecf144dcbecb21e3488b22..ea5d8423a588c0cf20f0fd68a2c755959c4bcce5 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
@@ -57,11 +57,12 @@ class BlockManagerMaster(
    */
   def registerBlockManager(
       blockManagerId: BlockManagerId,
-      maxMemSize: Long,
+      maxOnHeapMemSize: Long,
+      maxOffHeapMemSize: Long,
       slaveEndpoint: RpcEndpointRef): BlockManagerId = {
     logInfo(s"Registering BlockManager $blockManagerId")
     val updatedId = driverEndpoint.askSync[BlockManagerId](
-      RegisterBlockManager(blockManagerId, maxMemSize, slaveEndpoint))
+      RegisterBlockManager(blockManagerId, maxOnHeapMemSize, maxOffHeapMemSize, slaveEndpoint))
     logInfo(s"Registered BlockManager $updatedId")
     updatedId
   }
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
index 84c04d22600adaf8a2a25d0078d348615f05e1e1..467c3e0e6b51f024402a8c3e007185222bbfbe2f 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
@@ -71,8 +71,8 @@ class BlockManagerMasterEndpoint(
   logInfo("BlockManagerMasterEndpoint up")
 
   override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
-    case RegisterBlockManager(blockManagerId, maxMemSize, slaveEndpoint) =>
-      context.reply(register(blockManagerId, maxMemSize, slaveEndpoint))
+    case RegisterBlockManager(blockManagerId, maxOnHeapMemSize, maxOffHeapMemSize, slaveEndpoint) =>
+      context.reply(register(blockManagerId, maxOnHeapMemSize, maxOffHeapMemSize, slaveEndpoint))
 
     case _updateBlockInfo @
         UpdateBlockInfo(blockManagerId, blockId, storageLevel, deserializedSize, size) =>
@@ -276,7 +276,8 @@ class BlockManagerMasterEndpoint(
 
   private def storageStatus: Array[StorageStatus] = {
     blockManagerInfo.map { case (blockManagerId, info) =>
-      new StorageStatus(blockManagerId, info.maxMem, info.blocks.asScala)
+      new StorageStatus(blockManagerId, info.maxMem, Some(info.maxOnHeapMem),
+        Some(info.maxOffHeapMem), info.blocks.asScala)
     }.toArray
   }
 
@@ -338,7 +339,8 @@ class BlockManagerMasterEndpoint(
    */
   private def register(
       idWithoutTopologyInfo: BlockManagerId,
-      maxMemSize: Long,
+      maxOnHeapMemSize: Long,
+      maxOffHeapMemSize: Long,
       slaveEndpoint: RpcEndpointRef): BlockManagerId = {
     // the dummy id is not expected to contain the topology information.
     // we get that info here and respond back with a more fleshed out block manager id
@@ -359,14 +361,15 @@ class BlockManagerMasterEndpoint(
         case None =>
       }
       logInfo("Registering block manager %s with %s RAM, %s".format(
-        id.hostPort, Utils.bytesToString(maxMemSize), id))
+        id.hostPort, Utils.bytesToString(maxOnHeapMemSize + maxOffHeapMemSize), id))
 
       blockManagerIdByExecutor(id.executorId) = id
 
       blockManagerInfo(id) = new BlockManagerInfo(
-        id, System.currentTimeMillis(), maxMemSize, slaveEndpoint)
+        id, System.currentTimeMillis(), maxOnHeapMemSize, maxOffHeapMemSize, slaveEndpoint)
     }
-    listenerBus.post(SparkListenerBlockManagerAdded(time, id, maxMemSize))
+    listenerBus.post(SparkListenerBlockManagerAdded(time, id, maxOnHeapMemSize + maxOffHeapMemSize,
+        Some(maxOnHeapMemSize), Some(maxOffHeapMemSize)))
     id
   }
 
@@ -464,10 +467,13 @@ object BlockStatus {
 private[spark] class BlockManagerInfo(
     val blockManagerId: BlockManagerId,
     timeMs: Long,
-    val maxMem: Long,
+    val maxOnHeapMem: Long,
+    val maxOffHeapMem: Long,
     val slaveEndpoint: RpcEndpointRef)
   extends Logging {
 
+  val maxMem = maxOnHeapMem + maxOffHeapMem
+
   private var _lastSeenMs: Long = timeMs
   private var _remainingMem: Long = maxMem
 
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala
index 0aea438e7f473e0baea6dd3038d332d8e26cbe3e..0c0ff144596ac2bd078d6860d0a336e929dee841 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala
@@ -58,7 +58,8 @@ private[spark] object BlockManagerMessages {
 
   case class RegisterBlockManager(
       blockManagerId: BlockManagerId,
-      maxMemSize: Long,
+      maxOnHeapMemSize: Long,
+      maxOffHeapMemSize: Long,
       sender: RpcEndpointRef)
     extends ToBlockManagerMaster
 
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerSource.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerSource.scala
index c5ba9af3e26581b61588f8f569f4777c47bb5a1d..197a01762c0c531960b1abe6f37cd27a1c4eddd8 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerSource.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerSource.scala
@@ -26,35 +26,39 @@ private[spark] class BlockManagerSource(val blockManager: BlockManager)
   override val metricRegistry = new MetricRegistry()
   override val sourceName = "BlockManager"
 
-  metricRegistry.register(MetricRegistry.name("memory", "maxMem_MB"), new Gauge[Long] {
-    override def getValue: Long = {
-      val storageStatusList = blockManager.master.getStorageStatus
-      val maxMem = storageStatusList.map(_.maxMem).sum
-      maxMem / 1024 / 1024
-    }
-  })
-
-  metricRegistry.register(MetricRegistry.name("memory", "remainingMem_MB"), new Gauge[Long] {
-    override def getValue: Long = {
-      val storageStatusList = blockManager.master.getStorageStatus
-      val remainingMem = storageStatusList.map(_.memRemaining).sum
-      remainingMem / 1024 / 1024
-    }
-  })
-
-  metricRegistry.register(MetricRegistry.name("memory", "memUsed_MB"), new Gauge[Long] {
-    override def getValue: Long = {
-      val storageStatusList = blockManager.master.getStorageStatus
-      val memUsed = storageStatusList.map(_.memUsed).sum
-      memUsed / 1024 / 1024
-    }
-  })
-
-  metricRegistry.register(MetricRegistry.name("disk", "diskSpaceUsed_MB"), new Gauge[Long] {
-    override def getValue: Long = {
-      val storageStatusList = blockManager.master.getStorageStatus
-      val diskSpaceUsed = storageStatusList.map(_.diskUsed).sum
-      diskSpaceUsed / 1024 / 1024
-    }
-  })
+  private def registerGauge(name: String, func: BlockManagerMaster => Long): Unit = {
+    metricRegistry.register(name, new Gauge[Long] {
+      override def getValue: Long = func(blockManager.master) / 1024 / 1024
+    })
+  }
+
+  registerGauge(MetricRegistry.name("memory", "maxMem_MB"),
+    _.getStorageStatus.map(_.maxMem).sum)
+
+  registerGauge(MetricRegistry.name("memory", "maxOnHeapMem_MB"),
+    _.getStorageStatus.map(_.maxOnHeapMem.getOrElse(0L)).sum)
+
+  registerGauge(MetricRegistry.name("memory", "maxOffHeapMem_MB"),
+    _.getStorageStatus.map(_.maxOffHeapMem.getOrElse(0L)).sum)
+
+  registerGauge(MetricRegistry.name("memory", "remainingMem_MB"),
+    _.getStorageStatus.map(_.memRemaining).sum)
+
+  registerGauge(MetricRegistry.name("memory", "remainingOnHeapMem_MB"),
+    _.getStorageStatus.map(_.onHeapMemRemaining.getOrElse(0L)).sum)
+
+  registerGauge(MetricRegistry.name("memory", "remainingOffHeapMem_MB"),
+    _.getStorageStatus.map(_.offHeapMemRemaining.getOrElse(0L)).sum)
+
+  registerGauge(MetricRegistry.name("memory", "memUsed_MB"),
+    _.getStorageStatus.map(_.memUsed).sum)
+
+  registerGauge(MetricRegistry.name("memory", "onHeapMemUsed_MB"),
+    _.getStorageStatus.map(_.onHeapMemUsed.getOrElse(0L)).sum)
+
+  registerGauge(MetricRegistry.name("memory", "offHeapMemUsed_MB"),
+    _.getStorageStatus.map(_.offHeapMemUsed.getOrElse(0L)).sum)
+
+  registerGauge(MetricRegistry.name("disk", "diskSpaceUsed_MB"),
+    _.getStorageStatus.map(_.diskUsed).sum)
 }
diff --git a/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala b/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala
index 798658a15b797768eed05074721f9f0054706be6..1b30d4fa93bc0e4f94818d61b91c8d8066e57a72 100644
--- a/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala
+++ b/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala
@@ -41,7 +41,7 @@ class StorageStatusListener(conf: SparkConf) extends SparkListener {
   }
 
   def deadStorageStatusList: Seq[StorageStatus] = synchronized {
-    deadExecutorStorageStatus.toSeq
+    deadExecutorStorageStatus
   }
 
   /** Update storage status list to reflect updated block statuses */
@@ -74,8 +74,10 @@ class StorageStatusListener(conf: SparkConf) extends SparkListener {
     synchronized {
       val blockManagerId = blockManagerAdded.blockManagerId
       val executorId = blockManagerId.executorId
-      val maxMem = blockManagerAdded.maxMem
-      val storageStatus = new StorageStatus(blockManagerId, maxMem)
+      // The onHeap and offHeap memory are always defined for new applications,
+      // but they can be missing if we are replaying old event logs.
+      val storageStatus = new StorageStatus(blockManagerId, blockManagerAdded.maxMem,
+        blockManagerAdded.maxOnHeapMem, blockManagerAdded.maxOffHeapMem)
       executorIdToStorageStatus(executorId) = storageStatus
 
       // Try to remove the dead storage status if same executor register the block manager twice.
diff --git a/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala b/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala
index 241aacd74b5860d5887c0fb27a5dfd199f95416a..8f0d181fc8fe55e9eaaf909ff68f95b274c7b34a 100644
--- a/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala
+++ b/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala
@@ -35,7 +35,11 @@ import org.apache.spark.internal.Logging
  * class cannot mutate the source of the information. Accesses are not thread-safe.
  */
 @DeveloperApi
-class StorageStatus(val blockManagerId: BlockManagerId, val maxMem: Long) {
+class StorageStatus(
+    val blockManagerId: BlockManagerId,
+    val maxMemory: Long,
+    val maxOnHeapMem: Option[Long],
+    val maxOffHeapMem: Option[Long]) {
 
   /**
    * Internal representation of the blocks stored in this block manager.
@@ -46,25 +50,21 @@ class StorageStatus(val blockManagerId: BlockManagerId, val maxMem: Long) {
   private val _rddBlocks = new mutable.HashMap[Int, mutable.Map[BlockId, BlockStatus]]
   private val _nonRddBlocks = new mutable.HashMap[BlockId, BlockStatus]
 
-  /**
-   * Storage information of the blocks that entails memory, disk, and off-heap memory usage.
-   *
-   * As with the block maps, we store the storage information separately for RDD blocks and
-   * non-RDD blocks for the same reason. In particular, RDD storage information is stored
-   * in a map indexed by the RDD ID to the following 4-tuple:
-   *
-   *   (memory size, disk size, storage level)
-   *
-   * We assume that all the blocks that belong to the same RDD have the same storage level.
-   * This field is not relevant to non-RDD blocks, however, so the storage information for
-   * non-RDD blocks contains only the first 3 fields (in the same order).
-   */
-  private val _rddStorageInfo = new mutable.HashMap[Int, (Long, Long, StorageLevel)]
-  private var _nonRddStorageInfo: (Long, Long) = (0L, 0L)
+  private case class RddStorageInfo(memoryUsage: Long, diskUsage: Long, level: StorageLevel)
+  private val _rddStorageInfo = new mutable.HashMap[Int, RddStorageInfo]
+
+  private case class NonRddStorageInfo(var onHeapUsage: Long, var offHeapUsage: Long,
+      var diskUsage: Long)
+  private val _nonRddStorageInfo = NonRddStorageInfo(0L, 0L, 0L)
 
   /** Create a storage status with an initial set of blocks, leaving the source unmodified. */
-  def this(bmid: BlockManagerId, maxMem: Long, initialBlocks: Map[BlockId, BlockStatus]) {
-    this(bmid, maxMem)
+  def this(
+      bmid: BlockManagerId,
+      maxMemory: Long,
+      maxOnHeapMem: Option[Long],
+      maxOffHeapMem: Option[Long],
+      initialBlocks: Map[BlockId, BlockStatus]) {
+    this(bmid, maxMemory, maxOnHeapMem, maxOffHeapMem)
     initialBlocks.foreach { case (bid, bstatus) => addBlock(bid, bstatus) }
   }
 
@@ -176,26 +176,57 @@ class StorageStatus(val blockManagerId: BlockManagerId, val maxMem: Long) {
    */
   def numRddBlocksById(rddId: Int): Int = _rddBlocks.get(rddId).map(_.size).getOrElse(0)
 
+  /** Return the max memory can be used by this block manager. */
+  def maxMem: Long = maxMemory
+
   /** Return the memory remaining in this block manager. */
   def memRemaining: Long = maxMem - memUsed
 
+  /** Return the memory used by caching RDDs */
+  def cacheSize: Long = onHeapCacheSize.getOrElse(0L) + offHeapCacheSize.getOrElse(0L)
+
   /** Return the memory used by this block manager. */
-  def memUsed: Long = _nonRddStorageInfo._1 + cacheSize
+  def memUsed: Long = onHeapMemUsed.getOrElse(0L) + offHeapMemUsed.getOrElse(0L)
 
-  /** Return the memory used by caching RDDs */
-  def cacheSize: Long = _rddBlocks.keys.toSeq.map(memUsedByRdd).sum
+  /** Return the on-heap memory remaining in this block manager. */
+  def onHeapMemRemaining: Option[Long] =
+    for (m <- maxOnHeapMem; o <- onHeapMemUsed) yield m - o
+
+  /** Return the off-heap memory remaining in this block manager. */
+  def offHeapMemRemaining: Option[Long] =
+    for (m <- maxOffHeapMem; o <- offHeapMemUsed) yield m - o
+
+  /** Return the on-heap memory used by this block manager. */
+  def onHeapMemUsed: Option[Long] = onHeapCacheSize.map(_ + _nonRddStorageInfo.onHeapUsage)
+
+  /** Return the off-heap memory used by this block manager. */
+  def offHeapMemUsed: Option[Long] = offHeapCacheSize.map(_ + _nonRddStorageInfo.offHeapUsage)
+
+  /** Return the memory used by on-heap caching RDDs */
+  def onHeapCacheSize: Option[Long] = maxOnHeapMem.map { _ =>
+    _rddStorageInfo.collect {
+      case (_, storageInfo) if !storageInfo.level.useOffHeap => storageInfo.memoryUsage
+    }.sum
+  }
+
+  /** Return the memory used by off-heap caching RDDs */
+  def offHeapCacheSize: Option[Long] = maxOffHeapMem.map { _ =>
+    _rddStorageInfo.collect {
+      case (_, storageInfo) if storageInfo.level.useOffHeap => storageInfo.memoryUsage
+    }.sum
+  }
 
   /** Return the disk space used by this block manager. */
-  def diskUsed: Long = _nonRddStorageInfo._2 + _rddBlocks.keys.toSeq.map(diskUsedByRdd).sum
+  def diskUsed: Long = _nonRddStorageInfo.diskUsage + _rddBlocks.keys.toSeq.map(diskUsedByRdd).sum
 
   /** Return the memory used by the given RDD in this block manager in O(1) time. */
-  def memUsedByRdd(rddId: Int): Long = _rddStorageInfo.get(rddId).map(_._1).getOrElse(0L)
+  def memUsedByRdd(rddId: Int): Long = _rddStorageInfo.get(rddId).map(_.memoryUsage).getOrElse(0L)
 
   /** Return the disk space used by the given RDD in this block manager in O(1) time. */
-  def diskUsedByRdd(rddId: Int): Long = _rddStorageInfo.get(rddId).map(_._2).getOrElse(0L)
+  def diskUsedByRdd(rddId: Int): Long = _rddStorageInfo.get(rddId).map(_.diskUsage).getOrElse(0L)
 
   /** Return the storage level, if any, used by the given RDD in this block manager. */
-  def rddStorageLevel(rddId: Int): Option[StorageLevel] = _rddStorageInfo.get(rddId).map(_._3)
+  def rddStorageLevel(rddId: Int): Option[StorageLevel] = _rddStorageInfo.get(rddId).map(_.level)
 
   /**
    * Update the relevant storage info, taking into account any existing status for this block.
@@ -210,10 +241,12 @@ class StorageStatus(val blockManagerId: BlockManagerId, val maxMem: Long) {
     val (oldMem, oldDisk) = blockId match {
       case RDDBlockId(rddId, _) =>
         _rddStorageInfo.get(rddId)
-          .map { case (mem, disk, _) => (mem, disk) }
+          .map { case RddStorageInfo(mem, disk, _) => (mem, disk) }
           .getOrElse((0L, 0L))
-      case _ =>
-        _nonRddStorageInfo
+      case _ if !level.useOffHeap =>
+        (_nonRddStorageInfo.onHeapUsage, _nonRddStorageInfo.diskUsage)
+      case _ if level.useOffHeap =>
+        (_nonRddStorageInfo.offHeapUsage, _nonRddStorageInfo.diskUsage)
     }
     val newMem = math.max(oldMem + changeInMem, 0L)
     val newDisk = math.max(oldDisk + changeInDisk, 0L)
@@ -225,13 +258,17 @@ class StorageStatus(val blockManagerId: BlockManagerId, val maxMem: Long) {
         if (newMem + newDisk == 0) {
           _rddStorageInfo.remove(rddId)
         } else {
-          _rddStorageInfo(rddId) = (newMem, newDisk, level)
+          _rddStorageInfo(rddId) = RddStorageInfo(newMem, newDisk, level)
         }
       case _ =>
-        _nonRddStorageInfo = (newMem, newDisk)
+        if (!level.useOffHeap) {
+          _nonRddStorageInfo.onHeapUsage = newMem
+        } else {
+          _nonRddStorageInfo.offHeapUsage = newMem
+        }
+        _nonRddStorageInfo.diskUsage = newDisk
     }
   }
-
 }
 
 /** Helper methods for storage-related objects. */
diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala
index d849ce76a9e3c3cf3badf917dad5965280163db3..0a3c63d14ca8a2fcf8825ea3a1e179492cea0fb0 100644
--- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala
@@ -40,7 +40,8 @@ private[ui] case class ExecutorSummaryInfo(
     totalShuffleRead: Long,
     totalShuffleWrite: Long,
     isBlacklisted: Int,
-    maxMemory: Long,
+    maxOnHeapMem: Long,
+    maxOffHeapMem: Long,
     executorLogs: Map[String, String])
 
 
@@ -53,6 +54,34 @@ private[ui] class ExecutorsPage(
     val content =
       <div>
         {
+          <div>
+            <span class="expand-additional-metrics">
+              <span class="expand-additional-metrics-arrow arrow-closed"></span>
+              <a>Show Additional Metrics</a>
+            </span>
+            <div class="additional-metrics collapsed">
+              <ul>
+                <li>
+                  <input type="checkbox" id="select-all-metrics"/>
+                  <span class="additional-metric-title"><em>(De)select All</em></span>
+                </li>
+                <li>
+                  <span data-toggle="tooltip"
+                        title={ExecutorsPage.ON_HEAP_MEMORY_TOOLTIP} data-placement="right">
+                    <input type="checkbox" name="on_heap_memory"/>
+                    <span class="additional-metric-title">On Heap Storage Memory</span>
+                  </span>
+                </li>
+                <li>
+                  <span data-toggle="tooltip"
+                        title={ExecutorsPage.OFF_HEAP_MEMORY_TOOLTIP} data-placement="right">
+                    <input type="checkbox" name="off_heap_memory"/>
+                    <span class="additional-metric-title">Off Heap Storage Memory</span>
+                  </span>
+                </li>
+              </ul>
+            </div>
+          </div> ++
           <div id="active-executors"></div> ++
           <script src={UIUtils.prependBaseUri("/static/utils.js")}></script> ++
           <script src={UIUtils.prependBaseUri("/static/executorspage.js")}></script> ++
@@ -65,6 +94,11 @@ private[ui] class ExecutorsPage(
 }
 
 private[spark] object ExecutorsPage {
+  private val ON_HEAP_MEMORY_TOOLTIP = "Memory used / total available memory for on heap " +
+    "storage of data like RDD partitions cached in memory."
+  private val OFF_HEAP_MEMORY_TOOLTIP = "Memory used / total available memory for off heap " +
+    "storage of data like RDD partitions cached in memory."
+
   /** Represent an executor's info as a map given a storage status index */
   def getExecInfo(
       listener: ExecutorsListener,
@@ -80,6 +114,10 @@ private[spark] object ExecutorsPage {
     val rddBlocks = status.numBlocks
     val memUsed = status.memUsed
     val maxMem = status.maxMem
+    val onHeapMemUsed = status.onHeapMemUsed
+    val offHeapMemUsed = status.offHeapMemUsed
+    val maxOnHeapMem = status.maxOnHeapMem
+    val maxOffHeapMem = status.maxOffHeapMem
     val diskUsed = status.diskUsed
     val taskSummary = listener.executorToTaskSummary.getOrElse(execId, ExecutorTaskSummary(execId))
 
@@ -103,7 +141,11 @@ private[spark] object ExecutorsPage {
       taskSummary.shuffleWrite,
       taskSummary.isBlacklisted,
       maxMem,
-      taskSummary.executorLogs
+      taskSummary.executorLogs,
+      onHeapMemUsed,
+      offHeapMemUsed,
+      maxOnHeapMem,
+      maxOffHeapMem
     )
   }
 }
diff --git a/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala b/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala
index 227e940c9c50c77c28ef982cc54080bccf2bd551..a1a0c729b9240ed264d1520c650266351222d215 100644
--- a/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala
@@ -147,7 +147,8 @@ private[ui] class RDDPage(parent: StorageTab) extends WebUIPage("rdd") {
   /** Header fields for the worker table */
   private def workerHeader = Seq(
     "Host",
-    "Memory Usage",
+    "On Heap Memory Usage",
+    "Off Heap Memory Usage",
     "Disk Usage")
 
   /** Render an HTML row representing a worker */
@@ -155,8 +156,12 @@ private[ui] class RDDPage(parent: StorageTab) extends WebUIPage("rdd") {
     <tr>
       <td>{worker.address}</td>
       <td>
-        {Utils.bytesToString(worker.memoryUsed)}
-        ({Utils.bytesToString(worker.memoryRemaining)} Remaining)
+        {Utils.bytesToString(worker.onHeapMemoryUsed.getOrElse(0L))}
+        ({Utils.bytesToString(worker.onHeapMemoryRemaining.getOrElse(0L))} Remaining)
+      </td>
+      <td>
+        {Utils.bytesToString(worker.offHeapMemoryUsed.getOrElse(0L))}
+        ({Utils.bytesToString(worker.offHeapMemoryRemaining.getOrElse(0L))} Remaining)
       </td>
       <td>{Utils.bytesToString(worker.diskUsed)}</td>
     </tr>
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index 1d2cb7acefa334490200957b14a28c94acd7ee3e..8296c4294242cd08a2c7281f97314ab2077071d6 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -182,7 +182,9 @@ private[spark] object JsonProtocol {
     ("Event" -> SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.blockManagerAdded) ~
     ("Block Manager ID" -> blockManagerId) ~
     ("Maximum Memory" -> blockManagerAdded.maxMem) ~
-    ("Timestamp" -> blockManagerAdded.time)
+    ("Timestamp" -> blockManagerAdded.time) ~
+    ("Maximum Onheap Memory" -> blockManagerAdded.maxOnHeapMem) ~
+    ("Maximum Offheap Memory" -> blockManagerAdded.maxOffHeapMem)
   }
 
   def blockManagerRemovedToJson(blockManagerRemoved: SparkListenerBlockManagerRemoved): JValue = {
@@ -612,7 +614,9 @@ private[spark] object JsonProtocol {
     val blockManagerId = blockManagerIdFromJson(json \ "Block Manager ID")
     val maxMem = (json \ "Maximum Memory").extract[Long]
     val time = Utils.jsonOption(json \ "Timestamp").map(_.extract[Long]).getOrElse(-1L)
-    SparkListenerBlockManagerAdded(time, blockManagerId, maxMem)
+    val maxOnHeapMem = Utils.jsonOption(json \ "Maximum Onheap Memory").map(_.extract[Long])
+    val maxOffHeapMem = Utils.jsonOption(json \ "Maximum Offheap Memory").map(_.extract[Long])
+    SparkListenerBlockManagerAdded(time, blockManagerId, maxMem, maxOnHeapMem, maxOffHeapMem)
   }
 
   def blockManagerRemovedFromJson(json: JValue): SparkListenerBlockManagerRemoved = {
diff --git a/core/src/test/resources/HistoryServerExpectations/executor_memory_usage_expectation.json b/core/src/test/resources/HistoryServerExpectations/executor_memory_usage_expectation.json
new file mode 100644
index 0000000000000000000000000000000000000000..e732af2663503081ece27f0c7757df058443c735
--- /dev/null
+++ b/core/src/test/resources/HistoryServerExpectations/executor_memory_usage_expectation.json
@@ -0,0 +1,139 @@
+[ {
+  "id" : "2",
+  "hostPort" : "172.22.0.167:51487",
+  "isActive" : true,
+  "rddBlocks" : 0,
+  "memoryUsed" : 0,
+  "diskUsed" : 0,
+  "totalCores" : 4,
+  "maxTasks" : 4,
+  "activeTasks" : 0,
+  "failedTasks" : 4,
+  "completedTasks" : 0,
+  "totalTasks" : 4,
+  "totalDuration" : 2537,
+  "totalGCTime" : 88,
+  "totalInputBytes" : 0,
+  "totalShuffleRead" : 0,
+  "totalShuffleWrite" : 0,
+  "isBlacklisted" : true,
+  "maxMemory" : 908381388,
+  "executorLogs" : {
+    "stdout" : "http://172.22.0.167:51469/logPage/?appId=app-20161116163331-0000&executorId=2&logType=stdout",
+    "stderr" : "http://172.22.0.167:51469/logPage/?appId=app-20161116163331-0000&executorId=2&logType=stderr"
+  },
+  "onHeapMemoryUsed" : 0,
+  "offHeapMemoryUsed" : 0,
+  "maxOnHeapMemory" : 384093388,
+  "maxOffHeapMemory" : 524288000
+}, {
+  "id" : "driver",
+  "hostPort" : "172.22.0.167:51475",
+  "isActive" : true,
+  "rddBlocks" : 0,
+  "memoryUsed" : 0,
+  "diskUsed" : 0,
+  "totalCores" : 0,
+  "maxTasks" : 0,
+  "activeTasks" : 0,
+  "failedTasks" : 0,
+  "completedTasks" : 0,
+  "totalTasks" : 0,
+  "totalDuration" : 0,
+  "totalGCTime" : 0,
+  "totalInputBytes" : 0,
+  "totalShuffleRead" : 0,
+  "totalShuffleWrite" : 0,
+  "isBlacklisted" : true,
+  "maxMemory" : 908381388,
+  "executorLogs" : { },
+  "onHeapMemoryUsed" : 0,
+  "offHeapMemoryUsed" : 0,
+  "maxOnHeapMemory" : 384093388,
+  "maxOffHeapMemory" : 524288000
+}, {
+  "id" : "1",
+  "hostPort" : "172.22.0.167:51490",
+  "isActive" : true,
+  "rddBlocks" : 0,
+  "memoryUsed" : 0,
+  "diskUsed" : 0,
+  "totalCores" : 4,
+  "maxTasks" : 4,
+  "activeTasks" : 0,
+  "failedTasks" : 0,
+  "completedTasks" : 4,
+  "totalTasks" : 4,
+  "totalDuration" : 3152,
+  "totalGCTime" : 68,
+  "totalInputBytes" : 0,
+  "totalShuffleRead" : 0,
+  "totalShuffleWrite" : 0,
+  "isBlacklisted" : true,
+  "maxMemory" : 908381388,
+  "executorLogs" : {
+    "stdout" : "http://172.22.0.167:51467/logPage/?appId=app-20161116163331-0000&executorId=1&logType=stdout",
+    "stderr" : "http://172.22.0.167:51467/logPage/?appId=app-20161116163331-0000&executorId=1&logType=stderr"
+  },
+
+  "onHeapMemoryUsed" : 0,
+  "offHeapMemoryUsed" : 0,
+  "maxOnHeapMemory" : 384093388,
+  "maxOffHeapMemory" : 524288000
+}, {
+  "id" : "0",
+  "hostPort" : "172.22.0.167:51491",
+  "isActive" : true,
+  "rddBlocks" : 0,
+  "memoryUsed" : 0,
+  "diskUsed" : 0,
+  "totalCores" : 4,
+  "maxTasks" : 4,
+  "activeTasks" : 0,
+  "failedTasks" : 4,
+  "completedTasks" : 0,
+  "totalTasks" : 4,
+  "totalDuration" : 2551,
+  "totalGCTime" : 116,
+  "totalInputBytes" : 0,
+  "totalShuffleRead" : 0,
+  "totalShuffleWrite" : 0,
+  "isBlacklisted" : true,
+  "maxMemory" : 908381388,
+  "executorLogs" : {
+    "stdout" : "http://172.22.0.167:51465/logPage/?appId=app-20161116163331-0000&executorId=0&logType=stdout",
+    "stderr" : "http://172.22.0.167:51465/logPage/?appId=app-20161116163331-0000&executorId=0&logType=stderr"
+  },
+  "onHeapMemoryUsed" : 0,
+  "offHeapMemoryUsed" : 0,
+  "maxOnHeapMemory" : 384093388,
+  "maxOffHeapMemory" : 524288000
+}, {
+  "id" : "3",
+  "hostPort" : "172.22.0.167:51485",
+  "isActive" : true,
+  "rddBlocks" : 0,
+  "memoryUsed" : 0,
+  "diskUsed" : 0,
+  "totalCores" : 4,
+  "maxTasks" : 4,
+  "activeTasks" : 0,
+  "failedTasks" : 0,
+  "completedTasks" : 12,
+  "totalTasks" : 12,
+  "totalDuration" : 2453,
+  "totalGCTime" : 72,
+  "totalInputBytes" : 0,
+  "totalShuffleRead" : 0,
+  "totalShuffleWrite" : 0,
+  "isBlacklisted" : true,
+  "maxMemory" : 908381388,
+  "executorLogs" : {
+    "stdout" : "http://172.22.0.167:51466/logPage/?appId=app-20161116163331-0000&executorId=3&logType=stdout",
+    "stderr" : "http://172.22.0.167:51466/logPage/?appId=app-20161116163331-0000&executorId=3&logType=stderr"
+  },
+  "onHeapMemoryUsed" : 0,
+  "offHeapMemoryUsed" : 0,
+  "maxOnHeapMemory" : 384093388,
+  "maxOffHeapMemory" : 524288000
+} ]
diff --git a/core/src/test/resources/HistoryServerExpectations/executor_node_blacklisting_expectation.json b/core/src/test/resources/HistoryServerExpectations/executor_node_blacklisting_expectation.json
index 5914a1c2c4b6d9c8d395b61282fa7ba57f8df543..e732af2663503081ece27f0c7757df058443c735 100644
--- a/core/src/test/resources/HistoryServerExpectations/executor_node_blacklisting_expectation.json
+++ b/core/src/test/resources/HistoryServerExpectations/executor_node_blacklisting_expectation.json
@@ -17,11 +17,15 @@
   "totalShuffleRead" : 0,
   "totalShuffleWrite" : 0,
   "isBlacklisted" : true,
-  "maxMemory" : 384093388,
+  "maxMemory" : 908381388,
   "executorLogs" : {
     "stdout" : "http://172.22.0.167:51469/logPage/?appId=app-20161116163331-0000&executorId=2&logType=stdout",
     "stderr" : "http://172.22.0.167:51469/logPage/?appId=app-20161116163331-0000&executorId=2&logType=stderr"
-  }
+  },
+  "onHeapMemoryUsed" : 0,
+  "offHeapMemoryUsed" : 0,
+  "maxOnHeapMemory" : 384093388,
+  "maxOffHeapMemory" : 524288000
 }, {
   "id" : "driver",
   "hostPort" : "172.22.0.167:51475",
@@ -41,8 +45,12 @@
   "totalShuffleRead" : 0,
   "totalShuffleWrite" : 0,
   "isBlacklisted" : true,
-  "maxMemory" : 384093388,
-  "executorLogs" : { }
+  "maxMemory" : 908381388,
+  "executorLogs" : { },
+  "onHeapMemoryUsed" : 0,
+  "offHeapMemoryUsed" : 0,
+  "maxOnHeapMemory" : 384093388,
+  "maxOffHeapMemory" : 524288000
 }, {
   "id" : "1",
   "hostPort" : "172.22.0.167:51490",
@@ -62,11 +70,16 @@
   "totalShuffleRead" : 0,
   "totalShuffleWrite" : 0,
   "isBlacklisted" : true,
-  "maxMemory" : 384093388,
+  "maxMemory" : 908381388,
   "executorLogs" : {
     "stdout" : "http://172.22.0.167:51467/logPage/?appId=app-20161116163331-0000&executorId=1&logType=stdout",
     "stderr" : "http://172.22.0.167:51467/logPage/?appId=app-20161116163331-0000&executorId=1&logType=stderr"
-  }
+  },
+
+  "onHeapMemoryUsed" : 0,
+  "offHeapMemoryUsed" : 0,
+  "maxOnHeapMemory" : 384093388,
+  "maxOffHeapMemory" : 524288000
 }, {
   "id" : "0",
   "hostPort" : "172.22.0.167:51491",
@@ -86,11 +99,15 @@
   "totalShuffleRead" : 0,
   "totalShuffleWrite" : 0,
   "isBlacklisted" : true,
-  "maxMemory" : 384093388,
+  "maxMemory" : 908381388,
   "executorLogs" : {
     "stdout" : "http://172.22.0.167:51465/logPage/?appId=app-20161116163331-0000&executorId=0&logType=stdout",
     "stderr" : "http://172.22.0.167:51465/logPage/?appId=app-20161116163331-0000&executorId=0&logType=stderr"
-  }
+  },
+  "onHeapMemoryUsed" : 0,
+  "offHeapMemoryUsed" : 0,
+  "maxOnHeapMemory" : 384093388,
+  "maxOffHeapMemory" : 524288000
 }, {
   "id" : "3",
   "hostPort" : "172.22.0.167:51485",
@@ -110,9 +127,13 @@
   "totalShuffleRead" : 0,
   "totalShuffleWrite" : 0,
   "isBlacklisted" : true,
-  "maxMemory" : 384093388,
+  "maxMemory" : 908381388,
   "executorLogs" : {
     "stdout" : "http://172.22.0.167:51466/logPage/?appId=app-20161116163331-0000&executorId=3&logType=stdout",
     "stderr" : "http://172.22.0.167:51466/logPage/?appId=app-20161116163331-0000&executorId=3&logType=stderr"
-  }
+  },
+  "onHeapMemoryUsed" : 0,
+  "offHeapMemoryUsed" : 0,
+  "maxOnHeapMemory" : 384093388,
+  "maxOffHeapMemory" : 524288000
 } ]
diff --git a/core/src/test/resources/spark-events/app-20161116163331-0000 b/core/src/test/resources/spark-events/app-20161116163331-0000
index 7566c9fc0a20b134e6572f90972056d064837f2a..57cfc5b9731291a2b6234320bf14ac4315f221fb 100755
--- a/core/src/test/resources/spark-events/app-20161116163331-0000
+++ b/core/src/test/resources/spark-events/app-20161116163331-0000
@@ -1,15 +1,15 @@
 {"Event":"SparkListenerLogStart","Spark Version":"2.1.0-SNAPSHOT"}
-{"Event":"SparkListenerBlockManagerAdded","Block Manager ID":{"Executor ID":"driver","Host":"172.22.0.167","Port":51475},"Maximum Memory":384093388,"Timestamp":1479335611477}
+{"Event":"SparkListenerBlockManagerAdded","Block Manager ID":{"Executor ID":"driver","Host":"172.22.0.167","Port":51475},"Maximum Memory":908381388,"Timestamp":1479335611477,"Maximum Onheap Memory":384093388,"Maximum Offheap Memory":524288000}
 {"Event":"SparkListenerEnvironmentUpdate","JVM Information":{"Java Home":"/Library/Java/JavaVirtualMachines/jdk1.8.0_92.jdk/Contents/Home/jre","Java Version":"1.8.0_92 (Oracle Corporation)","Scala Version":"version 2.11.8"},"Spark Properties":{"spark.blacklist.task.maxTaskAttemptsPerExecutor":"3","spark.blacklist.enabled":"TRUE","spark.driver.host":"172.22.0.167","spark.blacklist.task.maxTaskAttemptsPerNode":"3","spark.eventLog.enabled":"TRUE","spark.driver.port":"51459","spark.repl.class.uri":"spark://172.22.0.167:51459/classes","spark.jars":"","spark.repl.class.outputDir":"/private/var/folders/l4/d46wlzj16593f3d812vk49tw0000gp/T/spark-1cbc97d0-7fe6-4c9f-8c2c-f6fe51ee3cf2/repl-39929169-ac4c-4c6d-b116-f648e4dd62ed","spark.app.name":"Spark shell","spark.blacklist.stage.maxFailedExecutorsPerNode":"3","spark.scheduler.mode":"FIFO","spark.eventLog.overwrite":"TRUE","spark.blacklist.stage.maxFailedTasksPerExecutor":"3","spark.executor.id":"driver","spark.blacklist.application.maxFailedExecutorsPerNode":"2","spark.submit.deployMode":"client","spark.master":"local-cluster[4,4,1024]","spark.home":"/Users/Jose/IdeaProjects/spark","spark.eventLog.dir":"/Users/jose/logs","spark.sql.catalogImplementation":"in-memory","spark.eventLog.compress":"FALSE","spark.blacklist.application.maxFailedTasksPerExecutor":"1","spark.blacklist.timeout":"1000000","spark.app.id":"app-20161116163331-0000","spark.task.maxFailures":"4"},"System Properties":{"java.io.tmpdir":"/var/folders/l4/d46wlzj16593f3d812vk49tw0000gp/T/","line.separator":"\n","path.separator":":","sun.management.compiler":"HotSpot 64-Bit Tiered Compilers","SPARK_SUBMIT":"true","sun.cpu.endian":"little","java.specification.version":"1.8","java.vm.specification.name":"Java Virtual Machine Specification","java.vendor":"Oracle Corporation","java.vm.specification.version":"1.8","user.home":"/Users/Jose","file.encoding.pkg":"sun.io","sun.nio.ch.bugLevel":"","ftp.nonProxyHosts":"local|*.local|169.254/16|*.169.254/16","sun.arch.data.model":"64","sun.boot.library.path":"/Library/Java/JavaVirtualMachines/jdk1.8.0_92.jdk/Contents/Home/jre/lib","user.dir":"/Users/Jose/IdeaProjects/spark","java.library.path":"/Users/Jose/Library/Java/Extensions:/Library/Java/Extensions:/Network/Library/Java/Extensions:/System/Library/Java/Extensions:/usr/lib/java:.","sun.cpu.isalist":"","os.arch":"x86_64","java.vm.version":"25.92-b14","java.endorsed.dirs":"/Library/Java/JavaVirtualMachines/jdk1.8.0_92.jdk/Contents/Home/jre/lib/endorsed","java.runtime.version":"1.8.0_92-b14","java.vm.info":"mixed mode","java.ext.dirs":"/Users/Jose/Library/Java/Extensions:/Library/Java/JavaVirtualMachines/jdk1.8.0_92.jdk/Contents/Home/jre/lib/ext:/Library/Java/Extensions:/Network/Library/Java/Extensions:/System/Library/Java/Extensions:/usr/lib/java","java.runtime.name":"Java(TM) SE Runtime Environment","file.separator":"/","io.netty.maxDirectMemory":"0","java.class.version":"52.0","scala.usejavacp":"true","java.specification.name":"Java Platform API Specification","sun.boot.class.path":"/Library/Java/JavaVirtualMachines/jdk1.8.0_92.jdk/Contents/Home/jre/lib/resources.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_92.jdk/Contents/Home/jre/lib/rt.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_92.jdk/Contents/Home/jre/lib/sunrsasign.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_92.jdk/Contents/Home/jre/lib/jsse.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_92.jdk/Contents/Home/jre/lib/jce.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_92.jdk/Contents/Home/jre/lib/charsets.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_92.jdk/Contents/Home/jre/lib/jfr.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_92.jdk/Contents/Home/jre/classes","file.encoding":"UTF-8","user.timezone":"America/Chicago","java.specification.vendor":"Oracle Corporation","sun.java.launcher":"SUN_STANDARD","os.version":"10.11.6","sun.os.patch.level":"unknown","gopherProxySet":"false","java.vm.specification.vendor":"Oracle Corporation","user.country":"US","sun.jnu.encoding":"UTF-8","http.nonProxyHosts":"local|*.local|169.254/16|*.169.254/16","user.language":"en","socksNonProxyHosts":"local|*.local|169.254/16|*.169.254/16","java.vendor.url":"http://java.oracle.com/","java.awt.printerjob":"sun.lwawt.macosx.CPrinterJob","java.awt.graphicsenv":"sun.awt.CGraphicsEnvironment","awt.toolkit":"sun.lwawt.macosx.LWCToolkit","os.name":"Mac OS X","java.vm.vendor":"Oracle Corporation","java.vendor.url.bug":"http://bugreport.sun.com/bugreport/","user.name":"jose","java.vm.name":"Java HotSpot(TM) 64-Bit Server VM","sun.java.command":"org.apache.spark.deploy.SparkSubmit --master local-cluster[4,4,1024] --conf spark.blacklist.enabled=TRUE --conf spark.blacklist.timeout=1000000 --conf spark.blacklist.application.maxFailedTasksPerExecutor=1 --conf spark.eventLog.overwrite=TRUE --conf spark.blacklist.task.maxTaskAttemptsPerNode=3 --conf spark.blacklist.stage.maxFailedTasksPerExecutor=3 --conf spark.blacklist.task.maxTaskAttemptsPerExecutor=3 --conf spark.eventLog.compress=FALSE --conf spark.blacklist.stage.maxFailedExecutorsPerNode=3 --conf spark.eventLog.enabled=TRUE --conf spark.eventLog.dir=/Users/jose/logs --conf spark.blacklist.application.maxFailedExecutorsPerNode=2 --conf spark.task.maxFailures=4 --class org.apache.spark.repl.Main --name Spark shell spark-shell -i /Users/Jose/dev/jose-utils/blacklist/test-blacklist.scala","java.home":"/Library/Java/JavaVirtualMachines/jdk1.8.0_92.jdk/Contents/Home/jre","java.version":"1.8.0_92","sun.io.unicode.encoding":"UnicodeBig"},"Classpath Entries":{"/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/avro-mapred-1.7.7-hadoop2.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/hadoop-mapreduce-client-core-2.2.0.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/jetty-servlet-9.2.16.v20160414.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/parquet-column-1.8.1.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/snappy-java-1.1.2.6.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/oro-2.0.8.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/arpack_combined_all-0.1.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/pmml-schema-1.2.15.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/spark-assembly_2.11-2.1.0-SNAPSHOT.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/javassist-3.18.1-GA.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/spark-tags_2.11-2.1.0-SNAPSHOT.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/spark-launcher_2.11-2.1.0-SNAPSHOT.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/commons-math3-3.4.1.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/hk2-api-2.4.0-b34.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/scala-xml_2.11-1.0.4.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/objenesis-2.1.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/spire-macros_2.11-0.7.4.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/scala-reflect-2.11.8.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/spark-mllib-local_2.11-2.1.0-SNAPSHOT.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/spark-mllib_2.11-2.1.0-SNAPSHOT.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/jersey-server-2.22.2.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/core/target/scala-2.11/classes/":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/jackson-mapper-asl-1.9.13.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/jackson-module-scala_2.11-2.6.5.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/curator-framework-2.4.0.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/javax.inject-1.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/curator-client-2.4.0.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/jackson-core-asl-1.9.13.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/common/network-common/target/scala-2.11/classes/":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/zookeeper-3.4.5.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/hadoop-auth-2.2.0.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/repl/target/scala-2.11/classes/":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/jul-to-slf4j-1.7.16.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/jersey-media-jaxb-2.22.2.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/jetty-io-9.2.16.v20160414.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/RoaringBitmap-0.5.11.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/javax.ws.rs-api-2.0.1.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/sql/catalyst/target/scala-2.11/classes/":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/spark-unsafe_2.11-2.1.0-SNAPSHOT.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/spark-repl_2.11-2.1.0-SNAPSHOT.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/jetty-continuation-9.2.16.v20160414.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/hadoop-yarn-client-2.2.0.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/sql/hive-thriftserver/target/scala-2.11/classes":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/hadoop-annotations-2.2.0.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/metrics-graphite-3.1.2.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/hadoop-yarn-api-2.2.0.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/jersey-container-servlet-core-2.22.2.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/streaming/target/scala-2.11/classes/":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/commons-net-3.1.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/jetty-proxy-9.2.16.v20160414.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/spark-catalyst_2.11-2.1.0-SNAPSHOT.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/lz4-1.3.0.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/commons-crypto-1.0.0.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/common/network-yarn/target/scala-2.11/classes":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/javax.annotation-api-1.2.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/spark-sql_2.11-2.1.0-SNAPSHOT.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/guava-14.0.1.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/javax.servlet-api-3.1.0.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/commons-collections-3.2.1.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/conf/":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/unused-1.0.0.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/aopalliance-1.0.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/parquet-encoding-1.8.1.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/common/tags/target/scala-2.11/classes/":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/json4s-jackson_2.11-3.2.11.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/commons-cli-1.2.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/hadoop-yarn-server-common-2.2.0.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/cglib-2.2.1-v20090111.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/pyrolite-4.13.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/scala-library-2.11.8.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/scala-parser-combinators_2.11-1.0.4.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/jetty-util-6.1.26.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/py4j-0.10.4.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/commons-configuration-1.6.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/core-1.1.2.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/core/target/jars/*":"System Classpath","/Users/Jose/IdeaProjects/spark/common/network-shuffle/target/scala-2.11/classes/":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/parquet-format-2.3.0-incubating.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/kryo-shaded-3.0.3.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/sql/core/target/scala-2.11/classes/":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/chill-java-0.8.0.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/jackson-annotations-2.6.5.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/parquet-hadoop-1.8.1.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/sql/hive/target/scala-2.11/classes/":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/avro-ipc-1.7.7.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/xz-1.0.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/parquet-jackson-1.8.1.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/aopalliance-repackaged-2.4.0-b34.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/jersey-common-2.22.2.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/log4j-1.2.17.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/metrics-core-3.1.2.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/jetty-util-9.2.16.v20160414.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/scalap-2.11.0.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/osgi-resource-locator-1.0.1.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/commons-beanutils-1.7.0.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/commons-compress-1.4.1.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/jcl-over-slf4j-1.7.16.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/yarn/target/scala-2.11/classes":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/jetty-plus-9.2.16.v20160414.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/protobuf-java-2.5.0.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/common/unsafe/target/scala-2.11/classes/":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/jackson-module-paranamer-2.6.5.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/leveldbjni-all-1.8.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/jackson-core-2.6.5.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/slf4j-api-1.7.16.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/compress-lzf-1.0.3.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/stream-2.7.0.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/hadoop-mapreduce-client-shuffle-2.2.0.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/commons-codec-1.10.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/hadoop-yarn-common-2.2.0.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/common/sketch/target/scala-2.11/classes/":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/breeze_2.11-0.12.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/hadoop-mapreduce-client-common-2.2.0.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/spark-core_2.11-2.1.0-SNAPSHOT.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/jersey-container-servlet-2.22.2.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/spark-network-shuffle_2.11-2.1.0-SNAPSHOT.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/commons-lang-2.5.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/ivy-2.4.0.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/hadoop-common-2.2.0.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/commons-math-2.1.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/hadoop-hdfs-2.2.0.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/scala-compiler-2.11.8.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/metrics-jvm-3.1.2.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/commons-lang3-3.5.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/jsr305-1.3.9.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/minlog-1.3.0.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/netty-3.8.0.Final.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/jetty-webapp-9.2.16.v20160414.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/json4s-ast_2.11-3.2.11.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/xbean-asm5-shaded-4.4.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/commons-io-2.1.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/slf4j-log4j12-1.7.16.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/hk2-locator-2.4.0-b34.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/shapeless_2.11-2.0.0.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/spark-network-common_2.11-2.1.0-SNAPSHOT.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/jetty-xml-9.2.16.v20160414.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/commons-httpclient-3.1.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/javax.inject-2.4.0-b34.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/mllib/target/scala-2.11/classes/":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/scalatest_2.11-2.2.6.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/hk2-utils-2.4.0-b34.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/jetty-client-9.2.16.v20160414.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/jersey-guava-2.22.2.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/jetty-jndi-9.2.16.v20160414.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/graphx/target/scala-2.11/classes/":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/hadoop-mapreduce-client-app-2.2.0.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/examples/target/scala-2.11/classes/":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/xmlenc-0.52.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/jets3t-0.7.1.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/curator-recipes-2.4.0.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/opencsv-2.3.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/jtransforms-2.4.0.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/antlr4-runtime-4.5.3.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/chill_2.11-0.8.0.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/commons-digester-1.8.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/univocity-parsers-2.2.1.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/jline-2.12.1.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/spark-streaming_2.11-2.1.0-SNAPSHOT.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/launcher/target/scala-2.11/classes/":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/breeze-macros_2.11-0.12.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/jersey-client-2.22.2.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/jackson-databind-2.6.5.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/jetty-servlets-9.2.16.v20160414.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/paranamer-2.6.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/jetty-security-9.2.16.v20160414.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/avro-ipc-1.7.7-tests.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/avro-1.7.7.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/spire_2.11-0.7.4.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/hadoop-client-2.2.0.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/metrics-json-3.1.2.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/commons-beanutils-core-1.8.0.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/validation-api-1.1.0.Final.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/spark-graphx_2.11-2.1.0-SNAPSHOT.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/netty-all-4.0.41.Final.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/janino-3.0.0.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/json4s-core_2.11-3.2.11.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/commons-compiler-3.0.0.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/guice-3.0.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/jetty-server-9.2.16.v20160414.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/jetty-http-9.2.16.v20160414.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/parquet-common-1.8.1.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/hadoop-mapreduce-client-jobclient-2.2.0.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/spark-sketch_2.11-2.1.0-SNAPSHOT.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/pmml-model-1.2.15.jar":"System Classpath"}}
 {"Event":"SparkListenerApplicationStart","App Name":"Spark shell","App ID":"app-20161116163331-0000","Timestamp":1479335609916,"User":"jose"}
 {"Event":"SparkListenerExecutorAdded","Timestamp":1479335615320,"Executor ID":"3","Executor Info":{"Host":"172.22.0.167","Total Cores":4,"Log Urls":{"stdout":"http://172.22.0.167:51466/logPage/?appId=app-20161116163331-0000&executorId=3&logType=stdout","stderr":"http://172.22.0.167:51466/logPage/?appId=app-20161116163331-0000&executorId=3&logType=stderr"}}}
-{"Event":"SparkListenerBlockManagerAdded","Block Manager ID":{"Executor ID":"3","Host":"172.22.0.167","Port":51485},"Maximum Memory":384093388,"Timestamp":1479335615387}
+{"Event":"SparkListenerBlockManagerAdded","Block Manager ID":{"Executor ID":"3","Host":"172.22.0.167","Port":51485},"Maximum Memory":908381388,"Timestamp":1479335615387,"Maximum Onheap Memory":384093388,"Maximum Offheap Memory":524288000}
 {"Event":"SparkListenerExecutorAdded","Timestamp":1479335615393,"Executor ID":"2","Executor Info":{"Host":"172.22.0.167","Total Cores":4,"Log Urls":{"stdout":"http://172.22.0.167:51469/logPage/?appId=app-20161116163331-0000&executorId=2&logType=stdout","stderr":"http://172.22.0.167:51469/logPage/?appId=app-20161116163331-0000&executorId=2&logType=stderr"}}}
 {"Event":"SparkListenerExecutorAdded","Timestamp":1479335615443,"Executor ID":"1","Executor Info":{"Host":"172.22.0.167","Total Cores":4,"Log Urls":{"stdout":"http://172.22.0.167:51467/logPage/?appId=app-20161116163331-0000&executorId=1&logType=stdout","stderr":"http://172.22.0.167:51467/logPage/?appId=app-20161116163331-0000&executorId=1&logType=stderr"}}}
-{"Event":"SparkListenerBlockManagerAdded","Block Manager ID":{"Executor ID":"2","Host":"172.22.0.167","Port":51487},"Maximum Memory":384093388,"Timestamp":1479335615448}
+{"Event":"SparkListenerBlockManagerAdded","Block Manager ID":{"Executor ID":"2","Host":"172.22.0.167","Port":51487},"Maximum Memory":908381388,"Timestamp":1479335615448,"Maximum Onheap Memory":384093388,"Maximum Offheap Memory":524288000}
 {"Event":"SparkListenerExecutorAdded","Timestamp":1479335615462,"Executor ID":"0","Executor Info":{"Host":"172.22.0.167","Total Cores":4,"Log Urls":{"stdout":"http://172.22.0.167:51465/logPage/?appId=app-20161116163331-0000&executorId=0&logType=stdout","stderr":"http://172.22.0.167:51465/logPage/?appId=app-20161116163331-0000&executorId=0&logType=stderr"}}}
-{"Event":"SparkListenerBlockManagerAdded","Block Manager ID":{"Executor ID":"1","Host":"172.22.0.167","Port":51490},"Maximum Memory":384093388,"Timestamp":1479335615496}
-{"Event":"SparkListenerBlockManagerAdded","Block Manager ID":{"Executor ID":"0","Host":"172.22.0.167","Port":51491},"Maximum Memory":384093388,"Timestamp":1479335615515}
+{"Event":"SparkListenerBlockManagerAdded","Block Manager ID":{"Executor ID":"1","Host":"172.22.0.167","Port":51490},"Maximum Memory":908381388,"Timestamp":1479335615496,"Maximum Onheap Memory":384093388,"Maximum Offheap Memory":524288000}
+{"Event":"SparkListenerBlockManagerAdded","Block Manager ID":{"Executor ID":"0","Host":"172.22.0.167","Port":51491},"Maximum Memory":908381388,"Timestamp":1479335615515,"Maximum Onheap Memory":384093388,"Maximum Offheap Memory":524288000}
 {"Event":"SparkListenerJobStart","Job ID":0,"Submission Time":1479335616467,"Stage Infos":[{"Stage ID":0,"Stage Attempt ID":0,"Stage Name":"count at <console>:26","Number of Tasks":16,"RDD Info":[{"RDD ID":1,"Name":"MapPartitionsRDD","Scope":"{\"id\":\"1\",\"name\":\"map\"}","Callsite":"map at <console>:26","Parent IDs":[0],"Storage Level":{"Use Disk":false,"Use Memory":false,"Deserialized":false,"Replication":1},"Number of Partitions":16,"Number of Cached Partitions":0,"Memory Size":0,"Disk Size":0},{"RDD ID":0,"Name":"ParallelCollectionRDD","Scope":"{\"id\":\"0\",\"name\":\"parallelize\"}","Callsite":"parallelize at <console>:26","Parent IDs":[],"Storage Level":{"Use Disk":false,"Use Memory":false,"Deserialized":false,"Replication":1},"Number of Partitions":16,"Number of Cached Partitions":0,"Memory Size":0,"Disk Size":0}],"Parent IDs":[],"Details":"org.apache.spark.rdd.RDD.count(RDD.scala:1135)\n$line16.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:26)\n$line16.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:31)\n$line16.$read$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:33)\n$line16.$read$$iw$$iw$$iw$$iw$$iw.<init>(<console>:35)\n$line16.$read$$iw$$iw$$iw$$iw.<init>(<console>:37)\n$line16.$read$$iw$$iw$$iw.<init>(<console>:39)\n$line16.$read$$iw$$iw.<init>(<console>:41)\n$line16.$read$$iw.<init>(<console>:43)\n$line16.$read.<init>(<console>:45)\n$line16.$read$.<init>(<console>:49)\n$line16.$read$.<clinit>(<console>)\n$line16.$eval$.$print$lzycompute(<console>:7)\n$line16.$eval$.$print(<console>:6)\n$line16.$eval.$print(<console>)\nsun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)\nsun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)\nsun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)\njava.lang.reflect.Method.invoke(Method.java:498)\nscala.tools.nsc.interpreter.IMain$ReadEvalPrint.call(IMain.scala:786)","Accumulables":[]}],"Stage IDs":[0],"Properties":{}}
 {"Event":"SparkListenerStageSubmitted","Stage Info":{"Stage ID":0,"Stage Attempt ID":0,"Stage Name":"count at <console>:26","Number of Tasks":16,"RDD Info":[{"RDD ID":1,"Name":"MapPartitionsRDD","Scope":"{\"id\":\"1\",\"name\":\"map\"}","Callsite":"map at <console>:26","Parent IDs":[0],"Storage Level":{"Use Disk":false,"Use Memory":false,"Deserialized":false,"Replication":1},"Number of Partitions":16,"Number of Cached Partitions":0,"Memory Size":0,"Disk Size":0},{"RDD ID":0,"Name":"ParallelCollectionRDD","Scope":"{\"id\":\"0\",\"name\":\"parallelize\"}","Callsite":"parallelize at <console>:26","Parent IDs":[],"Storage Level":{"Use Disk":false,"Use Memory":false,"Deserialized":false,"Replication":1},"Number of Partitions":16,"Number of Cached Partitions":0,"Memory Size":0,"Disk Size":0}],"Parent IDs":[],"Details":"org.apache.spark.rdd.RDD.count(RDD.scala:1135)\n$line16.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:26)\n$line16.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:31)\n$line16.$read$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:33)\n$line16.$read$$iw$$iw$$iw$$iw$$iw.<init>(<console>:35)\n$line16.$read$$iw$$iw$$iw$$iw.<init>(<console>:37)\n$line16.$read$$iw$$iw$$iw.<init>(<console>:39)\n$line16.$read$$iw$$iw.<init>(<console>:41)\n$line16.$read$$iw.<init>(<console>:43)\n$line16.$read.<init>(<console>:45)\n$line16.$read$.<init>(<console>:49)\n$line16.$read$.<clinit>(<console>)\n$line16.$eval$.$print$lzycompute(<console>:7)\n$line16.$eval$.$print(<console>:6)\n$line16.$eval.$print(<console>)\nsun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)\nsun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)\nsun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)\njava.lang.reflect.Method.invoke(Method.java:498)\nscala.tools.nsc.interpreter.IMain$ReadEvalPrint.call(IMain.scala:786)","Accumulables":[]},"Properties":{}}
 {"Event":"SparkListenerTaskStart","Stage ID":0,"Stage Attempt ID":0,"Task Info":{"Task ID":0,"Index":0,"Attempt":0,"Launch Time":1479335616657,"Executor ID":"1","Host":"172.22.0.167","Locality":"PROCESS_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":0,"Failed":false,"Killed":false,"Accumulables":[]}}
diff --git a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala
index dcf83cb530a91b7a8b6ba259f8a50d1ddec6cd75..764156c3edc418334efc094dcecf69a48228fb9b 100644
--- a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala
@@ -153,7 +153,8 @@ class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with Matchers
 
     "rdd list storage json" -> "applications/local-1422981780767/storage/rdd",
     "executor node blacklisting" -> "applications/app-20161116163331-0000/executors",
-    "executor node blacklisting unblacklisting" -> "applications/app-20161115172038-0000/executors"
+    "executor node blacklisting unblacklisting" -> "applications/app-20161115172038-0000/executors",
+    "executor memory usage" -> "applications/app-20161116163331-0000/executors"
     // Todo: enable this test when logging the even of onBlockUpdated. See: SPARK-13845
     // "one rdd storage json" -> "applications/local-1422981780767/storage/rdd/0"
   )
diff --git a/core/src/test/scala/org/apache/spark/storage/StorageSuite.scala b/core/src/test/scala/org/apache/spark/storage/StorageSuite.scala
index e5733aebf607ca4cf9a63f00b448b341dc3696a8..da198f946fd646fcd3e49e72d27ff8a5e343ddd8 100644
--- a/core/src/test/scala/org/apache/spark/storage/StorageSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/StorageSuite.scala
@@ -27,7 +27,7 @@ class StorageSuite extends SparkFunSuite {
 
   // For testing add, update, and remove (for non-RDD blocks)
   private def storageStatus1: StorageStatus = {
-    val status = new StorageStatus(BlockManagerId("big", "dog", 1), 1000L)
+    val status = new StorageStatus(BlockManagerId("big", "dog", 1), 1000L, Some(1000L), Some(0L))
     assert(status.blocks.isEmpty)
     assert(status.rddBlocks.isEmpty)
     assert(status.memUsed === 0L)
@@ -74,7 +74,7 @@ class StorageSuite extends SparkFunSuite {
 
   // For testing add, update, remove, get, and contains etc. for both RDD and non-RDD blocks
   private def storageStatus2: StorageStatus = {
-    val status = new StorageStatus(BlockManagerId("big", "dog", 1), 1000L)
+    val status = new StorageStatus(BlockManagerId("big", "dog", 1), 1000L, Some(1000L), Some(0L))
     assert(status.rddBlocks.isEmpty)
     status.addBlock(TestBlockId("dan"), BlockStatus(memAndDisk, 10L, 20L))
     status.addBlock(TestBlockId("man"), BlockStatus(memAndDisk, 10L, 20L))
@@ -252,9 +252,9 @@ class StorageSuite extends SparkFunSuite {
 
   // For testing StorageUtils.updateRddInfo and StorageUtils.getRddBlockLocations
   private def stockStorageStatuses: Seq[StorageStatus] = {
-    val status1 = new StorageStatus(BlockManagerId("big", "dog", 1), 1000L)
-    val status2 = new StorageStatus(BlockManagerId("fat", "duck", 2), 2000L)
-    val status3 = new StorageStatus(BlockManagerId("fat", "cat", 3), 3000L)
+    val status1 = new StorageStatus(BlockManagerId("big", "dog", 1), 1000L, Some(1000L), Some(0L))
+    val status2 = new StorageStatus(BlockManagerId("fat", "duck", 2), 2000L, Some(2000L), Some(0L))
+    val status3 = new StorageStatus(BlockManagerId("fat", "cat", 3), 3000L, Some(3000L), Some(0L))
     status1.addBlock(RDDBlockId(0, 0), BlockStatus(memAndDisk, 1L, 2L))
     status1.addBlock(RDDBlockId(0, 1), BlockStatus(memAndDisk, 1L, 2L))
     status2.addBlock(RDDBlockId(0, 2), BlockStatus(memAndDisk, 1L, 2L))
@@ -332,4 +332,81 @@ class StorageSuite extends SparkFunSuite {
     assert(blockLocations1(RDDBlockId(1, 2)) === Seq("cat:3"))
   }
 
+  private val offheap = StorageLevel.OFF_HEAP
+  // For testing add, update, remove, get, and contains etc. for both RDD and non-RDD onheap
+  // and offheap blocks
+  private def storageStatus3: StorageStatus = {
+    val status = new StorageStatus(BlockManagerId("big", "dog", 1), 2000L, Some(1000L), Some(1000L))
+    assert(status.rddBlocks.isEmpty)
+    status.addBlock(TestBlockId("dan"), BlockStatus(memAndDisk, 10L, 20L))
+    status.addBlock(TestBlockId("man"), BlockStatus(offheap, 10L, 0L))
+    status.addBlock(RDDBlockId(0, 0), BlockStatus(offheap, 10L, 0L))
+    status.addBlock(RDDBlockId(1, 1), BlockStatus(offheap, 100L, 0L))
+    status.addBlock(RDDBlockId(2, 2), BlockStatus(memAndDisk, 10L, 20L))
+    status.addBlock(RDDBlockId(2, 3), BlockStatus(memAndDisk, 10L, 20L))
+    status.addBlock(RDDBlockId(2, 4), BlockStatus(memAndDisk, 10L, 40L))
+    status
+  }
+
+  test("storage memUsed, diskUsed with on-heap and off-heap blocks") {
+    val status = storageStatus3
+    def actualMemUsed: Long = status.blocks.values.map(_.memSize).sum
+    def actualDiskUsed: Long = status.blocks.values.map(_.diskSize).sum
+
+    def actualOnHeapMemUsed: Long =
+      status.blocks.values.filter(!_.storageLevel.useOffHeap).map(_.memSize).sum
+    def actualOffHeapMemUsed: Long =
+      status.blocks.values.filter(_.storageLevel.useOffHeap).map(_.memSize).sum
+
+    assert(status.maxMem === status.maxOnHeapMem.get + status.maxOffHeapMem.get)
+
+    assert(status.memUsed === actualMemUsed)
+    assert(status.diskUsed === actualDiskUsed)
+    assert(status.onHeapMemUsed.get === actualOnHeapMemUsed)
+    assert(status.offHeapMemUsed.get === actualOffHeapMemUsed)
+
+    assert(status.memRemaining === status.maxMem - actualMemUsed)
+    assert(status.onHeapMemRemaining.get === status.maxOnHeapMem.get - actualOnHeapMemUsed)
+    assert(status.offHeapMemRemaining.get === status.maxOffHeapMem.get - actualOffHeapMemUsed)
+
+    status.addBlock(TestBlockId("wire"), BlockStatus(memAndDisk, 400L, 500L))
+    status.addBlock(RDDBlockId(25, 25), BlockStatus(memAndDisk, 40L, 50L))
+    assert(status.memUsed === actualMemUsed)
+    assert(status.diskUsed === actualDiskUsed)
+
+    status.updateBlock(TestBlockId("dan"), BlockStatus(memAndDisk, 4L, 5L))
+    status.updateBlock(RDDBlockId(0, 0), BlockStatus(offheap, 4L, 0L))
+    status.updateBlock(RDDBlockId(1, 1), BlockStatus(offheap, 4L, 0L))
+    assert(status.memUsed === actualMemUsed)
+    assert(status.diskUsed === actualDiskUsed)
+    assert(status.onHeapMemUsed.get === actualOnHeapMemUsed)
+    assert(status.offHeapMemUsed.get === actualOffHeapMemUsed)
+
+    status.removeBlock(TestBlockId("fire"))
+    status.removeBlock(TestBlockId("man"))
+    status.removeBlock(RDDBlockId(2, 2))
+    status.removeBlock(RDDBlockId(2, 3))
+    assert(status.memUsed === actualMemUsed)
+    assert(status.diskUsed === actualDiskUsed)
+  }
+
+  private def storageStatus4: StorageStatus = {
+    val status = new StorageStatus(BlockManagerId("big", "dog", 1), 2000L, None, None)
+    status
+  }
+  test("old SparkListenerBlockManagerAdded event compatible") {
+    // This scenario will only be happened when replaying old event log. In this scenario there's
+    // no block add or remove event replayed, so only total amount of memory is valid.
+    val status = storageStatus4
+    assert(status.maxMem === status.maxMemory)
+
+    assert(status.memUsed === 0L)
+    assert(status.diskUsed === 0L)
+    assert(status.onHeapMemUsed === None)
+    assert(status.offHeapMemUsed === None)
+
+    assert(status.memRemaining === status.maxMem)
+    assert(status.onHeapMemRemaining === None)
+    assert(status.offHeapMemRemaining === None)
+  }
 }
diff --git a/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala b/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala
index 42283730364257b27079db145fe30b055bf0e68b..f4c561c73779442193fa5a43f3001836412ed5eb 100644
--- a/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala
@@ -39,7 +39,7 @@ import org.apache.spark.LocalSparkContext._
 import org.apache.spark.api.java.StorageLevels
 import org.apache.spark.deploy.history.HistoryServerSuite
 import org.apache.spark.shuffle.FetchFailedException
-import org.apache.spark.status.api.v1.{JacksonMessageWriter, StageStatus}
+import org.apache.spark.status.api.v1.{JacksonMessageWriter, RDDDataDistribution, StageStatus}
 
 private[spark] class SparkUICssErrorHandler extends DefaultCssErrorHandler {
 
@@ -103,6 +103,7 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers with B
       .set("spark.ui.enabled", "true")
       .set("spark.ui.port", "0")
       .set("spark.ui.killEnabled", killEnabled.toString)
+      .set("spark.memory.offHeap.size", "64m")
     val sc = new SparkContext(conf)
     assert(sc.ui.isDefined)
     sc
@@ -151,6 +152,39 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers with B
       val updatedRddJson = getJson(ui, "storage/rdd/0")
       (updatedRddJson  \ "storageLevel").extract[String] should be (
         StorageLevels.MEMORY_ONLY.description)
+
+      val dataDistributions0 =
+        (updatedRddJson \ "dataDistribution").extract[Seq[RDDDataDistribution]]
+      dataDistributions0.length should be (1)
+      val dist0 = dataDistributions0.head
+
+      dist0.onHeapMemoryUsed should not be (None)
+      dist0.memoryUsed should be (dist0.onHeapMemoryUsed.get)
+      dist0.onHeapMemoryRemaining should not be (None)
+      dist0.offHeapMemoryRemaining should not be (None)
+      dist0.memoryRemaining should be (
+        dist0.onHeapMemoryRemaining.get + dist0.offHeapMemoryRemaining.get)
+      dist0.onHeapMemoryUsed should not be (Some(0L))
+      dist0.offHeapMemoryUsed should be (Some(0L))
+
+      rdd.unpersist()
+      rdd.persist(StorageLevels.OFF_HEAP).count()
+      val updatedStorageJson1 = getJson(ui, "storage/rdd")
+      updatedStorageJson1.children.length should be (1)
+      val updatedRddJson1 = getJson(ui, "storage/rdd/0")
+      val dataDistributions1 =
+        (updatedRddJson1 \ "dataDistribution").extract[Seq[RDDDataDistribution]]
+      dataDistributions1.length should be (1)
+      val dist1 = dataDistributions1.head
+
+      dist1.offHeapMemoryUsed should not be (None)
+      dist1.memoryUsed should be (dist1.offHeapMemoryUsed.get)
+      dist1.onHeapMemoryRemaining should not be (None)
+      dist1.offHeapMemoryRemaining should not be (None)
+      dist1.memoryRemaining should be (
+        dist1.onHeapMemoryRemaining.get + dist1.offHeapMemoryRemaining.get)
+      dist1.onHeapMemoryUsed should be (Some(0L))
+      dist1.offHeapMemoryUsed should not be (Some(0L))
     }
   }
 
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 2e3f9f2d0f3ac2e07cffda156ae22cae7ab1097b..feae76a087decb6305bbbf0aa625fb405179326f 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -100,7 +100,16 @@ object MimaExcludes {
     ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.ml.linalg.Matrix.toDenseMatrix"),
     ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.ml.linalg.Matrix.toSparseMatrix"),
     ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.ml.linalg.Matrix.getSizeInBytes")
-  )
+  ) ++ Seq(
+      // [SPARK-17019] Expose on-heap and off-heap memory usage in various places
+      ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.scheduler.SparkListenerBlockManagerAdded.copy"),
+      ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.scheduler.SparkListenerBlockManagerAdded.this"),
+      ProblemFilters.exclude[MissingTypesProblem]("org.apache.spark.scheduler.SparkListenerBlockManagerAdded$"),
+      ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.scheduler.SparkListenerBlockManagerAdded.apply"),
+      ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.storage.StorageStatus.this"),
+      ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.storage.StorageStatus.this"),
+      ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.status.api.v1.RDDDataDistribution.this")
+    )
 
   // Exclude rules for 2.1.x
   lazy val v21excludes = v20excludes ++ {