Skip to content
Snippets Groups Projects
Commit c3002c4a authored by Yadong Qi's avatar Yadong Qi Committed by Tathagata Das
Browse files

[SPARK-4294][Streaming] UnionDStream stream should express the requirements in...

[SPARK-4294][Streaming] UnionDStream stream should express the requirements in the same way as TransformedDStream

In class TransformedDStream:
```scala
require(parents.length > 0, "List of DStreams to transform is empty")
require(parents.map(.ssc).distinct.size == 1, "Some of the DStreams have different contexts")
require(parents.map(.slideDuration).distinct.size == 1,
"Some of the DStreams have different slide durations")
```

In class UnionDStream:
```scala
if (parents.length == 0)
{ throw new IllegalArgumentException("Empty array of parents") }
if (parents.map(.ssc).distinct.size > 1)
{ throw new IllegalArgumentException("Array of parents have different StreamingContexts") }
if (parents.map(.slideDuration).distinct.size > 1)
{ throw new IllegalArgumentException("Array of parents have different slide times") }
```

The function is the same, but the realization is not. I think they shoule be the same.

Author: Yadong Qi <qiyadong2010@gmail.com>

Closes #3152 from watermen/bug-fix1 and squashes the following commits:

ed66db6 [Yadong Qi] Change transform to union
b6b3b8b [Yadong Qi] The same function should have the same realization.
parent 73c8ea84
No related branches found
No related tags found
No related merge requests found
......@@ -28,17 +28,10 @@ private[streaming]
class UnionDStream[T: ClassTag](parents: Array[DStream[T]])
extends DStream[T](parents.head.ssc) {
if (parents.length == 0) {
throw new IllegalArgumentException("Empty array of parents")
}
if (parents.map(_.ssc).distinct.size > 1) {
throw new IllegalArgumentException("Array of parents have different StreamingContexts")
}
if (parents.map(_.slideDuration).distinct.size > 1) {
throw new IllegalArgumentException("Array of parents have different slide times")
}
require(parents.length > 0, "List of DStreams to union is empty")
require(parents.map(_.ssc).distinct.size == 1, "Some of the DStreams have different contexts")
require(parents.map(_.slideDuration).distinct.size == 1,
"Some of the DStreams have different slide durations")
override def dependencies = parents.toList
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment