diff --git a/core/src/main/resources/org/apache/spark/ui/static/streaming-page.js b/core/src/main/resources/org/apache/spark/ui/static/streaming-page.js
index 0fac658d5784230f441f62f72d5f8e02f91c7b90..0ee6752b29e9a9aa7aabdb95ad895eee6db771d2 100644
--- a/core/src/main/resources/org/apache/spark/ui/static/streaming-page.js
+++ b/core/src/main/resources/org/apache/spark/ui/static/streaming-page.js
@@ -98,7 +98,16 @@ function drawTimeline(id, data, minX, maxX, minY, maxY, unitY, batchInterval) {
     var x = d3.scale.linear().domain([minX, maxX]).range([0, width]);
     var y = d3.scale.linear().domain([minY, maxY]).range([height, 0]);
 
-    var xAxis = d3.svg.axis().scale(x).orient("bottom").tickFormat(function(d) { return timeFormat[d]; });
+    var xAxis = d3.svg.axis().scale(x).orient("bottom").tickFormat(function(d) {
+        var formattedDate = timeFormat[d];
+        var dotIndex = formattedDate.indexOf('.');
+        if (dotIndex >= 0) {
+            // Remove milliseconds
+            return formattedDate.substring(0, dotIndex);
+        } else {
+            return formattedDate;
+        }
+    });
     var formatYValue = d3.format(",.2f");
     var yAxis = d3.svg.axis().scale(y).orient("left").ticks(5).tickFormat(formatYValue);
 
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala
index 3619e129ad9cf9ed55b0c0fcd64baec08a142c24..00cc47d6a3ca5c66de25201e6bcc2c818f243008 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala
@@ -17,11 +17,14 @@
 
 package org.apache.spark.streaming.ui
 
+import java.text.SimpleDateFormat
+import java.util.Date
+
 import scala.xml.Node
 
 import org.apache.spark.ui.{UIUtils => SparkUIUtils}
 
-private[ui] abstract class BatchTableBase(tableId: String) {
+private[ui] abstract class BatchTableBase(tableId: String, batchInterval: Long) {
 
   protected def columns: Seq[Node] = {
     <th>Batch Time</th>
@@ -35,7 +38,7 @@ private[ui] abstract class BatchTableBase(tableId: String) {
 
   protected def baseRow(batch: BatchUIData): Seq[Node] = {
     val batchTime = batch.batchTime.milliseconds
-    val formattedBatchTime = SparkUIUtils.formatDate(batch.batchTime.milliseconds)
+    val formattedBatchTime = UIUtils.formatBatchTime(batchTime, batchInterval)
     val eventCount = batch.numRecords
     val schedulingDelay = batch.schedulingDelay
     val formattedSchedulingDelay = schedulingDelay.map(SparkUIUtils.formatDuration).getOrElse("-")
@@ -79,7 +82,8 @@ private[ui] abstract class BatchTableBase(tableId: String) {
 
 private[ui] class ActiveBatchTable(
     runningBatches: Seq[BatchUIData],
-    waitingBatches: Seq[BatchUIData]) extends BatchTableBase("active-batches-table") {
+    waitingBatches: Seq[BatchUIData],
+    batchInterval: Long) extends BatchTableBase("active-batches-table", batchInterval) {
 
   override protected def columns: Seq[Node] = super.columns ++ <th>Status</th>
 
@@ -99,8 +103,8 @@ private[ui] class ActiveBatchTable(
   }
 }
 
-private[ui] class CompletedBatchTable(batches: Seq[BatchUIData])
-  extends BatchTableBase("completed-batches-table") {
+private[ui] class CompletedBatchTable(batches: Seq[BatchUIData], batchInterval: Long)
+  extends BatchTableBase("completed-batches-table", batchInterval) {
 
   override protected def columns: Seq[Node] = super.columns ++
     <th>Total Delay
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala
index 831f60e870f74feef2a84adc2a284f46cb680921..f75067669abe519935afef0bf9a8b2e7745f4802 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.streaming.ui
 
+import java.text.SimpleDateFormat
+import java.util.Date
 import javax.servlet.http.HttpServletRequest
 
 import scala.xml.{NodeSeq, Node, Text}
@@ -288,7 +290,8 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
     val batchTime = Option(request.getParameter("id")).map(id => Time(id.toLong)).getOrElse {
       throw new IllegalArgumentException(s"Missing id parameter")
     }
-    val formattedBatchTime = SparkUIUtils.formatDate(batchTime.milliseconds)
+    val formattedBatchTime =
+      UIUtils.formatBatchTime(batchTime.milliseconds, streamingListener.batchDuration)
 
     val batchUIData = streamingListener.getBatchUIData(batchTime).getOrElse {
       throw new IllegalArgumentException(s"Batch $formattedBatchTime does not exist")
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala
index efce8c58fb9626b18dce7bba0b3f2b3dd642012b..070564aa10633d2c912716f1d593b63a1272078d 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala
@@ -186,6 +186,8 @@ private[ui] class StreamingPage(parent: StreamingTab)
       <strong>
         {SparkUIUtils.formatDate(startTime)}
       </strong>
+      (<strong>{listener.numTotalCompletedBatches}</strong>
+      completed batches, <strong>{listener.numTotalReceivedRecords}</strong> records)
     </div>
     <br />
   }
@@ -199,9 +201,9 @@ private[ui] class StreamingPage(parent: StreamingTab)
    * @param times all time values that will be used in the graphs.
    */
   private def generateTimeMap(times: Seq[Long]): Seq[Node] = {
-    val dateFormat = new SimpleDateFormat("HH:mm:ss")
     val js = "var timeFormat = {};\n" + times.map { time =>
-      val formattedTime = dateFormat.format(new Date(time))
+      val formattedTime =
+        UIUtils.formatBatchTime(time, listener.batchDuration, showYYYYMMSS = false)
       s"timeFormat[$time] = '$formattedTime';"
     }.mkString("\n")
 
@@ -472,14 +474,14 @@ private[ui] class StreamingPage(parent: StreamingTab)
 
     val activeBatchesContent = {
       <h4 id="active">Active Batches ({runningBatches.size + waitingBatches.size})</h4> ++
-        new ActiveBatchTable(runningBatches, waitingBatches).toNodeSeq
+        new ActiveBatchTable(runningBatches, waitingBatches, listener.batchDuration).toNodeSeq
     }
 
     val completedBatchesContent = {
       <h4 id="completed">
         Completed Batches (last {completedBatches.size} out of {listener.numTotalCompletedBatches})
       </h4> ++
-        new CompletedBatchTable(completedBatches).toNodeSeq
+        new CompletedBatchTable(completedBatches, listener.batchDuration).toNodeSeq
     }
 
     activeBatchesContent ++ completedBatchesContent
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/UIUtils.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/UIUtils.scala
index f153ee105a18e97991154430523e1af8a48d1596..86cfb1fa47370e4526a96fa9a6631a0ba999df81 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/ui/UIUtils.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/UIUtils.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.streaming.ui
 
+import java.text.SimpleDateFormat
+import java.util.TimeZone
 import java.util.concurrent.TimeUnit
 
 private[streaming] object UIUtils {
@@ -62,7 +64,7 @@ private[streaming] object UIUtils {
    * Convert `milliseconds` to the specified `unit`. We cannot use `TimeUnit.convert` because it
    * will discard the fractional part.
    */
-  def convertToTimeUnit(milliseconds: Long, unit: TimeUnit): Double =  unit match {
+  def convertToTimeUnit(milliseconds: Long, unit: TimeUnit): Double = unit match {
     case TimeUnit.NANOSECONDS => milliseconds * 1000 * 1000
     case TimeUnit.MICROSECONDS => milliseconds * 1000
     case TimeUnit.MILLISECONDS => milliseconds
@@ -71,4 +73,55 @@ private[streaming] object UIUtils {
     case TimeUnit.HOURS => milliseconds / 1000.0 / 60.0 / 60.0
     case TimeUnit.DAYS => milliseconds / 1000.0 / 60.0 / 60.0 / 24.0
   }
+
+  // SimpleDateFormat is not thread-safe. Don't expose it to avoid improper use.
+  private val batchTimeFormat = new ThreadLocal[SimpleDateFormat]() {
+    override def initialValue(): SimpleDateFormat = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss")
+  }
+
+  private val batchTimeFormatWithMilliseconds = new ThreadLocal[SimpleDateFormat]() {
+    override def initialValue(): SimpleDateFormat = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss.SSS")
+  }
+
+  /**
+   * If `batchInterval` is less than 1 second, format `batchTime` with milliseconds. Otherwise,
+   * format `batchTime` without milliseconds.
+   *
+   * @param batchTime the batch time to be formatted
+   * @param batchInterval the batch interval
+   * @param showYYYYMMSS if showing the `yyyy/MM/dd` part. If it's false, the return value wll be
+   *                     only `HH:mm:ss` or `HH:mm:ss.SSS` depending on `batchInterval`
+   * @param timezone only for test
+   */
+  def formatBatchTime(
+      batchTime: Long,
+      batchInterval: Long,
+      showYYYYMMSS: Boolean = true,
+      timezone: TimeZone = null): String = {
+    val oldTimezones =
+      (batchTimeFormat.get.getTimeZone, batchTimeFormatWithMilliseconds.get.getTimeZone)
+    if (timezone != null) {
+      batchTimeFormat.get.setTimeZone(timezone)
+      batchTimeFormatWithMilliseconds.get.setTimeZone(timezone)
+    }
+    try {
+      val formattedBatchTime =
+        if (batchInterval < 1000) {
+          batchTimeFormatWithMilliseconds.get.format(batchTime)
+        } else {
+          // If batchInterval >= 1 second, don't show milliseconds
+          batchTimeFormat.get.format(batchTime)
+        }
+      if (showYYYYMMSS) {
+        formattedBatchTime
+      } else {
+        formattedBatchTime.substring(formattedBatchTime.indexOf(' ') + 1)
+      }
+    } finally {
+      if (timezone != null) {
+        batchTimeFormat.get.setTimeZone(oldTimezones._1)
+        batchTimeFormatWithMilliseconds.get.setTimeZone(oldTimezones._2)
+      }
+    }
+  }
 }
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ui/UIUtilsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ui/UIUtilsSuite.scala
index 6df1a63ab2e374c660f7141a6f689ea4be57aa25..e9ab917ab845cd6ff59cc5819c6f0b0b558673b9 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/ui/UIUtilsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/ui/UIUtilsSuite.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.streaming.ui
 
+import java.util.TimeZone
 import java.util.concurrent.TimeUnit
 
 import org.scalatest.FunSuite
@@ -64,4 +65,14 @@ class UIUtilsSuite extends FunSuite with Matchers{
     val convertedTime = UIUtils.convertToTimeUnit(milliseconds, unit)
     convertedTime should be (expectedTime +- 1E-6)
   }
+
+  test("formatBatchTime") {
+    val tzForTest = TimeZone.getTimeZone("America/Los_Angeles")
+    val batchTime = 1431637480452L // Thu May 14 14:04:40 PDT 2015
+    assert("2015/05/14 14:04:40" === UIUtils.formatBatchTime(batchTime, 1000, timezone = tzForTest))
+    assert("2015/05/14 14:04:40.452" ===
+      UIUtils.formatBatchTime(batchTime, 999, timezone = tzForTest))
+    assert("14:04:40" === UIUtils.formatBatchTime(batchTime, 1000, false, timezone = tzForTest))
+    assert("14:04:40.452" === UIUtils.formatBatchTime(batchTime, 999, false, timezone = tzForTest))
+  }
 }