Commit 2eacf224 authored by Tathagata Das

Removed countByKeyAndWindow on paired DStreams, and added countByValueAndWindow for all DStreams. Updated both the Scala and Java APIs and test suites.
parent 03e8dc68
Showing with 231 additions and 189 deletions
......@@ -28,16 +28,15 @@ object PageViewStream {
// Create a NetworkInputDStream on target host:port and convert each line to a PageView
val pageViews = ssc.networkTextStream(host, port)
.flatMap(_.split("\n"))
.map(PageView.fromString(_))
// Return a count of views per URL seen in each batch
- val pageCounts = pageViews.map(view => ((view.url, 1))).countByKey()
+ val pageCounts = pageViews.map(view => view.url).countByValue()
// Return a sliding window of page views per URL in the last ten seconds
- val slidingPageCounts = pageViews.map(view => ((view.url, 1)))
-   .window(Seconds(10), Seconds(2))
-   .countByKey()
+ val slidingPageCounts = pageViews.map(view => view.url)
+   .countByValueAndWindow(Seconds(10), Seconds(2))
// Return the rate of error pages (a non 200 status) in each zip code over the last 30 seconds
......
......@@ -441,6 +441,15 @@ abstract class DStream[T: ClassManifest] (
*/
def count(): DStream[Long] = this.map(_ => 1L).reduce(_ + _)
+ /**
+  * Return a new DStream in which each RDD contains the counts of each distinct value in
+  * each RDD of this DStream. Hash partitioning is used to generate
+  * the RDDs with `numPartitions` partitions (Spark's default number of partitions if
+  * `numPartitions` is not specified).
+  */
+ def countByValue(numPartitions: Int = ssc.sc.defaultParallelism): DStream[(T, Long)] =
+   this.map(x => (x, 1L)).reduceByKey((x: Long, y: Long) => x + y, numPartitions)
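For context, a minimal usage sketch of the new countByValue (the `ssc`, `host`, and `port` names are assumptions, following the PageViewStream example above):

// Count occurrences of each distinct word in every batch: a batch containing
// "a", "b", "a" yields an RDD with the pairs ("a", 2L) and ("b", 1L).
val words: DStream[String] = ssc.networkTextStream(host, port).flatMap(_.split(" "))
val wordCounts: DStream[(String, Long)] = words.countByValue()
wordCounts.print()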
/**
* Apply a function to each RDD in this DStream. This is an output operator, so
* this DStream will be registered as an output stream and therefore materialized.
......@@ -494,14 +503,16 @@ abstract class DStream[T: ClassManifest] (
}
/**
- * Return a new DStream which is computed based on windowed batches of this DStream.
- * The new DStream generates RDDs with the same interval as this DStream.
+ * Return a new DStream in which each RDD contains all the elements seen in a
+ * sliding window of time over this DStream. The new DStream generates RDDs with
+ * the same interval as this DStream.
* @param windowDuration width of the window; must be a multiple of this DStream's interval.
*/
def window(windowDuration: Duration): DStream[T] = window(windowDuration, this.slideDuration)
/**
- * Return a new DStream which is computed based on windowed batches of this DStream.
+ * Return a new DStream in which each RDD contains all the elements seen in a
+ * sliding window of time over this DStream.
* @param windowDuration width of the window; must be a multiple of this DStream's
* batching interval
* @param slideDuration sliding interval of the window (i.e., the interval after which
......@@ -512,19 +523,15 @@ abstract class DStream[T: ClassManifest] (
new WindowedDStream(this, windowDuration, slideDuration)
}
- /**
-  * Return a new DStream which computed based on tumbling window on this DStream.
-  * This is equivalent to window(batchTime, batchTime).
-  * @param batchDuration tumbling window duration; must be a multiple of this DStream's
-  *                      batching interval
-  */
- def tumble(batchDuration: Duration): DStream[T] = window(batchDuration, batchDuration)
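Code that called the removed tumble can rely on the equivalence stated in its doc comment; a migration sketch, with `stream` standing in for any DStream:

// tumble(batchDuration) was defined as window(batchDuration, batchDuration),
// i.e., non-overlapping windows whose width equals their slide.
val tumbled = stream.window(Seconds(2), Seconds(2))  // was: stream.tumble(Seconds(2))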
/**
* Return a new DStream in which each RDD has a single element generated by reducing all
- * elements in a window over this DStream. windowDuration and slideDuration are as defined
- * in the window() operation. This is equivalent to
- * window(windowDuration, slideDuration).reduce(reduceFunc)
+ * elements in a sliding window over this DStream.
+ * @param reduceFunc associative reduce function
+ * @param windowDuration width of the window; must be a multiple of this DStream's
+ *                       batching interval
+ * @param slideDuration sliding interval of the window (i.e., the interval after which
+ *                      the new DStream will generate RDDs); must be a multiple of this
+ *                      DStream's batching interval
*/
def reduceByWindow(
reduceFunc: (T, T) => T,
......@@ -534,6 +541,22 @@ abstract class DStream[T: ClassManifest] (
this.reduce(reduceFunc).window(windowDuration, slideDuration).reduce(reduceFunc)
}
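A brief sketch of this non-incremental form, assuming a DStream[Double] of sensor readings named `temps`:

// Maximum reading over the last 30 seconds, recomputed every 10 seconds.
val maxTemp: DStream[Double] =
  temps.reduceByWindow((a, b) => math.max(a, b), Seconds(30), Seconds(10))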
+ /**
+  * Return a new DStream in which each RDD has a single element generated by reducing all
+  * elements in a sliding window over this DStream. However, the reduction is done incrementally
+  * using the old window's reduced value:
+  *  1. reduce the new values that entered the window (e.g., adding new counts)
+  *  2. "inverse reduce" the old values that left the window (e.g., subtracting old counts)
+  * This is more efficient than reduceByWindow without an "inverse reduce" function.
+  * However, it is applicable only to "invertible reduce functions".
+  * @param reduceFunc associative reduce function
+  * @param invReduceFunc inverse reduce function
+  * @param windowDuration width of the window; must be a multiple of this DStream's
+  *                       batching interval
+  * @param slideDuration sliding interval of the window (i.e., the interval after which
+  *                      the new DStream will generate RDDs); must be a multiple of this
+  *                      DStream's batching interval
+  */
def reduceByWindow(
reduceFunc: (T, T) => T,
invReduceFunc: (T, T) => T,
......@@ -547,13 +570,46 @@ abstract class DStream[T: ClassManifest] (
/**
* Return a new DStream in which each RDD has a single element generated by counting the number
- * of elements in a window over this DStream. windowDuration and slideDuration are as defined in the
- * window() operation. This is equivalent to window(windowDuration, slideDuration).count()
+ * of elements in a sliding window over this DStream. Hash partitioning is used to generate the RDDs with
+ * Spark's default number of partitions.
+ * @param windowDuration width of the window; must be a multiple of this DStream's
+ *                       batching interval
+ * @param slideDuration sliding interval of the window (i.e., the interval after which
+ *                      the new DStream will generate RDDs); must be a multiple of this
+ *                      DStream's batching interval
 */
def countByWindow(windowDuration: Duration, slideDuration: Duration): DStream[Long] = {
this.map(_ => 1L).reduceByWindow(_ + _, _ - _, windowDuration, slideDuration)
}
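A worked illustration of the incremental update described above (the batch contents are assumptions, not commit code):

// Batches:  [a]  [b c]  [d]  [e f g]   window = 3 batches, slide = 1 batch
// Window over batches 1-3: count = 1 + 2 + 1 = 4
// Window over batches 2-4: count = old(4) + entered(3) - left(1) = 6
// which is exactly what map(_ => 1L).reduceByWindow(_ + _, _ - _, ...) computes.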
+ /**
+  * Return a new DStream in which each RDD contains the count of distinct elements in
+  * RDDs in a sliding window over this DStream. Hash partitioning is used to generate
+  * the RDDs with `numPartitions` partitions (Spark's default number of partitions if
+  * `numPartitions` is not specified).
+  * @param windowDuration width of the window; must be a multiple of this DStream's
+  *                       batching interval
+  * @param slideDuration sliding interval of the window (i.e., the interval after which
+  *                      the new DStream will generate RDDs); must be a multiple of this
+  *                      DStream's batching interval
+  * @param numPartitions number of partitions of each RDD in the new DStream.
+  */
+ def countByValueAndWindow(
+     windowDuration: Duration,
+     slideDuration: Duration,
+     numPartitions: Int = ssc.sc.defaultParallelism
+   ): DStream[(T, Long)] = {
+   this.map(x => (x, 1L)).reduceByKeyAndWindow(
+     (x: Long, y: Long) => x + y,
+     (x: Long, y: Long) => x - y,
+     windowDuration,
+     slideDuration,
+     numPartitions,
+     (x: (T, Long)) => x._2 != 0L
+   )
+ }
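This is the operation the PageViewStream example at the top of the commit now uses; a standalone sketch reusing that example's `pageViews` stream:

// (url, views) pairs over the last 10 seconds, updated every 2 seconds. The
// filter argument passed above ((x: (T, Long)) => x._2 != 0L) prunes values
// whose windowed count has dropped back to zero, so stale keys do not linger.
val urlCounts: DStream[(String, Long)] =
  pageViews.map(view => view.url).countByValueAndWindow(Seconds(10), Seconds(2))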
/**
* Return a new DStream by unifying data of another DStream with this DStream.
* @param that Another DStream having the same slideDuration as this DStream.
......
......@@ -94,14 +94,6 @@ extends Serializable {
new ShuffledDStream[K, V, C](self, createCombiner, mergeValue, mergeCombiner, partitioner)
}
- /**
-  * Return a new DStream by counting the number of values of each key in each RDD. Hash
-  * partitioning is used to generate the RDDs with Spark's `numPartitions` partitions.
-  */
- def countByKey(numPartitions: Int = self.ssc.sc.defaultParallelism): DStream[(K, Long)] = {
-   self.map(x => (x._1, 1L)).reduceByKey((x: Long, y: Long) => x + y, numPartitions)
- }
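Callers of the removed countByKey can recover it with the new countByValue applied to the keys; a migration sketch, with `pairs` standing in for any DStream[(K, V)]:

// was: pairs.countByKey(numPartitions)
val keyCounts = pairs.map(_._1).countByValue(numPartitions)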
/**
* Return a new DStream by applying `groupByKey` over a sliding window. This is similar to
* `DStream.groupByKey()` but applies it over a sliding window. The new DStream generates RDDs
......@@ -211,7 +203,7 @@ extends Serializable {
* @param slideDuration sliding interval of the window (i.e., the interval after which
* the new DStream will generate RDDs); must be a multiple of this
* DStream's batching interval
- * @param numPartitions Number of partitions of each RDD in the new DStream.
+ * @param numPartitions number of partitions of each RDD in the new DStream.
*/
def reduceByKeyAndWindow(
reduceFunc: (V, V) => V,
......@@ -248,10 +240,10 @@ extends Serializable {
/**
* Return a new DStream by applying incremental `reduceByKey` over a sliding window.
- * The reduced value of over a new window is calculated using the old window's reduce value :
+ * The reduced value over a new window is calculated using the old window's reduced value:
* 1. reduce the new values that entered the window (e.g., adding new counts)
* 2. "inverse reduce" the old values that left the window (e.g., subtracting old counts)
- * This is more efficient that reduceByKeyAndWindow without "inverse reduce" function.
+ * This is more efficient than reduceByKeyAndWindow without "inverse reduce" function.
* However, it is applicable to only "invertible reduce functions".
* Hash partitioning is used to generate the RDDs with Spark's default number of partitions.
* @param reduceFunc associative reduce function
......@@ -281,10 +273,10 @@ extends Serializable {
/**
* Return a new DStream by applying incremental `reduceByKey` over a sliding window.
- * The reduced value of over a new window is calculated using the old window's reduce value :
+ * The reduced value over a new window is calculated using the old window's reduced value:
* 1. reduce the new values that entered the window (e.g., adding new counts)
* 2. "inverse reduce" the old values that left the window (e.g., subtracting old counts)
- * This is more efficient that reduceByKeyAndWindow without "inverse reduce" function.
+ * This is more efficient than reduceByKeyAndWindow without "inverse reduce" function.
* However, it is applicable to only "invertible reduce functions".
* @param reduceFunc associative reduce function
* @param invReduceFunc inverse reduce function
......@@ -315,31 +307,6 @@ extends Serializable {
)
}
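A minimal sketch of the invertible form documented above, assuming a DStream[(String, Int)] of per-page hit counts named `hits`:

// Per-key sums over a 30-second window sliding every 10 seconds; the second
// function subtracts the contribution of batches leaving the window instead
// of re-reducing the whole window from scratch.
val windowedHits = hits.reduceByKeyAndWindow(
  (a: Int, b: Int) => a + b,
  (a: Int, b: Int) => a - b,
  Seconds(30),
  Seconds(10)
)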
- /**
-  * Return a new DStream by counting the number of values for each key over a window.
-  * Hash partitioning is used to generate the RDDs with `numPartitions` partitions.
-  * @param windowDuration width of the window; must be a multiple of this DStream's
-  *                       batching interval
-  * @param slideDuration sliding interval of the window (i.e., the interval after which
-  *                      the new DStream will generate RDDs); must be a multiple of this
-  *                      DStream's batching interval
-  * @param numPartitions Number of partitions of each RDD in the new DStream.
-  */
- def countByKeyAndWindow(
-     windowDuration: Duration,
-     slideDuration: Duration,
-     numPartitions: Int = self.ssc.sc.defaultParallelism
-   ): DStream[(K, Long)] = {
-   self.map(x => (x._1, 1L)).reduceByKeyAndWindow(
-     (x: Long, y: Long) => x + y,
-     (x: Long, y: Long) => x - y,
-     windowDuration,
-     slideDuration,
-     numPartitions
-   )
- }
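As with countByKey above, uses of the removed countByKeyAndWindow map onto the new all-DStream operation (see the updated WindowOperationsSuite below); note that the replacement additionally prunes keys whose windowed count returns to zero:

// was: pairs.countByKeyAndWindow(windowDuration, slideDuration, numPartitions)
val keyCounts = pairs.map(_._1).countByValueAndWindow(windowDuration, slideDuration, numPartitions)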
/**
* Return a new "state" DStream where the state for each key is updated by applying
* the given function on the previous state of the key and the new values of each key.
......
......@@ -36,7 +36,7 @@ class JavaDStream[T](val dstream: DStream[T])(implicit val classManifest: ClassM
def cache(): JavaDStream[T] = dstream.cache()
/** Persist RDDs of this DStream with the default storage level (MEMORY_ONLY_SER) */
- def persist(): JavaDStream[T] = dstream.cache()
+ def persist(): JavaDStream[T] = dstream.persist()
/** Persist the RDDs of this DStream with the given storage level */
def persist(storageLevel: StorageLevel): JavaDStream[T] = dstream.persist(storageLevel)
......@@ -50,33 +50,26 @@ class JavaDStream[T](val dstream: DStream[T])(implicit val classManifest: ClassM
}
/**
- * Return a new DStream which is computed based on windowed batches of this DStream.
- * The new DStream generates RDDs with the same interval as this DStream.
+ * Return a new DStream in which each RDD contains all the elements seen in a
+ * sliding window of time over this DStream. The new DStream generates RDDs with
+ * the same interval as this DStream.
* @param windowDuration width of the window; must be a multiple of this DStream's interval.
* @return
*/
def window(windowDuration: Duration): JavaDStream[T] =
dstream.window(windowDuration)
/**
- * Return a new DStream which is computed based on windowed batches of this DStream.
- * @param windowDuration duration (i.e., width) of the window;
- *                       must be a multiple of this DStream's interval
+ * Return a new DStream in which each RDD contains all the elements seen in a
+ * sliding window of time over this DStream.
+ * @param windowDuration width of the window; must be a multiple of this DStream's
+ *                       batching interval
 * @param slideDuration sliding interval of the window (i.e., the interval after which
-  *                     the new DStream will generate RDDs); must be a multiple of this
-  *                     DStream's interval
+  *                     the new DStream will generate RDDs); must be a multiple of this
+  *                     DStream's batching interval
*/
def window(windowDuration: Duration, slideDuration: Duration): JavaDStream[T] =
dstream.window(windowDuration, slideDuration)
- /**
-  * Return a new DStream which computed based on tumbling window on this DStream.
-  * This is equivalent to window(batchDuration, batchDuration).
-  * @param batchDuration tumbling window duration; must be a multiple of this DStream's interval
-  */
- def tumble(batchDuration: Duration): JavaDStream[T] =
-   dstream.tumble(batchDuration)
/**
* Return a new DStream by unifying data of another DStream with this DStream.
* @param that Another DStream having the same interval (i.e., slideDuration) as this DStream.
......
......@@ -33,6 +33,26 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This]] extends Serializable
*/
def count(): JavaDStream[JLong] = dstream.count()
+ /**
+  * Return a new DStream in which each RDD contains the counts of each distinct value in
+  * each RDD of this DStream. Hash partitioning is used to generate the RDDs with
+  * Spark's default number of partitions.
+  */
+ def countByValue(): JavaPairDStream[T, JLong] = {
+   JavaPairDStream.scalaToJavaLong(dstream.countByValue())
+ }

+ /**
+  * Return a new DStream in which each RDD contains the counts of each distinct value in
+  * each RDD of this DStream. Hash partitioning is used to generate the RDDs with `numPartitions`
+  * partitions.
+  * @param numPartitions number of partitions of each RDD in the new DStream.
+  */
+ def countByValue(numPartitions: Int): JavaPairDStream[T, JLong] = {
+   JavaPairDStream.scalaToJavaLong(dstream.countByValue(numPartitions))
+ }
/**
* Return a new DStream in which each RDD has a single element generated by counting the number
* of elements in a window over this DStream. windowDuration and slideDuration are as defined in the
......@@ -42,6 +62,39 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This]] extends Serializable
dstream.countByWindow(windowDuration, slideDuration)
}
+ /**
+  * Return a new DStream in which each RDD contains the count of distinct elements in
+  * RDDs in a sliding window over this DStream. Hash partitioning is used to generate the RDDs with
+  * Spark's default number of partitions.
+  * @param windowDuration width of the window; must be a multiple of this DStream's
+  *                       batching interval
+  * @param slideDuration sliding interval of the window (i.e., the interval after which
+  *                      the new DStream will generate RDDs); must be a multiple of this
+  *                      DStream's batching interval
+  */
+ def countByValueAndWindow(windowDuration: Duration, slideDuration: Duration)
+   : JavaPairDStream[T, JLong] = {
+   JavaPairDStream.scalaToJavaLong(
+     dstream.countByValueAndWindow(windowDuration, slideDuration))
+ }

+ /**
+  * Return a new DStream in which each RDD contains the count of distinct elements in
+  * RDDs in a sliding window over this DStream. Hash partitioning is used to generate the RDDs
+  * with `numPartitions` partitions.
+  * @param windowDuration width of the window; must be a multiple of this DStream's
+  *                       batching interval
+  * @param slideDuration sliding interval of the window (i.e., the interval after which
+  *                      the new DStream will generate RDDs); must be a multiple of this
+  *                      DStream's batching interval
+  * @param numPartitions number of partitions of each RDD in the new DStream.
+  */
+ def countByValueAndWindow(windowDuration: Duration, slideDuration: Duration, numPartitions: Int)
+   : JavaPairDStream[T, JLong] = {
+   JavaPairDStream.scalaToJavaLong(
+     dstream.countByValueAndWindow(windowDuration, slideDuration, numPartitions))
+ }
/**
* Return a new DStream in which each RDD is generated by applying glom() to each RDD of
* this DStream. Applying glom() to an RDD coalesces all elements within each partition into
......@@ -114,8 +167,38 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This]] extends Serializable
/**
* Return a new DStream in which each RDD has a single element generated by reducing all
- * elements in a window over this DStream. windowDuration and slideDuration are as defined in the
- * window() operation. This is equivalent to window(windowDuration, slideDuration).reduce(reduceFunc)
+ * elements in a sliding window over this DStream.
+ * @param reduceFunc associative reduce function
+ * @param windowDuration width of the window; must be a multiple of this DStream's
+ *                       batching interval
+ * @param slideDuration sliding interval of the window (i.e., the interval after which
+ *                      the new DStream will generate RDDs); must be a multiple of this
+ *                      DStream's batching interval
*/
+ def reduceByWindow(
+     reduceFunc: (T, T) => T,
+     windowDuration: Duration,
+     slideDuration: Duration
+   ): DStream[T] = {
+   dstream.reduceByWindow(reduceFunc, windowDuration, slideDuration)
+ }

+ /**
+  * Return a new DStream in which each RDD has a single element generated by reducing all
+  * elements in a sliding window over this DStream. However, the reduction is done incrementally
+  * using the old window's reduced value:
+  *  1. reduce the new values that entered the window (e.g., adding new counts)
+  *  2. "inverse reduce" the old values that left the window (e.g., subtracting old counts)
+  * This is more efficient than reduceByWindow without an "inverse reduce" function.
+  * However, it is applicable only to "invertible reduce functions".
+  * @param reduceFunc associative reduce function
+  * @param invReduceFunc inverse reduce function
+  * @param windowDuration width of the window; must be a multiple of this DStream's
+  *                       batching interval
+  * @param slideDuration sliding interval of the window (i.e., the interval after which
+  *                      the new DStream will generate RDDs); must be a multiple of this
+  *                      DStream's batching interval
+  */
def reduceByWindow(
reduceFunc: JFunction2[T, T, T],
......
......@@ -33,7 +33,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
def cache(): JavaPairDStream[K, V] = dstream.cache()
/** Persist RDDs of this DStream with the default storage level (MEMORY_ONLY_SER) */
- def persist(): JavaPairDStream[K, V] = dstream.cache()
+ def persist(): JavaPairDStream[K, V] = dstream.persist()
/** Persist the RDDs of this DStream with the given storage level */
def persist(storageLevel: StorageLevel): JavaPairDStream[K, V] = dstream.persist(storageLevel)
......@@ -66,14 +66,6 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
def window(windowDuration: Duration, slideDuration: Duration): JavaPairDStream[K, V] =
dstream.window(windowDuration, slideDuration)
- /**
-  * Return a new DStream which computed based on tumbling window on this DStream.
-  * This is equivalent to window(batchDuration, batchDuration).
-  * @param batchDuration tumbling window duration; must be a multiple of this DStream's interval
-  */
- def tumble(batchDuration: Duration): JavaPairDStream[K, V] =
-   dstream.tumble(batchDuration)
/**
* Return a new DStream by unifying data of another DStream with this DStream.
* @param that Another DStream having the same interval (i.e., slideDuration) as this DStream.
......@@ -148,23 +140,6 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
dstream.combineByKey(createCombiner, mergeValue, mergeCombiners, partitioner)
}
- /**
-  * Return a new DStream by counting the number of values of each key in each RDD. Hash
-  * partitioning is used to generate the RDDs with Spark's `numPartitions` partitions.
-  */
- def countByKey(numPartitions: Int): JavaPairDStream[K, JLong] = {
-   JavaPairDStream.scalaToJavaLong(dstream.countByKey(numPartitions));
- }

- /**
-  * Return a new DStream by counting the number of values of each key in each RDD. Hash
-  * partitioning is used to generate the RDDs with the default number of partitions.
-  */
- def countByKey(): JavaPairDStream[K, JLong] = {
-   JavaPairDStream.scalaToJavaLong(dstream.countByKey());
- }
/**
* Return a new DStream by applying `groupByKey` over a sliding window. This is similar to
* `DStream.groupByKey()` but applies it over a sliding window. The new DStream generates RDDs
......@@ -402,35 +377,6 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
)
}
- /**
-  * Create a new DStream by counting the number of values for each key over a window.
-  * Hash partitioning is used to generate the RDDs with `numPartitions` partitions.
-  * @param windowDuration width of the window; must be a multiple of this DStream's
-  *                       batching interval
-  * @param slideDuration sliding interval of the window (i.e., the interval after which
-  *                      the new DStream will generate RDDs); must be a multiple of this
-  *                      DStream's batching interval
-  */
- def countByKeyAndWindow(windowDuration: Duration, slideDuration: Duration)
-   : JavaPairDStream[K, JLong] = {
-   JavaPairDStream.scalaToJavaLong(dstream.countByKeyAndWindow(windowDuration, slideDuration))
- }

- /**
-  * Create a new DStream by counting the number of values for each key over a window.
-  * Hash partitioning is used to generate the RDDs with `numPartitions` partitions.
-  * @param windowDuration width of the window; must be a multiple of this DStream's
-  *                       batching interval
-  * @param slideDuration sliding interval of the window (i.e., the interval after which
-  *                      the new DStream will generate RDDs); must be a multiple of this
-  *                      DStream's batching interval
-  * @param numPartitions Number of partitions of each RDD in the new DStream.
-  */
- def countByKeyAndWindow(windowDuration: Duration, slideDuration: Duration, numPartitions: Int)
-   : JavaPairDStream[K, Long] = {
-   dstream.countByKeyAndWindow(windowDuration, slideDuration, numPartitions)
- }
private def convertUpdateStateFunction[S](in: JFunction2[JList[V], Optional[S], Optional[S]]):
(Seq[V], Option[S]) => Option[S] = {
val scalaFunc: (Seq[V], Option[S]) => Option[S] = (values, state) => {
......
......@@ -134,29 +134,6 @@ public class JavaAPISuite implements Serializable {
assertOrderInvariantEquals(expected, result);
}
- @Test
- public void testTumble() {
-   List<List<Integer>> inputData = Arrays.asList(
-     Arrays.asList(1,2,3),
-     Arrays.asList(4,5,6),
-     Arrays.asList(7,8,9),
-     Arrays.asList(10,11,12),
-     Arrays.asList(13,14,15),
-     Arrays.asList(16,17,18));
-   List<List<Integer>> expected = Arrays.asList(
-     Arrays.asList(1,2,3,4,5,6),
-     Arrays.asList(7,8,9,10,11,12),
-     Arrays.asList(13,14,15,16,17,18));
-   JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
-   JavaDStream windowed = stream.tumble(new Duration(2000));
-   JavaTestUtils.attachTestOutputStream(windowed);
-   List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 6, 3);
-   assertOrderInvariantEquals(expected, result);
- }
@Test
public void testFilter() {
List<List<String>> inputData = Arrays.asList(
......@@ -584,24 +561,26 @@ public class JavaAPISuite implements Serializable {
}
@Test
- public void testCountByKey() {
-   List<List<Tuple2<String, String>>> inputData = stringStringKVStream;
+ public void testCountByValue() {
+   List<List<String>> inputData = Arrays.asList(
+     Arrays.asList("hello", "world"),
+     Arrays.asList("hello", "moon"),
+     Arrays.asList("hello"));
  List<List<Tuple2<String, Long>>> expected = Arrays.asList(
-   Arrays.asList(
-     new Tuple2<String, Long>("california", 2L),
-     new Tuple2<String, Long>("new york", 2L)),
-   Arrays.asList(
-     new Tuple2<String, Long>("california", 2L),
-     new Tuple2<String, Long>("new york", 2L)));
- JavaDStream<Tuple2<String, String>> stream = JavaTestUtils.attachTestInputStream(
-   ssc, inputData, 1);
- JavaPairDStream<String, String> pairStream = JavaPairDStream.fromJavaDStream(stream);
+   Arrays.asList(
+     new Tuple2<String, Long>("hello", 1L),
+     new Tuple2<String, Long>("world", 1L)),
+   Arrays.asList(
+     new Tuple2<String, Long>("hello", 1L),
+     new Tuple2<String, Long>("moon", 1L)),
+   Arrays.asList(
+     new Tuple2<String, Long>("hello", 1L)));
- JavaPairDStream<String, Long> counted = pairStream.countByKey();
+ JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+ JavaPairDStream<String, Long> counted = stream.countByValue();
  JavaTestUtils.attachTestOutputStream(counted);
- List<List<Tuple2<String, Long>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
+ List<List<Tuple2<String, Long>>> result = JavaTestUtils.runStreams(ssc, 3, 3);
Assert.assertEquals(expected, result);
}
......@@ -712,26 +691,28 @@ public class JavaAPISuite implements Serializable {
}
@Test
- public void testCountByKeyAndWindow() {
-   List<List<Tuple2<String, String>>> inputData = stringStringKVStream;
+ public void testCountByValueAndWindow() {
+   List<List<String>> inputData = Arrays.asList(
+     Arrays.asList("hello", "world"),
+     Arrays.asList("hello", "moon"),
+     Arrays.asList("hello"));
  List<List<Tuple2<String, Long>>> expected = Arrays.asList(
    Arrays.asList(
-     new Tuple2<String, Long>("california", 2L),
-     new Tuple2<String, Long>("new york", 2L)),
+     new Tuple2<String, Long>("hello", 1L),
+     new Tuple2<String, Long>("world", 1L)),
    Arrays.asList(
-     new Tuple2<String, Long>("california", 4L),
-     new Tuple2<String, Long>("new york", 4L)),
+     new Tuple2<String, Long>("hello", 2L),
+     new Tuple2<String, Long>("world", 1L),
+     new Tuple2<String, Long>("moon", 1L)),
    Arrays.asList(
-     new Tuple2<String, Long>("california", 2L),
-     new Tuple2<String, Long>("new york", 2L)));
+     new Tuple2<String, Long>("hello", 2L),
+     new Tuple2<String, Long>("moon", 1L)));
- JavaDStream<Tuple2<String, String>> stream = JavaTestUtils.attachTestInputStream(
+ JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(
    ssc, inputData, 1);
- JavaPairDStream<String, String> pairStream = JavaPairDStream.fromJavaDStream(stream);
  JavaPairDStream<String, Long> counted =
-   pairStream.countByKeyAndWindow(new Duration(2000), new Duration(1000));
+   stream.countByValueAndWindow(new Duration(2000), new Duration(1000));
JavaTestUtils.attachTestOutputStream(counted);
List<List<Tuple2<String, Long>>> result = JavaTestUtils.runStreams(ssc, 3, 3);
......
......@@ -24,7 +24,7 @@ class BasicOperationsSuite extends TestSuiteBase {
)
}
test("flatmap") {
test("flatMap") {
val input = Seq(1 to 4, 5 to 8, 9 to 12)
testOperation(
input,
......@@ -88,6 +88,23 @@ class BasicOperationsSuite extends TestSuiteBase {
)
}
test("count") {
testOperation(
Seq(1 to 1, 1 to 2, 1 to 3, 1 to 4),
(s: DStream[Int]) => s.count(),
Seq(Seq(1L), Seq(2L), Seq(3L), Seq(4L))
)
}
test("countByValue") {
testOperation(
Seq(1 to 1, Seq(1, 1, 1), 1 to 2, Seq(1, 1, 2, 2)),
(s: DStream[Int]) => s.countByValue(),
Seq(Seq((1, 1L)), Seq((1, 3L)), Seq((1, 1L), (2, 1L)), Seq((2, 2L), (1, 2L))),
true
)
}
test("mapValues") {
testOperation(
Seq( Seq("a", "a", "b"), Seq("", ""), Seq() ),
......@@ -206,7 +223,7 @@ class BasicOperationsSuite extends TestSuiteBase {
case _ => Option(stateObj)
}
}
- s.map(_ -> 1).updateStateByKey[StateObject](updateFunc).mapValues(_.counter)
+ s.map(x => (x, 1)).updateStateByKey[StateObject](updateFunc).mapValues(_.counter)
}
testOperation(inputData, updateStateOperation, outputData, true)
......
......@@ -236,14 +236,14 @@ class WindowOperationsSuite extends TestSuiteBase {
testOperation(input, operation, expectedOutput, numBatches, true)
}
test("countByKeyAndWindow") {
val input = Seq(Seq(("a", 1)), Seq(("b", 1), ("b", 2)), Seq(("a", 10), ("b", 20)))
test("countByValueAndWindow") {
val input = Seq(Seq("a"), Seq("b", "b"), Seq("a", "b"))
val expectedOutput = Seq( Seq(("a", 1)), Seq(("a", 1), ("b", 2)), Seq(("a", 1), ("b", 3)))
val windowDuration = Seconds(2)
val slideDuration = Seconds(1)
val numBatches = expectedOutput.size * (slideDuration / batchDuration).toInt
- val operation = (s: DStream[(String, Int)]) => {
-   s.countByKeyAndWindow(windowDuration, slideDuration).map(x => (x._1, x._2.toInt))
+ val operation = (s: DStream[String]) => {
+   s.countByValueAndWindow(windowDuration, slideDuration).map(x => (x._1, x._2.toInt))
}
testOperation(input, operation, expectedOutput, numBatches, true)
}
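For readers verifying the expected output: with a 2-second window, a 1-second slide, and the suite's assumed 1-second batch interval, each output batch counts the values seen in the last two input batches:

// window 1 = Seq("a")                       -> ("a", 1)
// window 2 = Seq("a") ++ Seq("b", "b")      -> ("a", 1), ("b", 2)
// window 3 = Seq("b", "b") ++ Seq("a", "b") -> ("a", 1), ("b", 3)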
......