diff --git a/streaming/src/main/scala/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/spark/streaming/Checkpoint.scala index b9eb7f8ec4a00fa60ed7dac823f08a2085b6f1e4..7405c8b22ebc9f7893b1c3c83d9ae4a583a1e24d 100644 --- a/streaming/src/main/scala/spark/streaming/Checkpoint.scala +++ b/streaming/src/main/scala/spark/streaming/Checkpoint.scala @@ -6,6 +6,8 @@ import org.apache.hadoop.fs.{FileUtil, Path} import org.apache.hadoop.conf.Configuration import java.io._ +import com.ning.compress.lzf.{LZFInputStream, LZFOutputStream} +import java.util.concurrent.Executors private[streaming] @@ -38,32 +40,50 @@ class CheckpointWriter(checkpointDir: String) extends Logging { val conf = new Configuration() var fs = file.getFileSystem(conf) val maxAttempts = 3 + val executor = Executors.newFixedThreadPool(1) - def write(checkpoint: Checkpoint) { - // TODO: maybe do this in a different thread from the main stream execution thread - var attempts = 0 - while (attempts < maxAttempts) { - attempts += 1 - try { - logDebug("Saving checkpoint for time " + checkpoint.checkpointTime + " to file '" + file + "'") - if (fs.exists(file)) { - val bkFile = new Path(file.getParent, file.getName + ".bk") - FileUtil.copy(fs, file, fs, bkFile, true, true, conf) - logDebug("Moved existing checkpoint file to " + bkFile) + class CheckpointWriteHandler(checkpointTime: Time, bytes: Array[Byte]) extends Runnable { + def run() { + var attempts = 0 + val startTime = System.currentTimeMillis() + while (attempts < maxAttempts) { + attempts += 1 + try { + logDebug("Saving checkpoint for time " + checkpointTime + " to file '" + file + "'") + if (fs.exists(file)) { + val bkFile = new Path(file.getParent, file.getName + ".bk") + FileUtil.copy(fs, file, fs, bkFile, true, true, conf) + logDebug("Moved existing checkpoint file to " + bkFile) + } + val fos = fs.create(file) + fos.write(bytes) + fos.close() + val finishTime = System.currentTimeMillis() + logInfo("Checkpoint for time " + checkpointTime + " saved to file '" + file + + "', took " + bytes.length + " bytes and " + (finishTime - startTime) + " milliseconds") + return + } catch { + case ioe: IOException => + logWarning("Error writing checkpoint to file in " + attempts + " attempts", ioe) } - val fos = fs.create(file) - val oos = new ObjectOutputStream(fos) - oos.writeObject(checkpoint) - oos.close() - logInfo("Checkpoint for time " + checkpoint.checkpointTime + " saved to file '" + file + "'") - fos.close() - return - } catch { - case ioe: IOException => - logWarning("Error writing checkpoint to file in " + attempts + " attempts", ioe) } + logError("Could not write checkpoint for time " + checkpointTime + " to file '" + file + "'") } - logError("Could not write checkpoint for time " + checkpoint.checkpointTime + " to file '" + file + "'") + } + + def write(checkpoint: Checkpoint) { + val bos = new ByteArrayOutputStream() + val zos = new LZFOutputStream(bos) + val oos = new ObjectOutputStream(zos) + oos.writeObject(checkpoint) + oos.close() + bos.close() + executor.execute(new CheckpointWriteHandler(checkpoint.checkpointTime, bos.toByteArray)) + } + + def stop() { + executor.shutdown() } } @@ -85,7 +105,8 @@ object CheckpointReader extends Logging { // of ObjectInputStream is used to explicitly use the current thread's default class // loader to find and load classes. This is a well known Java issue and has popped up // in other places (e.g., http://jira.codehaus.org/browse/GROOVY-1627) - val ois = new ObjectInputStreamWithLoader(fis, Thread.currentThread().getContextClassLoader) + val zis = new LZFInputStream(fis) + val ois = new ObjectInputStreamWithLoader(zis, Thread.currentThread().getContextClassLoader) val cp = ois.readObject.asInstanceOf[Checkpoint] ois.close() fs.close()
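The Checkpoint.scala change above moves checkpoint writing off the job-generation thread and adds LZF compression: the Checkpoint is serialized and compressed into a byte array on the caller's thread, and the bytes are then handed to a single background thread for the file-system write. A minimal standalone sketch of the same pattern; the object name AsyncCheckpointWriteSketch and the writeBytes callback are illustrative only and not part of the patch:

import java.io.{ByteArrayOutputStream, ObjectOutputStream}
import java.util.concurrent.Executors
import com.ning.compress.lzf.LZFOutputStream

object AsyncCheckpointWriteSketch {
  // Single worker thread: writes are serialized among themselves and never block the caller.
  private val executor = Executors.newFixedThreadPool(1)

  def write(checkpoint: AnyRef, writeBytes: Array[Byte] => Unit) {
    // Serialize and compress eagerly so the snapshot is immutable once handed off.
    val bos = new ByteArrayOutputStream()
    val oos = new ObjectOutputStream(new LZFOutputStream(bos))
    oos.writeObject(checkpoint)
    oos.close()
    val bytes = bos.toByteArray
    // The actual (possibly slow) file-system write happens in the background.
    executor.execute(new Runnable {
      def run() { writeBytes(bytes) }
    })
  }

  def stop() { executor.shutdown() }
}
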
diff --git a/streaming/src/main/scala/spark/streaming/DStream.scala b/streaming/src/main/scala/spark/streaming/DStream.scala index ce42b742d77e447605ac2de3cbd2ab4e7a27e5c8..84e4b5bedb8df7cbd3370386f61e903c357f6f86 100644 --- a/streaming/src/main/scala/spark/streaming/DStream.scala +++ b/streaming/src/main/scala/spark/streaming/DStream.scala @@ -238,13 +238,15 @@ abstract class DStream[T: ClassManifest] ( dependencies.foreach(_.remember(parentRememberDuration)) } - /** This method checks whether the 'time' is valid wrt slideDuration for generating RDD */ + /** Checks whether the 'time' is valid wrt slideDuration for generating RDD */ protected def isTimeValid(time: Time): Boolean = { if (!isInitialized) { throw new Exception (this + " has not been initialized") } else if (time <= zeroTime || ! (time - zeroTime).isMultipleOf(slideDuration)) { + logInfo("Time " + time + " is invalid as zeroTime is " + zeroTime + " and slideDuration is " + slideDuration + " and difference is " + (time - zeroTime)) false } else { + logInfo("Time " + time + " is valid") true } } @@ -627,16 +629,21 @@ abstract class DStream[T: ClassManifest] ( * Return all the RDDs between 'fromTime' to 'toTime' (both included) */ def slice(fromTime: Time, toTime: Time): Seq[RDD[T]] = { - val rdds = new ArrayBuffer[RDD[T]]() - var time = toTime.floor(slideDuration) - while (time >= zeroTime && time >= fromTime) { - getOrCompute(time) match { - case Some(rdd) => rdds += rdd - case None => //throw new Exception("Could not get RDD for time " + time) - } - time -= slideDuration + if (!(fromTime - zeroTime).isMultipleOf(slideDuration)) { + logWarning("fromTime (" + fromTime + ") is not a multiple of slideDuration (" + slideDuration + ")") + } + if (!(toTime - zeroTime).isMultipleOf(slideDuration)) { + logWarning("toTime (" + toTime + ") is not a multiple of slideDuration (" + slideDuration + ")") } - rdds.toSeq + val alignedToTime = toTime.floor(slideDuration) + val alignedFromTime = fromTime.floor(slideDuration) + + logInfo("Slicing from " + fromTime + " to " + toTime + + " (aligned to " + alignedFromTime + " and " + alignedToTime + ")") + + alignedFromTime.to(alignedToTime, slideDuration).flatMap(time => { + if (time >= zeroTime) getOrCompute(time) else None + }) } /** diff --git a/streaming/src/main/scala/spark/streaming/DStreamGraph.scala b/streaming/src/main/scala/spark/streaming/DStreamGraph.scala index 22d9e24f05b99a39239e25ef84053e2f6a0fce05..adb7f3a24d25f6fcbd1453c2e75b56b9a22d10b4 100644 --- a/streaming/src/main/scala/spark/streaming/DStreamGraph.scala +++ b/streaming/src/main/scala/spark/streaming/DStreamGraph.scala @@ -86,10 +86,12 @@ final private[streaming] class DStreamGraph extends Serializable with Logging { def getOutputStreams() = this.synchronized { outputStreams.toArray } - def generateRDDs(time: Time): Seq[Job] = { + def generateJobs(time: Time): Seq[Job] = { this.synchronized { - logInfo("Generating RDDs for time " + time) - outputStreams.flatMap(outputStream => outputStream.generateJob(time)) + logInfo("Generating jobs for time " + time) + val jobs = outputStreams.flatMap(outputStream =>
outputStream.generateJob(time)) + logInfo("Generated " + jobs.length + " jobs for time " + time) + jobs } } @@ -97,18 +99,23 @@ final private[streaming] class DStreamGraph extends Serializable with Logging { this.synchronized { logInfo("Clearing old metadata for time " + time) outputStreams.foreach(_.clearOldMetadata(time)) + logInfo("Cleared old metadata for time " + time) } } def updateCheckpointData(time: Time) { this.synchronized { + logInfo("Updating checkpoint data for time " + time) outputStreams.foreach(_.updateCheckpointData(time)) + logInfo("Updated checkpoint data for time " + time) } } def restoreCheckpointData() { this.synchronized { + logInfo("Restoring checkpoint data") outputStreams.foreach(_.restoreCheckpointData()) + logInfo("Restored checkpoint data") } } diff --git a/streaming/src/main/scala/spark/streaming/JobManager.scala b/streaming/src/main/scala/spark/streaming/JobManager.scala index 649494ff4a8d640972efefc39fc85fe699f14f42..7696c4a592bf6c3c9f5885da1e779f39f7ff3101 100644 --- a/streaming/src/main/scala/spark/streaming/JobManager.scala +++ b/streaming/src/main/scala/spark/streaming/JobManager.scala @@ -43,20 +43,24 @@ class JobManager(ssc: StreamingContext, numThreads: Int = 1) extends Logging { } private def clearJob(job: Job) { + var timeCleared = false + val time = job.time jobs.synchronized { - val time = job.time val jobsOfTime = jobs.get(time) if (jobsOfTime.isDefined) { jobsOfTime.get -= job if (jobsOfTime.get.isEmpty) { - ssc.scheduler.clearOldMetadata(time) jobs -= time + timeCleared = true } } else { throw new Exception("Job finished for time " + job.time + " but time does not exist in jobs") } } + if (timeCleared) { + ssc.scheduler.clearOldMetadata(time) + } } def getPendingTimes(): Array[Time] = { diff --git a/streaming/src/main/scala/spark/streaming/Scheduler.scala b/streaming/src/main/scala/spark/streaming/Scheduler.scala index 57d494da8378eacde4c81bbcda5cc800fc45ff53..1c4b22a8981c8db8ecb2a28fcfae2e301d000627 100644 --- a/streaming/src/main/scala/spark/streaming/Scheduler.scala +++ b/streaming/src/main/scala/spark/streaming/Scheduler.scala @@ -20,8 +20,9 @@ class Scheduler(ssc: StreamingContext) extends Logging { val clockClass = System.getProperty("spark.streaming.clock", "spark.streaming.util.SystemClock") val clock = Class.forName(clockClass).newInstance().asInstanceOf[Clock] val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds, - longTime => generateRDDs(new Time(longTime))) + longTime => generateJobs(new Time(longTime))) val graph = ssc.graph + var latestTime: Time = null def start() = synchronized { if (ssc.isCheckpointPresent) { @@ -35,6 +36,7 @@ class Scheduler(ssc: StreamingContext) extends Logging { def stop() = synchronized { timer.stop() jobManager.stop() + if (checkpointWriter != null) checkpointWriter.stop() ssc.graph.stop() logInfo("Scheduler stopped") } @@ -73,35 +75,38 @@ class Scheduler(ssc: StreamingContext) extends Logging { val timesToReschedule = (pendingTimes ++ downTimes).distinct.sorted(Time.ordering) logInfo("Batches to reschedule: " + timesToReschedule.mkString(", ")) timesToReschedule.foreach(time => - graph.generateRDDs(time).foreach(jobManager.runJob) + graph.generateJobs(time).foreach(jobManager.runJob) ) // Restart the timer timer.start(restartTime.milliseconds) - logInfo("Scheduler's timer restarted") + logInfo("Scheduler's timer restarted at " + restartTime) } - /** Generates the RDDs, clears old metadata and does checkpoint for the given time */ - def generateRDDs(time: Time) { + /** Generate 
jobs and perform checkpoint for the given `time`. */ + def generateJobs(time: Time) { SparkEnv.set(ssc.env) logInfo("\n-----------------------------------------------------\n") - graph.generateRDDs(time).foreach(jobManager.runJob) + graph.generateJobs(time).foreach(jobManager.runJob) + latestTime = time doCheckpoint(time) } - + /** + * Clear old metadata assuming the jobs of `time` have finished processing, + * and then perform a checkpoint. + */ def clearOldMetadata(time: Time) { ssc.graph.clearOldMetadata(time) + doCheckpoint(time) } - def doCheckpoint(time: Time) { + /** Perform a checkpoint for the given `time`. */ + def doCheckpoint(time: Time) = synchronized { if (ssc.checkpointDuration != null && (time - graph.zeroTime).isMultipleOf(ssc.checkpointDuration)) { logInfo("Checkpointing graph for time " + time) - val startTime = System.currentTimeMillis() ssc.graph.updateCheckpointData(time) checkpointWriter.write(new Checkpoint(ssc, time)) - val stopTime = System.currentTimeMillis() - logInfo("Checkpointing the graph took " + (stopTime - startTime) + " ms") } } } diff --git a/streaming/src/main/scala/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/spark/streaming/StreamingContext.scala index 9be9d884be66c88c034f22f8a34e3f1455a1b12e..d1407b7869c03b916f694aef4e8d531fa681121c 100644 --- a/streaming/src/main/scala/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/spark/streaming/StreamingContext.scala @@ -119,18 +119,15 @@ class StreamingContext private ( /** * Set the context to periodically checkpoint the DStream operations for master - * fault-tolerance. By default, the graph will be checkpointed every batch interval. + * fault-tolerance. The graph will be checkpointed every batch interval. * @param directory HDFS-compatible directory where the checkpoint data will be reliably stored - * @param interval checkpoint interval */ - def checkpoint(directory: String, interval: Duration = null) { + def checkpoint(directory: String) { if (directory != null) { sc.setCheckpointDir(StreamingContext.getSparkCheckpointDir(directory)) checkpointDir = directory - checkpointDuration = interval } else { checkpointDir = null - checkpointDuration = null } } diff --git a/streaming/src/main/scala/spark/streaming/Time.scala b/streaming/src/main/scala/spark/streaming/Time.scala index 8201e84a203d71d18b5c25d8f74911aa39dd3ed7..f14decf08ba8f98bc4f59e062b0b8ea17d7ade1e 100644 --- a/streaming/src/main/scala/spark/streaming/Time.scala +++ b/streaming/src/main/scala/spark/streaming/Time.scala @@ -38,15 +38,14 @@ case class Time(private val millis: Long) { def max(that: Time): Time = if (this > that) this else that def until(that: Time, interval: Duration): Seq[Time] = { - assert(that > this, "Cannot create sequence as " + that + " not more than " + this) - assert( - (that - this).isMultipleOf(interval), - "Cannot create sequence as gap between " + that + " and " + - this + " is not multiple of " + interval - ) (this.milliseconds) until (that.milliseconds) by (interval.milliseconds) map (new Time(_)) } + def to(that: Time, interval: Duration): Seq[Time] = { + (this.milliseconds) to (that.milliseconds) by (interval.milliseconds) map (new Time(_)) + } + + override def toString: String = (millis.toString + " ms") }
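The new Time.to above complements the existing until: to is inclusive of the end point, which is what the rewritten DStream.slice needs after flooring both ends to the slide duration. A small illustrative example, assuming the Seconds helper and Time.floor used elsewhere in this code base (the concrete values are made up):

import spark.streaming.{Seconds, Time}

val slide = Seconds(2)
val from  = new Time(3500).floor(slide)  // aligns down to 2000 ms
val to    = new Time(9000).floor(slide)  // aligns down to 8000 ms

from.to(to, slide)     // 2000 ms, 4000 ms, 6000 ms, 8000 ms (end included)
from.until(to, slide)  // 2000 ms, 4000 ms, 6000 ms (end excluded)
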
diff --git a/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala index 5bbf2b084f32f2195718bb63e845ba9661f1289b..03933aae93680f5170859e0780cf3ab625b9d96c 100644 --- a/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala +++ b/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala @@ -314,12 +314,11 @@ class JavaStreamingContext(val ssc: StreamingContext) { /** * Sets the context to periodically checkpoint the DStream operations for master - * fault-tolerance. By default, the graph will be checkpointed every batch interval. + * fault-tolerance. The graph will be checkpointed every batch interval. * @param directory HDFS-compatible directory where the checkpoint data will be reliably stored - * @param interval checkpoint interval */ - def checkpoint(directory: String, interval: Duration = null) { - ssc.checkpoint(directory, interval) + def checkpoint(directory: String) { + ssc.checkpoint(directory) } /** diff --git a/streaming/src/main/scala/spark/streaming/dstream/InputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/InputDStream.scala index 980ca5177eb1d20a5b8b8a85155e914ff94e1511..a4db44a608e08ff8c22c16b5c2c147bf4cc5085f 100644 --- a/streaming/src/main/scala/spark/streaming/dstream/InputDStream.scala +++ b/streaming/src/main/scala/spark/streaming/dstream/InputDStream.scala @@ -1,10 +1,42 @@ package spark.streaming.dstream -import spark.streaming.{Duration, StreamingContext, DStream} +import spark.streaming.{Time, Duration, StreamingContext, DStream} +/** + * This is the abstract base class for all input streams. It provides two methods, + * start() and stop(), which are called by the scheduler to start and stop receiving data. + * Input streams that can generate RDDs from new data just by running a service on + * the driver node (that is, without running a receiver on worker nodes) can be + * implemented by directly subclassing this InputDStream. For example, + * FileInputDStream, a subclass of InputDStream, monitors an HDFS directory for + * new files and generates RDDs from the new files. For implementing input streams + * that require running a receiver on the worker nodes, use NetworkInputDStream + * as the parent class. + */ abstract class InputDStream[T: ClassManifest] (@transient ssc_ : StreamingContext) extends DStream[T](ssc_) { + var lastValidTime: Time = null + + /** + * Checks whether the 'time' is valid wrt slideDuration for generating RDD. + * Additionally, it also ensures that valid times are in strictly increasing order, + * so that InputDStream.compute() is called on strictly increasing times. + */ + override protected def isTimeValid(time: Time): Boolean = { + if (!super.isTimeValid(time)) { + false // Time not valid + } else { + // Time is valid, but check whether it is more than lastValidTime + if (lastValidTime != null && time < lastValidTime) { + logWarning("isTimeValid called with " + time + " whereas the last valid time is " + lastValidTime) + } + lastValidTime = time + true + } + } + override def dependencies = List() override def slideDuration: Duration = { @@ -13,7 +45,9 @@ abstract class InputDStream[T: ClassManifest] (@transient ssc_ : StreamingContex ssc.graph.batchDuration } + /** Method called to start receiving data. Subclasses must implement this method. */ def start() + /** Method called to stop receiving data. Subclasses must implement this method.
*/ def stop() } diff --git a/streaming/src/main/scala/spark/streaming/dstream/TwitterInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/TwitterInputDStream.scala index d733254ddbb9158e669273f050dac5bda9de4ffb..e70822e5c3bd3be5302a113427626b2d8503da35 100644 --- a/streaming/src/main/scala/spark/streaming/dstream/TwitterInputDStream.scala +++ b/streaming/src/main/scala/spark/streaming/dstream/TwitterInputDStream.scala @@ -2,8 +2,8 @@ package spark.streaming.dstream import spark._ import spark.streaming._ -import dstream.{NetworkReceiver, NetworkInputDStream} import storage.StorageLevel + import twitter4j._ import twitter4j.auth.BasicAuthorization @@ -19,7 +19,7 @@ class TwitterInputDStream( password: String, filters: Seq[String], storageLevel: StorageLevel - ) extends NetworkInputDStream[Status](ssc_) { + ) extends NetworkInputDStream[Status](ssc_) { override def createReceiver(): NetworkReceiver[Status] = { new TwitterReceiver(username, password, filters, storageLevel) diff --git a/streaming/src/main/scala/spark/streaming/util/MasterFailureTest.scala b/streaming/src/main/scala/spark/streaming/util/MasterFailureTest.scala index 776e676063c1cbec9a8cd2c91dd8eea2f810ca9f..bdd9f4d7535ea0fb055d29ccc5eccf88dd83b958 100644 --- a/streaming/src/main/scala/spark/streaming/util/MasterFailureTest.scala +++ b/streaming/src/main/scala/spark/streaming/util/MasterFailureTest.scala @@ -315,7 +315,7 @@ class KillingThread(ssc: StreamingContext, maxKillWaitTime: Long) extends Thread override def run() { try { // If it is the first killing, then allow the first checkpoint to be created - var minKillWaitTime = if (MasterFailureTest.killCount == 0) 5000 else 1000 + var minKillWaitTime = if (MasterFailureTest.killCount == 0) 5000 else 2000 val killWaitTime = minKillWaitTime + math.abs(Random.nextLong % maxKillWaitTime) logInfo("Kill wait time = " + killWaitTime) Thread.sleep(killWaitTime) diff --git a/streaming/src/test/java/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/spark/streaming/JavaAPISuite.java index 7bea0b1fc4e73af8559a115fbe8a5716a2717664..16bacffb9285f30d3f2cfac459bb905bc14ec457 100644 --- a/streaming/src/test/java/spark/streaming/JavaAPISuite.java +++ b/streaming/src/test/java/spark/streaming/JavaAPISuite.java @@ -23,6 +23,7 @@ import spark.streaming.JavaCheckpointTestUtils; import spark.streaming.dstream.KafkaPartitionKey; import java.io.*; +import java.text.Collator; import java.util.*; // The test suite itself is Serializable so that anonymous Function implementations can be @@ -35,7 +36,7 @@ public class JavaAPISuite implements Serializable { public void setUp() { System.setProperty("spark.streaming.clock", "spark.streaming.util.ManualClock"); ssc = new JavaStreamingContext("local[2]", "test", new Duration(1000)); - ssc.checkpoint("checkpoint", new Duration(1000)); + ssc.checkpoint("checkpoint"); } @After @@ -587,26 +588,47 @@ public class JavaAPISuite implements Serializable { @Test public void testGroupByKeyAndWindow() { - List<List<Tuple2<String, String>>> inputData = stringStringKVStream; + List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream; - List<List<Tuple2<String, List<String>>>> expected = Arrays.asList( - Arrays.asList(new Tuple2<String, List<String>>("california", Arrays.asList("dodgers", "giants")), - new Tuple2<String, List<String>>("new york", Arrays.asList("yankees", "mets"))), - Arrays.asList(new Tuple2<String, List<String>>("california", - Arrays.asList("sharks", "ducks", "dodgers", "giants")), - new Tuple2<String, List<String>>("new 
york", Arrays.asList("rangers", "islanders", "yankees", "mets"))), - Arrays.asList(new Tuple2<String, List<String>>("california", Arrays.asList("sharks", "ducks")), - new Tuple2<String, List<String>>("new york", Arrays.asList("rangers", "islanders")))); + List<List<Tuple2<String, List<Integer>>>> expected = Arrays.asList( + Arrays.asList( + new Tuple2<String, List<Integer>>("california", Arrays.asList(1, 3)), + new Tuple2<String, List<Integer>>("new york", Arrays.asList(1, 4)) + ), + Arrays.asList( + new Tuple2<String, List<Integer>>("california", Arrays.asList(1, 3, 5, 5)), + new Tuple2<String, List<Integer>>("new york", Arrays.asList(1, 1, 3, 4)) + ), + Arrays.asList( + new Tuple2<String, List<Integer>>("california", Arrays.asList(5, 5)), + new Tuple2<String, List<Integer>>("new york", Arrays.asList(1, 3)) + ) + ); - JavaDStream<Tuple2<String, String>> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); - JavaPairDStream<String, String> pairStream = JavaPairDStream.fromJavaDStream(stream); + JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); + JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream); - JavaPairDStream<String, List<String>> groupWindowed = + JavaPairDStream<String, List<Integer>> groupWindowed = pairStream.groupByKeyAndWindow(new Duration(2000), new Duration(1000)); JavaTestUtils.attachTestOutputStream(groupWindowed); - List<List<Tuple2<String, List<String>>>> result = JavaTestUtils.runStreams(ssc, 3, 3); + List<List<Tuple2<String, List<Integer>>>> result = JavaTestUtils.runStreams(ssc, 3, 3); - Assert.assertEquals(expected, result); + assert(result.size() == expected.size()); + for (int i = 0; i < result.size(); i++) { + assert(convert(result.get(i)).equals(convert(expected.get(i)))); + } + } + + private HashSet<Tuple2<String, HashSet<Integer>>> convert(List<Tuple2<String, List<Integer>>> listOfTuples) { + List<Tuple2<String, HashSet<Integer>>> newListOfTuples = new ArrayList<Tuple2<String, HashSet<Integer>>>(); + for (Tuple2<String, List<Integer>> tuple: listOfTuples) { + newListOfTuples.add(convert(tuple)); + } + return new HashSet<Tuple2<String, HashSet<Integer>>>(newListOfTuples); + } + + private Tuple2<String, HashSet<Integer>> convert(Tuple2<String, List<Integer>> tuple) { + return new Tuple2<String, HashSet<Integer>>(tuple._1(), new HashSet<Integer>(tuple._2())); } @Test @@ -894,7 +916,7 @@ public class JavaAPISuite implements Serializable { Arrays.asList(8,7)); File tempDir = Files.createTempDir(); - ssc.checkpoint(tempDir.getAbsolutePath(), new Duration(1000)); + ssc.checkpoint(tempDir.getAbsolutePath()); JavaDStream stream = JavaCheckpointTestUtils.attachTestInputStream(ssc, inputData, 1); JavaDStream letterCount = stream.map(new Function<String, Integer>() { diff --git a/streaming/src/test/java/spark/streaming/JavaTestUtils.scala b/streaming/src/test/java/spark/streaming/JavaTestUtils.scala index 56349837e5e6ba5895c2de50103a24e9d9e94bfe..52ea28732ada89490a28c1d8f2366df491fb501d 100644 --- a/streaming/src/test/java/spark/streaming/JavaTestUtils.scala +++ b/streaming/src/test/java/spark/streaming/JavaTestUtils.scala @@ -57,6 +57,7 @@ trait JavaTestBase extends TestSuiteBase { } object JavaTestUtils extends JavaTestBase { + override def maxWaitTimeMillis = 20000 } diff --git a/streaming/src/test/resources/log4j.properties b/streaming/src/test/resources/log4j.properties index f0638e0e027f21576211e45b451c1dae31446b2f..59c445e63f7974163e77d47257644474dd14e62f 100644 --- 
a/streaming/src/test/resources/log4j.properties +++ b/streaming/src/test/resources/log4j.properties @@ -1,5 +1,5 @@ # Set everything to be logged to the file streaming/target/unit-tests.log -log4j.rootCategory=WARN, file +log4j.rootCategory=INFO, file # log4j.appender.file=org.apache.log4j.FileAppender log4j.appender.file=org.apache.log4j.FileAppender log4j.appender.file.append=false @@ -9,6 +9,4 @@ log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %p %c{1}: # Ignore messages below warning level from Jetty, because it's a bit verbose log4j.logger.org.eclipse.jetty=WARN -log4j.logger.spark.streaming=INFO -log4j.logger.spark.streaming.dstream.FileInputDStream=DEBUG diff --git a/streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala b/streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala index 1e86cf49bb75a3dedbbeb24cbddb0eeac41c1043..8fce91853c77eed0f5d3e5e48aa2e93f13988e3e 100644 --- a/streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala +++ b/streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala @@ -229,6 +229,26 @@ class BasicOperationsSuite extends TestSuiteBase { testOperation(inputData, updateStateOperation, outputData, true) } + test("slice") { + val ssc = new StreamingContext("local[2]", "BasicOperationSuite", Seconds(1)) + val input = Seq(Seq(1), Seq(2), Seq(3), Seq(4)) + val stream = new TestInputStream[Int](ssc, input, 2) + ssc.registerInputStream(stream) + stream.foreach(_ => {}) // Dummy output stream + ssc.start() + Thread.sleep(2000) + def getInputFromSlice(fromMillis: Long, toMillis: Long) = { + stream.slice(new Time(fromMillis), new Time(toMillis)).flatMap(_.collect()).toSet + } + + assert(getInputFromSlice(0, 1000) == Set(1)) + assert(getInputFromSlice(0, 2000) == Set(1, 2)) + assert(getInputFromSlice(1000, 2000) == Set(1, 2)) + assert(getInputFromSlice(2000, 4000) == Set(2, 3, 4)) + ssc.stop() + Thread.sleep(1000) + } + test("forgetting of RDDs - map and window operations") { assert(batchDuration === Seconds(1), "Batch duration has changed from 1 second") diff --git a/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala index c89c4a8d43f803731d9fa727e3b596897d9404db..5250667bcbc0d63bbafec2b34b13e556c15aa66c 100644 --- a/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala +++ b/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala @@ -39,14 +39,11 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter { override def batchDuration = Milliseconds(500) - override def checkpointInterval = batchDuration - override def actuallyWait = true test("basic rdd checkpoints + dstream graph checkpoint recovery") { assert(batchDuration === Milliseconds(500), "batchDuration for this test must be 1 second") - assert(checkpointInterval === batchDuration, "checkpointInterval for this test much be same as batchDuration") System.setProperty("spark.streaming.clock", "spark.streaming.util.ManualClock") @@ -188,7 +185,7 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter { // Set up the streaming context and input streams val testDir = Files.createTempDir() var ssc = new StreamingContext(master, framework, Seconds(1)) - ssc.checkpoint(checkpointDir, checkpointInterval) + ssc.checkpoint(checkpointDir) val fileStream = ssc.textFileStream(testDir.toString) // Making value 3 take large time to process, to ensure that the master // shuts down in the middle of processing the 3rd batch diff --git 
a/streaming/src/test/scala/spark/streaming/TestSuiteBase.scala b/streaming/src/test/scala/spark/streaming/TestSuiteBase.scala index 2cc31d61376ad40152676655e9f5202a3b2dc0f9..ad6aa79d102339bc0595d30463d47bdf6ee183fd 100644 --- a/streaming/src/test/scala/spark/streaming/TestSuiteBase.scala +++ b/streaming/src/test/scala/spark/streaming/TestSuiteBase.scala @@ -75,9 +75,6 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging { // Directory where the checkpoint data will be saved def checkpointDir = "checkpoint" - // Duration after which the graph is checkpointed - def checkpointInterval = batchDuration - // Number of partitions of the input parallel collections created for testing def numInputPartitions = 2 @@ -99,7 +96,7 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging { // Create StreamingContext val ssc = new StreamingContext(master, framework, batchDuration) if (checkpointDir != null) { - ssc.checkpoint(checkpointDir, checkpointInterval) + ssc.checkpoint(checkpointDir) } // Setup the stream computation @@ -124,7 +121,7 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging { // Create StreamingContext val ssc = new StreamingContext(master, framework, batchDuration) if (checkpointDir != null) { - ssc.checkpoint(checkpointDir, checkpointInterval) + ssc.checkpoint(checkpointDir) } // Setup the stream computation diff --git a/streaming/src/test/scala/spark/streaming/WindowOperationsSuite.scala b/streaming/src/test/scala/spark/streaming/WindowOperationsSuite.scala index f8380af3313015164ff6db5571957f382e51ade4..1b66f3bda20ad0f112295e529f4a3f1419c167ed 100644 --- a/streaming/src/test/scala/spark/streaming/WindowOperationsSuite.scala +++ b/streaming/src/test/scala/spark/streaming/WindowOperationsSuite.scala @@ -273,6 +273,7 @@ class WindowOperationsSuite extends TestSuiteBase { slideDuration: Duration = Seconds(1) ) { test("reduceByKeyAndWindow - " + name) { + logInfo("reduceByKeyAndWindow - " + name) val numBatches = expectedOutput.size * (slideDuration / batchDuration).toInt val operation = (s: DStream[(String, Int)]) => { s.reduceByKeyAndWindow((x: Int, y: Int) => x + y, windowDuration, slideDuration) @@ -288,7 +289,8 @@ class WindowOperationsSuite extends TestSuiteBase { windowDuration: Duration = Seconds(2), slideDuration: Duration = Seconds(1) ) { - test("ReduceByKeyAndWindow with inverse function - " + name) { + test("reduceByKeyAndWindow with inverse function - " + name) { + logInfo("reduceByKeyAndWindow with inverse function - " + name) val numBatches = expectedOutput.size * (slideDuration / batchDuration).toInt val operation = (s: DStream[(String, Int)]) => { s.reduceByKeyAndWindow(_ + _, _ - _, windowDuration, slideDuration) @@ -306,6 +308,7 @@ class WindowOperationsSuite extends TestSuiteBase { slideDuration: Duration = Seconds(1) ) { test("reduceByKeyAndWindow with inverse and filter functions - " + name) { + logInfo("reduceByKeyAndWindow with inverse and filter functions - " + name) val numBatches = expectedOutput.size * (slideDuration / batchDuration).toInt val filterFunc = (p: (String, Int)) => p._2 != 0 val operation = (s: DStream[(String, Int)]) => {
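With the interval parameter removed from StreamingContext.checkpoint, JavaStreamingContext.checkpoint, and the test helpers above, enabling fault tolerance reduces to pointing the context at a directory; the graph is then checkpointed every batch, and any per-stream RDD checkpoint interval is still configured on the DStream itself. A hedged usage sketch (host, port, path, and durations are placeholders, and dstream.checkpoint(interval) is the pre-existing per-stream API rather than something introduced by this patch):

import spark.streaming.{Seconds, StreamingContext}
import spark.streaming.StreamingContext._

val ssc = new StreamingContext("local[2]", "CheckpointExample", Seconds(1))
ssc.checkpoint("hdfs:///tmp/streaming-checkpoint")  // directory only, no interval argument

// Windowed count with an inverse reduce function, which requires checkpointing.
val words  = ssc.socketTextStream("localhost", 9999).flatMap(_.split(" "))
val counts = words.map(w => (w, 1)).reduceByKeyAndWindow(_ + _, _ - _, Seconds(30), Seconds(1))
counts.checkpoint(Seconds(10))  // RDD checkpoint interval, set per DStream
counts.print()

ssc.start()
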