From 365506fb038a76ff3810957f5bc5823f5f16af40 Mon Sep 17 00:00:00 2001
From: Tathagata Das <tathagata.das1565@gmail.com>
Date: Wed, 9 Jan 2013 14:29:25 -0800
Subject: [PATCH] Changed variable name form ***Time to ***Duration to keep
 things consistent.

---
 .../scala/spark/streaming/Checkpoint.scala    |  2 +-
 .../main/scala/spark/streaming/DStream.scala  | 90 +++++++++----------
 .../main/scala/spark/streaming/Duration.scala |  2 +-
 .../streaming/PairDStreamFunctions.scala      | 70 +++++++--------
 .../scala/spark/streaming/Scheduler.scala     |  4 +-
 .../spark/streaming/StreamingContext.scala    | 12 +--
 .../src/main/scala/spark/streaming/Time.scala | 22 ++---
 .../streaming/dstream/CoGroupedDStream.scala  |  4 +-
 .../streaming/dstream/FilteredDStream.scala   |  2 +-
 .../dstream/FlatMapValuedDStream.scala        |  2 +-
 .../streaming/dstream/FlatMappedDStream.scala |  2 +-
 .../streaming/dstream/ForEachDStream.scala    |  2 +-
 .../streaming/dstream/GlommedDStream.scala    |  2 +-
 .../streaming/dstream/InputDStream.scala      |  2 +-
 .../dstream/MapPartitionedDStream.scala       |  2 +-
 .../streaming/dstream/MapValuedDStream.scala  |  2 +-
 .../streaming/dstream/MappedDStream.scala     |  2 +-
 .../dstream/ReducedWindowedDStream.scala      | 34 +++----
 .../streaming/dstream/ShuffledDStream.scala   |  2 +-
 .../streaming/dstream/StateDStream.scala      |  4 +-
 .../dstream/TransformedDStream.scala          |  2 +-
 .../streaming/dstream/UnionDStream.scala      |  4 +-
 .../streaming/dstream/WindowedDStream.scala   | 24 ++---
 .../streaming/BasicOperationsSuite.scala      | 18 ++--
 .../scala/spark/streaming/TestSuiteBase.scala |  2 +-
 .../streaming/WindowOperationsSuite.scala     | 48 +++++-----
 26 files changed, 176 insertions(+), 186 deletions(-)

diff --git a/streaming/src/main/scala/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/spark/streaming/Checkpoint.scala
index a9c6e65d62..2f3adb39c2 100644
--- a/streaming/src/main/scala/spark/streaming/Checkpoint.scala
+++ b/streaming/src/main/scala/spark/streaming/Checkpoint.scala
@@ -17,7 +17,7 @@ class Checkpoint(@transient ssc: StreamingContext, val checkpointTime: Time)
   val jars = ssc.sc.jars
   val graph = ssc.graph
   val checkpointDir = ssc.checkpointDir
-  val checkpointInterval: Duration = ssc.checkpointInterval
+  val checkpointDuration: Duration = ssc.checkpointDuration
 
   def validate() {
     assert(master != null, "Checkpoint.master is null")
diff --git a/streaming/src/main/scala/spark/streaming/DStream.scala b/streaming/src/main/scala/spark/streaming/DStream.scala
index 7611598fde..c89fb7723e 100644
--- a/streaming/src/main/scala/spark/streaming/DStream.scala
+++ b/streaming/src/main/scala/spark/streaming/DStream.scala
@@ -47,7 +47,7 @@ abstract class DStream[T: ClassManifest] (
   // =======================================================================
 
   /** Time interval after which the DStream generates a RDD */
-  def slideTime: Duration
+  def slideDuration: Duration
 
   /** List of parent DStreams on which this DStream depends on */
   def dependencies: List[DStream[_]]
@@ -74,7 +74,7 @@ abstract class DStream[T: ClassManifest] (
 
   // Checkpoint details
   protected[streaming] val mustCheckpoint = false
-  protected[streaming] var checkpointInterval: Duration = null
+  protected[streaming] var checkpointDuration: Duration = null
   protected[streaming] var checkpointData = new DStreamCheckpointData(HashMap[Time, Any]())
 
   // Reference to whole DStream graph
@@ -114,7 +114,7 @@ abstract class DStream[T: ClassManifest] (
         "Cannot change checkpoint interval of an DStream after streaming context has started")
     }
     persist()
-    checkpointInterval = interval
+    checkpointDuration = interval
     this
   }
 
@@ -130,16 +130,16 @@ abstract class DStream[T: ClassManifest] (
     }
     zeroTime = time
 
-    // Set the checkpoint interval to be slideTime or 10 seconds, which ever is larger
-    if (mustCheckpoint && checkpointInterval == null) {
-      checkpointInterval = slideTime.max(Seconds(10))
-      logInfo("Checkpoint interval automatically set to " + checkpointInterval)
+    // Set the checkpoint interval to be slideDuration or 10 seconds, which ever is larger
+    if (mustCheckpoint && checkpointDuration == null) {
+      checkpointDuration = slideDuration.max(Seconds(10))
+      logInfo("Checkpoint interval automatically set to " + checkpointDuration)
     }
 
     // Set the minimum value of the rememberDuration if not already set
-    var minRememberDuration = slideTime
-    if (checkpointInterval != null && minRememberDuration <= checkpointInterval) {
-      minRememberDuration = checkpointInterval * 2  // times 2 just to be sure that the latest checkpoint is not forgetten
+    var minRememberDuration = slideDuration
+    if (checkpointDuration != null && minRememberDuration <= checkpointDuration) {
+      minRememberDuration = checkpointDuration * 2  // times 2 just to be sure that the latest checkpoint is not forgetten
     }
     if (rememberDuration == null || rememberDuration < minRememberDuration) {
       rememberDuration = minRememberDuration
@@ -153,37 +153,37 @@ abstract class DStream[T: ClassManifest] (
     assert(rememberDuration != null, "Remember duration is set to null")
 
     assert(
-      !mustCheckpoint || checkpointInterval != null,
+      !mustCheckpoint || checkpointDuration != null,
       "The checkpoint interval for " + this.getClass.getSimpleName + " has not been set. " +
         " Please use DStream.checkpoint() to set the interval."
     )
 
     assert(
-      checkpointInterval == null || checkpointInterval >= slideTime,
+      checkpointDuration == null || checkpointDuration >= slideDuration,
       "The checkpoint interval for " + this.getClass.getSimpleName + " has been set to " +
-        checkpointInterval + " which is lower than its slide time (" + slideTime + "). " +
-        "Please set it to at least " + slideTime + "."
+        checkpointDuration + " which is lower than its slide time (" + slideDuration + "). " +
+        "Please set it to at least " + slideDuration + "."
     )
 
     assert(
-      checkpointInterval == null || checkpointInterval.isMultipleOf(slideTime),
+      checkpointDuration == null || checkpointDuration.isMultipleOf(slideDuration),
       "The checkpoint interval for " + this.getClass.getSimpleName + " has been set to " +
-        checkpointInterval + " which not a multiple of its slide time (" + slideTime + "). " +
-        "Please set it to a multiple " + slideTime + "."
+        checkpointDuration + " which not a multiple of its slide time (" + slideDuration + "). " +
+        "Please set it to a multiple " + slideDuration + "."
     )
 
     assert(
-      checkpointInterval == null || storageLevel != StorageLevel.NONE,
+      checkpointDuration == null || storageLevel != StorageLevel.NONE,
       "" + this.getClass.getSimpleName + " has been marked for checkpointing but the storage " +
         "level has not been set to enable persisting. Please use DStream.persist() to set the " +
         "storage level to use memory for better checkpointing performance."
     )
 
     assert(
-      checkpointInterval == null || rememberDuration > checkpointInterval,
+      checkpointDuration == null || rememberDuration > checkpointDuration,
       "The remember duration for " + this.getClass.getSimpleName + " has been set to " +
         rememberDuration + " which is not more than the checkpoint interval (" +
-        checkpointInterval + "). Please set it to higher than " + checkpointInterval + "."
+        checkpointDuration + "). Please set it to higher than " + checkpointDuration + "."
     )
 
     val metadataCleanerDelay = spark.util.MetadataCleaner.getDelaySeconds
@@ -200,9 +200,9 @@ abstract class DStream[T: ClassManifest] (
 
     dependencies.foreach(_.validate())
 
-    logInfo("Slide time = " + slideTime)
+    logInfo("Slide time = " + slideDuration)
     logInfo("Storage level = " + storageLevel)
-    logInfo("Checkpoint interval = " + checkpointInterval)
+    logInfo("Checkpoint interval = " + checkpointDuration)
     logInfo("Remember duration = " + rememberDuration)
     logInfo("Initialized and validated " + this)
   }
@@ -232,11 +232,11 @@ abstract class DStream[T: ClassManifest] (
     dependencies.foreach(_.remember(parentRememberDuration))
   }
 
-  /** This method checks whether the 'time' is valid wrt slideTime for generating RDD */
+  /** This method checks whether the 'time' is valid wrt slideDuration for generating RDD */
   protected def isTimeValid(time: Time): Boolean = {
     if (!isInitialized) {
       throw new Exception (this + " has not been initialized")
-    } else if (time <= zeroTime || ! (time - zeroTime).isMultipleOf(slideTime)) {
+    } else if (time <= zeroTime || ! (time - zeroTime).isMultipleOf(slideDuration)) {
       false
     } else {
       true
@@ -266,7 +266,7 @@ abstract class DStream[T: ClassManifest] (
                 newRDD.persist(storageLevel)
                 logInfo("Persisting RDD " + newRDD.id + " for time " + time + " to " + storageLevel + " at time " + time)
               }
-              if (checkpointInterval != null && (time - zeroTime).isMultipleOf(checkpointInterval)) {
+              if (checkpointDuration != null && (time - zeroTime).isMultipleOf(checkpointDuration)) {
                 newRDD.checkpoint()
                 logInfo("Marking RDD " + newRDD.id + " for time " + time + " for checkpointing at time " + time)
               }
@@ -528,21 +528,21 @@ abstract class DStream[T: ClassManifest] (
   /**
    * Return a new DStream which is computed based on windowed batches of this DStream.
    * The new DStream generates RDDs with the same interval as this DStream.
-   * @param windowTime width of the window; must be a multiple of this DStream's interval.
+   * @param windowDuration width of the window; must be a multiple of this DStream's interval.
    * @return
    */
-  def window(windowTime: Duration): DStream[T] = window(windowTime, this.slideTime)
+  def window(windowDuration: Duration): DStream[T] = window(windowDuration, this.slideDuration)
 
   /**
    * Return a new DStream which is computed based on windowed batches of this DStream.
-   * @param windowTime duration (i.e., width) of the window;
+   * @param windowDuration duration (i.e., width) of the window;
    *                   must be a multiple of this DStream's interval
-   * @param slideTime  sliding interval of the window (i.e., the interval after which
+   * @param slideDuration  sliding interval of the window (i.e., the interval after which
    *                   the new DStream will generate RDDs); must be a multiple of this
    *                   DStream's interval
    */
-  def window(windowTime: Duration, slideTime: Duration): DStream[T] = {
-    new WindowedDStream(this, windowTime, slideTime)
+  def window(windowDuration: Duration, slideDuration: Duration): DStream[T] = {
+    new WindowedDStream(this, windowDuration, slideDuration)
   }
 
   /**
@@ -554,36 +554,36 @@ abstract class DStream[T: ClassManifest] (
 
   /**
    * Returns a new DStream in which each RDD has a single element generated by reducing all
-   * elements in a window over this DStream. windowTime and slideTime are as defined in the
-   * window() operation. This is equivalent to window(windowTime, slideTime).reduce(reduceFunc)
+   * elements in a window over this DStream. windowDuration and slideDuration are as defined in the
+   * window() operation. This is equivalent to window(windowDuration, slideDuration).reduce(reduceFunc)
    */
-  def reduceByWindow(reduceFunc: (T, T) => T, windowTime: Duration, slideTime: Duration): DStream[T] = {
-    this.window(windowTime, slideTime).reduce(reduceFunc)
+  def reduceByWindow(reduceFunc: (T, T) => T, windowDuration: Duration, slideDuration: Duration): DStream[T] = {
+    this.window(windowDuration, slideDuration).reduce(reduceFunc)
   }
 
   def reduceByWindow(
       reduceFunc: (T, T) => T,
       invReduceFunc: (T, T) => T,
-      windowTime: Duration,
-      slideTime: Duration
+      windowDuration: Duration,
+      slideDuration: Duration
     ): DStream[T] = {
       this.map(x => (1, x))
-          .reduceByKeyAndWindow(reduceFunc, invReduceFunc, windowTime, slideTime, 1)
+          .reduceByKeyAndWindow(reduceFunc, invReduceFunc, windowDuration, slideDuration, 1)
           .map(_._2)
   }
 
   /**
    * Returns a new DStream in which each RDD has a single element generated by counting the number
-   * of elements in a window over this DStream. windowTime and slideTime are as defined in the
-   * window() operation. This is equivalent to window(windowTime, slideTime).count()
+   * of elements in a window over this DStream. windowDuration and slideDuration are as defined in the
+   * window() operation. This is equivalent to window(windowDuration, slideDuration).count()
    */
-  def countByWindow(windowTime: Duration, slideTime: Duration): DStream[Int] = {
-    this.map(_ => 1).reduceByWindow(_ + _, _ - _, windowTime, slideTime)
+  def countByWindow(windowDuration: Duration, slideDuration: Duration): DStream[Int] = {
+    this.map(_ => 1).reduceByWindow(_ + _, _ - _, windowDuration, slideDuration)
   }
 
   /**
    * Returns a new DStream by unifying data of another DStream with this DStream.
-   * @param that Another DStream having the same interval (i.e., slideTime) as this DStream.
+   * @param that Another DStream having the same slideDuration as this DStream.
    */
   def union(that: DStream[T]): DStream[T] = new UnionDStream[T](Array(this, that))
 
@@ -599,13 +599,13 @@ abstract class DStream[T: ClassManifest] (
    */
   def slice(fromTime: Time, toTime: Time): Seq[RDD[T]] = {
     val rdds = new ArrayBuffer[RDD[T]]()
-    var time = toTime.floor(slideTime)
+    var time = toTime.floor(slideDuration)
     while (time >= zeroTime && time >= fromTime) {
       getOrCompute(time) match {
         case Some(rdd) => rdds += rdd
         case None => //throw new Exception("Could not get RDD for time " + time)
       }
-      time -= slideTime
+      time -= slideDuration
     }
     rdds.toSeq
   }
diff --git a/streaming/src/main/scala/spark/streaming/Duration.scala b/streaming/src/main/scala/spark/streaming/Duration.scala
index d2728d9dca..e4dc579a17 100644
--- a/streaming/src/main/scala/spark/streaming/Duration.scala
+++ b/streaming/src/main/scala/spark/streaming/Duration.scala
@@ -1,6 +1,6 @@
 package spark.streaming
 
-class Duration (private val millis: Long) {
+case class Duration (private val millis: Long) {
 
   def < (that: Duration): Boolean = (this.millis < that.millis)
 
diff --git a/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala b/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala
index dd64064138..482d01300d 100644
--- a/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala
+++ b/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala
@@ -21,14 +21,10 @@ extends Serializable {
  
   def ssc = self.ssc
 
-  def defaultPartitioner(numPartitions: Int = self.ssc.sc.defaultParallelism) = {
+  private[streaming] def defaultPartitioner(numPartitions: Int = self.ssc.sc.defaultParallelism) = {
     new HashPartitioner(numPartitions)
   }
 
-  /* ---------------------------------- */
-  /* DStream operations for key-value pairs */
-  /* ---------------------------------- */
-
   def groupByKey(): DStream[(K, Seq[V])] = {
     groupByKey(defaultPartitioner())
   }
@@ -69,59 +65,59 @@ extends Serializable {
     self.map(x => (x._1, 1L)).reduceByKey((x: Long, y: Long) => x + y, numPartitions)
   }
 
-  def groupByKeyAndWindow(windowTime: Duration, slideTime: Duration): DStream[(K, Seq[V])] = {
-    groupByKeyAndWindow(windowTime, slideTime, defaultPartitioner())
+  def groupByKeyAndWindow(windowDuration: Duration, slideDuration: Duration): DStream[(K, Seq[V])] = {
+    groupByKeyAndWindow(windowDuration, slideDuration, defaultPartitioner())
   }
 
   def groupByKeyAndWindow(
-      windowTime: Duration,
-      slideTime: Duration,
+      windowDuration: Duration,
+      slideDuration: Duration,
       numPartitions: Int
     ): DStream[(K, Seq[V])] = {
-    groupByKeyAndWindow(windowTime, slideTime, defaultPartitioner(numPartitions))
+    groupByKeyAndWindow(windowDuration, slideDuration, defaultPartitioner(numPartitions))
   }
 
   def groupByKeyAndWindow(
-      windowTime: Duration,
-      slideTime: Duration,
+      windowDuration: Duration,
+      slideDuration: Duration,
       partitioner: Partitioner
     ): DStream[(K, Seq[V])] = {
-    self.window(windowTime, slideTime).groupByKey(partitioner)
+    self.window(windowDuration, slideDuration).groupByKey(partitioner)
   }
 
   def reduceByKeyAndWindow(
       reduceFunc: (V, V) => V,
-      windowTime: Duration
+      windowDuration: Duration
     ): DStream[(K, V)] = {
-    reduceByKeyAndWindow(reduceFunc, windowTime, self.slideTime, defaultPartitioner())
+    reduceByKeyAndWindow(reduceFunc, windowDuration, self.slideDuration, defaultPartitioner())
   }
 
   def reduceByKeyAndWindow(
       reduceFunc: (V, V) => V, 
-      windowTime: Duration,
-      slideTime: Duration
+      windowDuration: Duration,
+      slideDuration: Duration
     ): DStream[(K, V)] = {
-    reduceByKeyAndWindow(reduceFunc, windowTime, slideTime, defaultPartitioner())
+    reduceByKeyAndWindow(reduceFunc, windowDuration, slideDuration, defaultPartitioner())
   }
 
   def reduceByKeyAndWindow(
       reduceFunc: (V, V) => V, 
-      windowTime: Duration,
-      slideTime: Duration,
+      windowDuration: Duration,
+      slideDuration: Duration,
       numPartitions: Int
     ): DStream[(K, V)] = {
-    reduceByKeyAndWindow(reduceFunc, windowTime, slideTime, defaultPartitioner(numPartitions))
+    reduceByKeyAndWindow(reduceFunc, windowDuration, slideDuration, defaultPartitioner(numPartitions))
   }
 
   def reduceByKeyAndWindow(
       reduceFunc: (V, V) => V,
-      windowTime: Duration,
-      slideTime: Duration,
+      windowDuration: Duration,
+      slideDuration: Duration,
       partitioner: Partitioner
     ): DStream[(K, V)] = {
     val cleanedReduceFunc = ssc.sc.clean(reduceFunc)
     self.reduceByKey(cleanedReduceFunc, partitioner)
-        .window(windowTime, slideTime)
+        .window(windowDuration, slideDuration)
         .reduceByKey(cleanedReduceFunc, partitioner)
   }
 
@@ -134,51 +130,51 @@ extends Serializable {
   def reduceByKeyAndWindow(
       reduceFunc: (V, V) => V,
       invReduceFunc: (V, V) => V,
-      windowTime: Duration,
-      slideTime: Duration
+      windowDuration: Duration,
+      slideDuration: Duration
     ): DStream[(K, V)] = {
 
     reduceByKeyAndWindow(
-      reduceFunc, invReduceFunc, windowTime, slideTime, defaultPartitioner())
+      reduceFunc, invReduceFunc, windowDuration, slideDuration, defaultPartitioner())
   }
 
   def reduceByKeyAndWindow(
       reduceFunc: (V, V) => V,
       invReduceFunc: (V, V) => V,
-      windowTime: Duration,
-      slideTime: Duration,
+      windowDuration: Duration,
+      slideDuration: Duration,
       numPartitions: Int
     ): DStream[(K, V)] = {
 
     reduceByKeyAndWindow(
-      reduceFunc, invReduceFunc, windowTime, slideTime, defaultPartitioner(numPartitions))
+      reduceFunc, invReduceFunc, windowDuration, slideDuration, defaultPartitioner(numPartitions))
   }
 
   def reduceByKeyAndWindow(
       reduceFunc: (V, V) => V,
       invReduceFunc: (V, V) => V,
-      windowTime: Duration,
-      slideTime: Duration,
+      windowDuration: Duration,
+      slideDuration: Duration,
       partitioner: Partitioner
     ): DStream[(K, V)] = {
 
     val cleanedReduceFunc = ssc.sc.clean(reduceFunc)
     val cleanedInvReduceFunc = ssc.sc.clean(invReduceFunc)
     new ReducedWindowedDStream[K, V](
-      self, cleanedReduceFunc, cleanedInvReduceFunc, windowTime, slideTime, partitioner)
+      self, cleanedReduceFunc, cleanedInvReduceFunc, windowDuration, slideDuration, partitioner)
   }
 
   def countByKeyAndWindow(
-      windowTime: Duration,
-      slideTime: Duration,
+      windowDuration: Duration,
+      slideDuration: Duration,
       numPartitions: Int = self.ssc.sc.defaultParallelism
     ): DStream[(K, Long)] = {
 
     self.map(x => (x._1, 1L)).reduceByKeyAndWindow(
       (x: Long, y: Long) => x + y,
       (x: Long, y: Long) => x - y,
-      windowTime,
-      slideTime,
+      windowDuration,
+      slideDuration,
       numPartitions
     )
   }
diff --git a/streaming/src/main/scala/spark/streaming/Scheduler.scala b/streaming/src/main/scala/spark/streaming/Scheduler.scala
index 10845e3a5e..c04ed37de8 100644
--- a/streaming/src/main/scala/spark/streaming/Scheduler.scala
+++ b/streaming/src/main/scala/spark/streaming/Scheduler.scala
@@ -14,7 +14,7 @@ class Scheduler(ssc: StreamingContext) extends Logging {
   val concurrentJobs = System.getProperty("spark.streaming.concurrentJobs", "1").toInt
   val jobManager = new JobManager(ssc, concurrentJobs)
 
-  val checkpointWriter = if (ssc.checkpointInterval != null && ssc.checkpointDir != null) {
+  val checkpointWriter = if (ssc.checkpointDuration != null && ssc.checkpointDir != null) {
     new CheckpointWriter(ssc.checkpointDir)
   } else {
     null
@@ -65,7 +65,7 @@ class Scheduler(ssc: StreamingContext) extends Logging {
   }
 
   private def doCheckpoint(time: Time) {
-    if (ssc.checkpointInterval != null && (time - graph.zeroTime).isMultipleOf(ssc.checkpointInterval)) {
+    if (ssc.checkpointDuration != null && (time - graph.zeroTime).isMultipleOf(ssc.checkpointDuration)) {
       val startTime = System.currentTimeMillis()
       ssc.graph.updateCheckpointData(time)
       checkpointWriter.write(new Checkpoint(ssc, time))
diff --git a/streaming/src/main/scala/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/spark/streaming/StreamingContext.scala
index ee8314df3f..14500bdcb1 100644
--- a/streaming/src/main/scala/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/spark/streaming/StreamingContext.scala
@@ -96,7 +96,7 @@ class StreamingContext private (
     }
   }
 
-  protected[streaming] var checkpointInterval: Duration = if (isCheckpointPresent) cp_.checkpointInterval else null
+  protected[streaming] var checkpointDuration: Duration = if (isCheckpointPresent) cp_.checkpointDuration else null
   protected[streaming] var receiverJobThread: Thread = null
   protected[streaming] var scheduler: Scheduler = null
 
@@ -121,10 +121,10 @@ class StreamingContext private (
     if (directory != null) {
       sc.setCheckpointDir(StreamingContext.getSparkCheckpointDir(directory))
       checkpointDir = directory
-      checkpointInterval = interval
+      checkpointDuration = interval
     } else {
       checkpointDir = null
-      checkpointInterval = null
+      checkpointDuration = null
     }
   }
 
@@ -327,7 +327,7 @@ class StreamingContext private (
     graph.validate()
 
     assert(
-      checkpointDir == null || checkpointInterval != null,
+      checkpointDir == null || checkpointDuration != null,
       "Checkpoint directory has been set, but the graph checkpointing interval has " +
         "not been set. Please use StreamingContext.checkpoint() to set the interval."
     )
@@ -337,8 +337,8 @@ class StreamingContext private (
    * Starts the execution of the streams.
    */
   def start() {
-    if (checkpointDir != null && checkpointInterval == null && graph != null) {
-      checkpointInterval = graph.batchDuration
+    if (checkpointDir != null && checkpointDuration == null && graph != null) {
+      checkpointDuration = graph.batchDuration
     }
 
     validate()
diff --git a/streaming/src/main/scala/spark/streaming/Time.scala b/streaming/src/main/scala/spark/streaming/Time.scala
index 069df82e52..5daeb761dd 100644
--- a/streaming/src/main/scala/spark/streaming/Time.scala
+++ b/streaming/src/main/scala/spark/streaming/Time.scala
@@ -1,14 +1,15 @@
 package spark.streaming
 
 /**
- * This is a simple class that represents time. Internally, it represents time as UTC.
- * The recommended way to create instances of Time is to use helper objects
- * [[spark.streaming.Milliseconds]], [[spark.streaming.Seconds]], and [[spark.streaming.Minutes]].
- * @param millis Time in UTC.
+ * This is a simple class that represents an absolute instant of time.
+ * Internally, it represents time as the difference, measured in milliseconds, between the current
+ * time and midnight, January 1, 1970 UTC. This is the same format as what is returned by
+ * System.currentTimeMillis.
  */
+case class Time(private val millis: Long) {
+
+  def milliseconds: Long = millis
 
-class Time(private val millis: Long) {
-  
   def < (that: Time): Boolean = (this.millis < that.millis)
 
   def <= (that: Time): Boolean = (this.millis <= that.millis)
@@ -38,11 +39,4 @@ class Time(private val millis: Long) {
 
   override def toString: String = (millis.toString + " ms")
 
-  def milliseconds: Long = millis
-}
-
-/*private[streaming] object Time {
-  implicit def toTime(long: Long) = Time(long)
-}
-*/
-
+}
\ No newline at end of file
diff --git a/streaming/src/main/scala/spark/streaming/dstream/CoGroupedDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/CoGroupedDStream.scala
index ca178fd384..ddb1bf6b28 100644
--- a/streaming/src/main/scala/spark/streaming/dstream/CoGroupedDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/CoGroupedDStream.scala
@@ -18,13 +18,13 @@ class CoGroupedDStream[K : ClassManifest](
     throw new IllegalArgumentException("Array of parents have different StreamingContexts")
   }
 
-  if (parents.map(_.slideTime).distinct.size > 1) {
+  if (parents.map(_.slideDuration).distinct.size > 1) {
     throw new IllegalArgumentException("Array of parents have different slide times")
   }
 
   override def dependencies = parents.toList
 
-  override def slideTime: Duration = parents.head.slideTime
+  override def slideDuration: Duration = parents.head.slideDuration
 
   override def compute(validTime: Time): Option[RDD[(K, Seq[Seq[_]])]] = {
     val part = partitioner
diff --git a/streaming/src/main/scala/spark/streaming/dstream/FilteredDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/FilteredDStream.scala
index 76b9e58029..e993164f99 100644
--- a/streaming/src/main/scala/spark/streaming/dstream/FilteredDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/FilteredDStream.scala
@@ -11,7 +11,7 @@ class FilteredDStream[T: ClassManifest](
 
   override def dependencies = List(parent)
 
-  override def slideTime: Duration = parent.slideTime
+  override def slideDuration: Duration = parent.slideDuration
 
   override def compute(validTime: Time): Option[RDD[T]] = {
     parent.getOrCompute(validTime).map(_.filter(filterFunc))
diff --git a/streaming/src/main/scala/spark/streaming/dstream/FlatMapValuedDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/FlatMapValuedDStream.scala
index 28e9a456ac..cabd34f5f2 100644
--- a/streaming/src/main/scala/spark/streaming/dstream/FlatMapValuedDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/FlatMapValuedDStream.scala
@@ -12,7 +12,7 @@ class FlatMapValuedDStream[K: ClassManifest, V: ClassManifest, U: ClassManifest]
 
   override def dependencies = List(parent)
 
-  override def slideTime: Duration = parent.slideTime
+  override def slideDuration: Duration = parent.slideDuration
 
   override def compute(validTime: Time): Option[RDD[(K, U)]] = {
     parent.getOrCompute(validTime).map(_.flatMapValues[U](flatMapValueFunc))
diff --git a/streaming/src/main/scala/spark/streaming/dstream/FlatMappedDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/FlatMappedDStream.scala
index ef305b66f1..a69af60589 100644
--- a/streaming/src/main/scala/spark/streaming/dstream/FlatMappedDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/FlatMappedDStream.scala
@@ -11,7 +11,7 @@ class FlatMappedDStream[T: ClassManifest, U: ClassManifest](
 
   override def dependencies = List(parent)
 
-  override def slideTime: Duration = parent.slideTime
+  override def slideDuration: Duration = parent.slideDuration
 
   override def compute(validTime: Time): Option[RDD[U]] = {
     parent.getOrCompute(validTime).map(_.flatMap(flatMapFunc))
diff --git a/streaming/src/main/scala/spark/streaming/dstream/ForEachDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/ForEachDStream.scala
index f8af0a38a7..ee69ea5177 100644
--- a/streaming/src/main/scala/spark/streaming/dstream/ForEachDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/ForEachDStream.scala
@@ -11,7 +11,7 @@ class ForEachDStream[T: ClassManifest] (
 
   override def dependencies = List(parent)
 
-  override def slideTime: Duration = parent.slideTime
+  override def slideDuration: Duration = parent.slideDuration
 
   override def compute(validTime: Time): Option[RDD[Unit]] = None
 
diff --git a/streaming/src/main/scala/spark/streaming/dstream/GlommedDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/GlommedDStream.scala
index 19cccea735..b589cbd4d5 100644
--- a/streaming/src/main/scala/spark/streaming/dstream/GlommedDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/GlommedDStream.scala
@@ -9,7 +9,7 @@ class GlommedDStream[T: ClassManifest](parent: DStream[T])
 
   override def dependencies = List(parent)
 
-  override def slideTime: Duration = parent.slideTime
+  override def slideDuration: Duration = parent.slideDuration
 
   override def compute(validTime: Time): Option[RDD[Array[T]]] = {
     parent.getOrCompute(validTime).map(_.glom())
diff --git a/streaming/src/main/scala/spark/streaming/dstream/InputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/InputDStream.scala
index 50f0f45796..980ca5177e 100644
--- a/streaming/src/main/scala/spark/streaming/dstream/InputDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/InputDStream.scala
@@ -7,7 +7,7 @@ abstract class InputDStream[T: ClassManifest] (@transient ssc_ : StreamingContex
 
   override def dependencies = List()
 
-  override def slideTime: Duration = {
+  override def slideDuration: Duration = {
     if (ssc == null) throw new Exception("ssc is null")
     if (ssc.graph.batchDuration == null) throw new Exception("batchDuration is null")
     ssc.graph.batchDuration
diff --git a/streaming/src/main/scala/spark/streaming/dstream/MapPartitionedDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/MapPartitionedDStream.scala
index e9ca668aa6..848afecfad 100644
--- a/streaming/src/main/scala/spark/streaming/dstream/MapPartitionedDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/MapPartitionedDStream.scala
@@ -12,7 +12,7 @@ class MapPartitionedDStream[T: ClassManifest, U: ClassManifest](
 
   override def dependencies = List(parent)
 
-  override def slideTime: Duration = parent.slideTime
+  override def slideDuration: Duration = parent.slideDuration
 
   override def compute(validTime: Time): Option[RDD[U]] = {
     parent.getOrCompute(validTime).map(_.mapPartitions[U](mapPartFunc, preservePartitioning))
diff --git a/streaming/src/main/scala/spark/streaming/dstream/MapValuedDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/MapValuedDStream.scala
index ebc7d0698b..6055aa6a05 100644
--- a/streaming/src/main/scala/spark/streaming/dstream/MapValuedDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/MapValuedDStream.scala
@@ -12,7 +12,7 @@ class MapValuedDStream[K: ClassManifest, V: ClassManifest, U: ClassManifest](
 
   override def dependencies = List(parent)
 
-  override def slideTime: Duration = parent.slideTime
+  override def slideDuration: Duration = parent.slideDuration
 
   override def compute(validTime: Time): Option[RDD[(K, U)]] = {
     parent.getOrCompute(validTime).map(_.mapValues[U](mapValueFunc))
diff --git a/streaming/src/main/scala/spark/streaming/dstream/MappedDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/MappedDStream.scala
index 3af8e7ab88..20818a0cab 100644
--- a/streaming/src/main/scala/spark/streaming/dstream/MappedDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/MappedDStream.scala
@@ -11,7 +11,7 @@ class MappedDStream[T: ClassManifest, U: ClassManifest] (
 
   override def dependencies = List(parent)
 
-  override def slideTime: Duration = parent.slideTime
+  override def slideDuration: Duration = parent.slideDuration
 
   override def compute(validTime: Time): Option[RDD[U]] = {
     parent.getOrCompute(validTime).map(_.map[U](mapFunc))
diff --git a/streaming/src/main/scala/spark/streaming/dstream/ReducedWindowedDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/ReducedWindowedDStream.scala
index a685a778ce..733d5c4a25 100644
--- a/streaming/src/main/scala/spark/streaming/dstream/ReducedWindowedDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/ReducedWindowedDStream.scala
@@ -16,19 +16,19 @@ class ReducedWindowedDStream[K: ClassManifest, V: ClassManifest](
     parent: DStream[(K, V)],
     reduceFunc: (V, V) => V,
     invReduceFunc: (V, V) => V, 
-    _windowTime: Duration,
-    _slideTime: Duration,
+    _windowDuration: Duration,
+    _slideDuration: Duration,
     partitioner: Partitioner
   ) extends DStream[(K,V)](parent.ssc) {
 
-  assert(_windowTime.isMultipleOf(parent.slideTime),
-    "The window duration of ReducedWindowedDStream (" + _slideTime + ") " +
-      "must be multiple of the slide duration of parent DStream (" + parent.slideTime + ")"
+  assert(_windowDuration.isMultipleOf(parent.slideDuration),
+    "The window duration of ReducedWindowedDStream (" + _slideDuration + ") " +
+      "must be multiple of the slide duration of parent DStream (" + parent.slideDuration + ")"
   )
 
-  assert(_slideTime.isMultipleOf(parent.slideTime),
-    "The slide duration of ReducedWindowedDStream (" + _slideTime + ") " +
-      "must be multiple of the slide duration of parent DStream (" + parent.slideTime + ")"
+  assert(_slideDuration.isMultipleOf(parent.slideDuration),
+    "The slide duration of ReducedWindowedDStream (" + _slideDuration + ") " +
+      "must be multiple of the slide duration of parent DStream (" + parent.slideDuration + ")"
   )
 
   // Reduce each batch of data using reduceByKey which will be further reduced by window 
@@ -39,15 +39,15 @@ class ReducedWindowedDStream[K: ClassManifest, V: ClassManifest](
   super.persist(StorageLevel.MEMORY_ONLY_SER)
   reducedStream.persist(StorageLevel.MEMORY_ONLY_SER)
 
-  def windowTime: Duration =  _windowTime
+  def windowDuration: Duration =  _windowDuration
 
   override def dependencies = List(reducedStream)
 
-  override def slideTime: Duration = _slideTime
+  override def slideDuration: Duration = _slideDuration
 
   override val mustCheckpoint = true
 
-  override def parentRememberDuration: Duration = rememberDuration + windowTime
+  override def parentRememberDuration: Duration = rememberDuration + windowDuration
 
   override def persist(storageLevel: StorageLevel): DStream[(K,V)] = {
     super.persist(storageLevel)
@@ -66,11 +66,11 @@ class ReducedWindowedDStream[K: ClassManifest, V: ClassManifest](
     val invReduceF = invReduceFunc
 
     val currentTime = validTime
-    val currentWindow = new Interval(currentTime - windowTime + parent.slideTime, currentTime)
-    val previousWindow = currentWindow - slideTime
+    val currentWindow = new Interval(currentTime - windowDuration + parent.slideDuration, currentTime)
+    val previousWindow = currentWindow - slideDuration
 
-    logDebug("Window time = " + windowTime)
-    logDebug("Slide time = " + slideTime)
+    logDebug("Window time = " + windowDuration)
+    logDebug("Slide time = " + slideDuration)
     logDebug("ZeroTime = " + zeroTime)
     logDebug("Current window = " + currentWindow)
     logDebug("Previous window = " + previousWindow)
@@ -87,11 +87,11 @@ class ReducedWindowedDStream[K: ClassManifest, V: ClassManifest](
     //
 
     // Get the RDDs of the reduced values in "old time steps"
-    val oldRDDs = reducedStream.slice(previousWindow.beginTime, currentWindow.beginTime - parent.slideTime)
+    val oldRDDs = reducedStream.slice(previousWindow.beginTime, currentWindow.beginTime - parent.slideDuration)
     logDebug("# old RDDs = " + oldRDDs.size)
 
     // Get the RDDs of the reduced values in "new time steps"
-    val newRDDs = reducedStream.slice(previousWindow.endTime + parent.slideTime, currentWindow.endTime)
+    val newRDDs = reducedStream.slice(previousWindow.endTime + parent.slideDuration, currentWindow.endTime)
     logDebug("# new RDDs = " + newRDDs.size)
 
     // Get the RDD of the reduced value of the previous window
diff --git a/streaming/src/main/scala/spark/streaming/dstream/ShuffledDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/ShuffledDStream.scala
index 7612804b96..1f9548bfb8 100644
--- a/streaming/src/main/scala/spark/streaming/dstream/ShuffledDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/ShuffledDStream.scala
@@ -15,7 +15,7 @@ class ShuffledDStream[K: ClassManifest, V: ClassManifest, C: ClassManifest](
 
   override def dependencies = List(parent)
 
-  override def slideTime: Duration = parent.slideTime
+  override def slideDuration: Duration = parent.slideDuration
 
   override def compute(validTime: Time): Option[RDD[(K,C)]] = {
     parent.getOrCompute(validTime) match {
diff --git a/streaming/src/main/scala/spark/streaming/dstream/StateDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/StateDStream.scala
index ce4f486825..a1ec2f5454 100644
--- a/streaming/src/main/scala/spark/streaming/dstream/StateDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/StateDStream.scala
@@ -18,14 +18,14 @@ class StateDStream[K: ClassManifest, V: ClassManifest, S <: AnyRef : ClassManife
 
   override def dependencies = List(parent)
 
-  override def slideTime: Duration = parent.slideTime
+  override def slideDuration: Duration = parent.slideDuration
 
   override val mustCheckpoint = true
 
   override def compute(validTime: Time): Option[RDD[(K, S)]] = {
 
     // Try to get the previous state RDD
-    getOrCompute(validTime - slideTime) match {
+    getOrCompute(validTime - slideDuration) match {
 
       case Some(prevStateRDD) => {    // If previous state RDD exists
 
diff --git a/streaming/src/main/scala/spark/streaming/dstream/TransformedDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/TransformedDStream.scala
index 5a2c5bc0f0..99660d9dee 100644
--- a/streaming/src/main/scala/spark/streaming/dstream/TransformedDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/TransformedDStream.scala
@@ -11,7 +11,7 @@ class TransformedDStream[T: ClassManifest, U: ClassManifest] (
 
   override def dependencies = List(parent)
 
-  override def slideTime: Duration = parent.slideTime
+  override def slideDuration: Duration = parent.slideDuration
 
   override def compute(validTime: Time): Option[RDD[U]] = {
     parent.getOrCompute(validTime).map(transformFunc(_, validTime))
diff --git a/streaming/src/main/scala/spark/streaming/dstream/UnionDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/UnionDStream.scala
index 224a19842b..00bad5da34 100644
--- a/streaming/src/main/scala/spark/streaming/dstream/UnionDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/UnionDStream.scala
@@ -17,13 +17,13 @@ class UnionDStream[T: ClassManifest](parents: Array[DStream[T]])
     throw new IllegalArgumentException("Array of parents have different StreamingContexts")
   }
 
-  if (parents.map(_.slideTime).distinct.size > 1) {
+  if (parents.map(_.slideDuration).distinct.size > 1) {
     throw new IllegalArgumentException("Array of parents have different slide times")
   }
 
   override def dependencies = parents.toList
 
-  override def slideTime: Duration = parents.head.slideTime
+  override def slideDuration: Duration = parents.head.slideDuration
 
   override def compute(validTime: Time): Option[RDD[T]] = {
     val rdds = new ArrayBuffer[RDD[T]]()
diff --git a/streaming/src/main/scala/spark/streaming/dstream/WindowedDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/WindowedDStream.scala
index 45689b25ce..cbf0c88108 100644
--- a/streaming/src/main/scala/spark/streaming/dstream/WindowedDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/WindowedDStream.scala
@@ -8,30 +8,30 @@ import spark.streaming.{Duration, Interval, Time, DStream}
 private[streaming]
 class WindowedDStream[T: ClassManifest](
     parent: DStream[T],
-    _windowTime: Duration,
-    _slideTime: Duration)
+    _windowDuration: Duration,
+    _slideDuration: Duration)
   extends DStream[T](parent.ssc) {
 
-  if (!_windowTime.isMultipleOf(parent.slideTime))
-    throw new Exception("The window duration of WindowedDStream (" + _slideTime + ") " +
-    "must be multiple of the slide duration of parent DStream (" + parent.slideTime + ")")
+  if (!_windowDuration.isMultipleOf(parent.slideDuration))
+    throw new Exception("The window duration of WindowedDStream (" + _slideDuration + ") " +
+    "must be multiple of the slide duration of parent DStream (" + parent.slideDuration + ")")
 
-  if (!_slideTime.isMultipleOf(parent.slideTime))
-    throw new Exception("The slide duration of WindowedDStream (" + _slideTime + ") " +
-    "must be multiple of the slide duration of parent DStream (" + parent.slideTime + ")")
+  if (!_slideDuration.isMultipleOf(parent.slideDuration))
+    throw new Exception("The slide duration of WindowedDStream (" + _slideDuration + ") " +
+    "must be multiple of the slide duration of parent DStream (" + parent.slideDuration + ")")
 
   parent.persist(StorageLevel.MEMORY_ONLY_SER)
 
-  def windowTime: Duration =  _windowTime
+  def windowDuration: Duration =  _windowDuration
 
   override def dependencies = List(parent)
 
-  override def slideTime: Duration = _slideTime
+  override def slideDuration: Duration = _slideDuration
 
-  override def parentRememberDuration: Duration = rememberDuration + windowTime
+  override def parentRememberDuration: Duration = rememberDuration + windowDuration
 
   override def compute(validTime: Time): Option[RDD[T]] = {
-    val currentWindow = new Interval(validTime - windowTime + parent.slideTime, validTime)
+    val currentWindow = new Interval(validTime - windowDuration + parent.slideDuration, validTime)
     Some(new UnionRDD(ssc.sc, parent.slice(currentWindow)))
   }
 }
diff --git a/streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala b/streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala
index dc38ef4912..f9e03c607d 100644
--- a/streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala
+++ b/streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala
@@ -196,18 +196,18 @@ class BasicOperationsSuite extends TestSuiteBase {
     // MappedStream should remember till 7 seconds:    10, 9, 8, 7, 6, 5, 4, 3, 2
 
     // WindowedStream2
-    assert(windowedStream2.generatedRDDs.contains(Seconds(10)))
-    assert(windowedStream2.generatedRDDs.contains(Seconds(8)))
-    assert(!windowedStream2.generatedRDDs.contains(Seconds(6)))
+    assert(windowedStream2.generatedRDDs.contains(Time(10000)))
+    assert(windowedStream2.generatedRDDs.contains(Time(8000)))
+    assert(!windowedStream2.generatedRDDs.contains(Time(6000)))
 
     // WindowedStream1
-    assert(windowedStream1.generatedRDDs.contains(Seconds(10)))
-    assert(windowedStream1.generatedRDDs.contains(Seconds(4)))
-    assert(!windowedStream1.generatedRDDs.contains(Seconds(3)))
+    assert(windowedStream1.generatedRDDs.contains(Time(10000)))
+    assert(windowedStream1.generatedRDDs.contains(Time(4000)))
+    assert(!windowedStream1.generatedRDDs.contains(Time(3000)))
 
     // MappedStream
-    assert(mappedStream.generatedRDDs.contains(Seconds(10)))
-    assert(mappedStream.generatedRDDs.contains(Seconds(2)))
-    assert(!mappedStream.generatedRDDs.contains(Seconds(1)))
+    assert(mappedStream.generatedRDDs.contains(Time(10000)))
+    assert(mappedStream.generatedRDDs.contains(Time(2000)))
+    assert(!mappedStream.generatedRDDs.contains(Time(1000)))
   }
 }
diff --git a/streaming/src/test/scala/spark/streaming/TestSuiteBase.scala b/streaming/src/test/scala/spark/streaming/TestSuiteBase.scala
index 28bdd53c3c..a76f61d4ad 100644
--- a/streaming/src/test/scala/spark/streaming/TestSuiteBase.scala
+++ b/streaming/src/test/scala/spark/streaming/TestSuiteBase.scala
@@ -26,7 +26,7 @@ class TestInputStream[T: ClassManifest](ssc_ : StreamingContext, input: Seq[Seq[
 
   def compute(validTime: Time): Option[RDD[T]] = {
     logInfo("Computing RDD for time " + validTime)
-    val index = ((validTime - zeroTime) / slideTime - 1).toInt
+    val index = ((validTime - zeroTime) / slideDuration - 1).toInt
     val selectedInput = if (index < input.size) input(index) else Seq[T]()
     val rdd = ssc.sc.makeRDD(selectedInput, numPartitions)
     logInfo("Created RDD " + rdd.id + " with " + selectedInput)
diff --git a/streaming/src/test/scala/spark/streaming/WindowOperationsSuite.scala b/streaming/src/test/scala/spark/streaming/WindowOperationsSuite.scala
index 4bc5229465..fa117cfcf0 100644
--- a/streaming/src/test/scala/spark/streaming/WindowOperationsSuite.scala
+++ b/streaming/src/test/scala/spark/streaming/WindowOperationsSuite.scala
@@ -207,11 +207,11 @@ class WindowOperationsSuite extends TestSuiteBase {
   test("groupByKeyAndWindow") {
     val input = bigInput
     val expectedOutput = bigGroupByOutput.map(_.map(x => (x._1, x._2.toSet)))
-    val windowTime = Seconds(2)
-    val slideTime = Seconds(1)
-    val numBatches = expectedOutput.size * (slideTime / batchDuration).toInt
+    val windowDuration = Seconds(2)
+    val slideDuration = Seconds(1)
+    val numBatches = expectedOutput.size * (slideDuration / batchDuration).toInt
     val operation = (s: DStream[(String, Int)]) => {
-      s.groupByKeyAndWindow(windowTime, slideTime)
+      s.groupByKeyAndWindow(windowDuration, slideDuration)
        .map(x => (x._1, x._2.toSet))
        .persist()
     }
@@ -221,21 +221,21 @@ class WindowOperationsSuite extends TestSuiteBase {
   test("countByWindow") {
     val input = Seq(Seq(1), Seq(1), Seq(1, 2), Seq(0), Seq(), Seq() )
     val expectedOutput = Seq( Seq(1), Seq(2), Seq(3), Seq(3), Seq(1), Seq(0))
-    val windowTime = Seconds(2)
-    val slideTime = Seconds(1)
-    val numBatches = expectedOutput.size * (slideTime / batchDuration).toInt
-    val operation = (s: DStream[Int]) => s.countByWindow(windowTime, slideTime)
+    val windowDuration = Seconds(2)
+    val slideDuration = Seconds(1)
+    val numBatches = expectedOutput.size * (slideDuration / batchDuration).toInt
+    val operation = (s: DStream[Int]) => s.countByWindow(windowDuration, slideDuration)
     testOperation(input, operation, expectedOutput, numBatches, true)
   }
 
   test("countByKeyAndWindow") {
     val input = Seq(Seq(("a", 1)), Seq(("b", 1), ("b", 2)), Seq(("a", 10), ("b", 20)))
     val expectedOutput = Seq( Seq(("a", 1)), Seq(("a", 1), ("b", 2)), Seq(("a", 1), ("b", 3)))
-    val windowTime = Seconds(2)
-    val slideTime = Seconds(1)
-    val numBatches = expectedOutput.size * (slideTime / batchDuration).toInt
+    val windowDuration = Seconds(2)
+    val slideDuration = Seconds(1)
+    val numBatches = expectedOutput.size * (slideDuration / batchDuration).toInt
     val operation = (s: DStream[(String, Int)]) => {
-      s.countByKeyAndWindow(windowTime, slideTime).map(x => (x._1, x._2.toInt))
+      s.countByKeyAndWindow(windowDuration, slideDuration).map(x => (x._1, x._2.toInt))
     }
     testOperation(input, operation, expectedOutput, numBatches, true)
   }
@@ -247,12 +247,12 @@ class WindowOperationsSuite extends TestSuiteBase {
     name: String,
     input: Seq[Seq[Int]],
     expectedOutput: Seq[Seq[Int]],
-    windowTime: Time = Seconds(2),
-    slideTime: Time = Seconds(1)
+    windowDuration: Duration = Seconds(2),
+    slideDuration: Duration = Seconds(1)
     ) {
     test("window - " + name) {
-      val numBatches = expectedOutput.size * (slideTime / batchDuration).toInt
-      val operation = (s: DStream[Int]) => s.window(windowTime, slideTime)
+      val numBatches = expectedOutput.size * (slideDuration / batchDuration).toInt
+      val operation = (s: DStream[Int]) => s.window(windowDuration, slideDuration)
       testOperation(input, operation, expectedOutput, numBatches, true)
     }
   }
@@ -261,13 +261,13 @@ class WindowOperationsSuite extends TestSuiteBase {
     name: String,
     input: Seq[Seq[(String, Int)]],
     expectedOutput: Seq[Seq[(String, Int)]],
-    windowTime: Time = Seconds(2),
-    slideTime: Time = Seconds(1)
+    windowDuration: Duration = Seconds(2),
+    slideDuration: Duration = Seconds(1)
     ) {
     test("reduceByKeyAndWindow - " + name) {
-      val numBatches = expectedOutput.size * (slideTime / batchDuration).toInt
+      val numBatches = expectedOutput.size * (slideDuration / batchDuration).toInt
       val operation = (s: DStream[(String, Int)]) => {
-        s.reduceByKeyAndWindow(_ + _, windowTime, slideTime).persist()
+        s.reduceByKeyAndWindow(_ + _, windowDuration, slideDuration).persist()
       }
       testOperation(input, operation, expectedOutput, numBatches, true)
     }
@@ -277,13 +277,13 @@ class WindowOperationsSuite extends TestSuiteBase {
     name: String,
     input: Seq[Seq[(String, Int)]],
     expectedOutput: Seq[Seq[(String, Int)]],
-    windowTime: Time = Seconds(2),
-    slideTime: Time = Seconds(1)
+    windowDuration: Duration = Seconds(2),
+    slideDuration: Duration = Seconds(1)
   ) {
     test("reduceByKeyAndWindowInv - " + name) {
-      val numBatches = expectedOutput.size * (slideTime / batchDuration).toInt
+      val numBatches = expectedOutput.size * (slideDuration / batchDuration).toInt
       val operation = (s: DStream[(String, Int)]) => {
-        s.reduceByKeyAndWindow(_ + _, _ - _, windowTime, slideTime)
+        s.reduceByKeyAndWindow(_ + _, _ - _, windowDuration, slideDuration)
          .persist()
          .checkpoint(Seconds(100)) // Large value to avoid effect of RDD checkpointing
       }
-- 
GitLab