Skip to content
Snippets Groups Projects
Commit b928f543 authored by Paavo's avatar Paavo Committed by Sean Owen
Browse files

[SPARK-8200] [MLLIB] Check for empty RDDs in StreamingLinearAlgorithm

Test cases for both StreamingLinearRegression and StreamingLogisticRegression, and code fix.

Edit:
This contribution is my original work and I license the work to the project under the project's open source license.

Author: Paavo <pparkkin@gmail.com>

Closes #6713 from pparkkin/streamingmodel-empty-rdd and squashes the following commits:

ff5cd78 [Paavo] Update strings to use interpolation.
db234cf [Paavo] Use !rdd.isEmpty.
54ad89e [Paavo] Test case for empty stream.
393e36f [Paavo] Ignore empty RDDs.
0bfc365 [Paavo] Test case for empty stream.
parent 96a7c888
No related branches found
No related tags found
No related merge requests found
...@@ -83,13 +83,15 @@ abstract class StreamingLinearAlgorithm[ ...@@ -83,13 +83,15 @@ abstract class StreamingLinearAlgorithm[
throw new IllegalArgumentException("Model must be initialized before starting training.") throw new IllegalArgumentException("Model must be initialized before starting training.")
} }
data.foreachRDD { (rdd, time) => data.foreachRDD { (rdd, time) =>
model = Some(algorithm.run(rdd, model.get.weights)) if (!rdd.isEmpty) {
logInfo("Model updated at time %s".format(time.toString)) model = Some(algorithm.run(rdd, model.get.weights))
val display = model.get.weights.size match { logInfo(s"Model updated at time ${time.toString}")
case x if x > 100 => model.get.weights.toArray.take(100).mkString("[", ",", "...") val display = model.get.weights.size match {
case _ => model.get.weights.toArray.mkString("[", ",", "]") case x if x > 100 => model.get.weights.toArray.take(100).mkString("[", ",", "...")
case _ => model.get.weights.toArray.mkString("[", ",", "]")
}
logInfo(s"Current model: weights, ${display}")
} }
logInfo("Current model: weights, %s".format (display))
} }
} }
......
...@@ -158,4 +158,21 @@ class StreamingLogisticRegressionSuite extends SparkFunSuite with TestSuiteBase ...@@ -158,4 +158,21 @@ class StreamingLogisticRegressionSuite extends SparkFunSuite with TestSuiteBase
val error = output.map(batch => batch.map(p => math.abs(p._1 - p._2)).sum / nPoints).toList val error = output.map(batch => batch.map(p => math.abs(p._1 - p._2)).sum / nPoints).toList
assert(error.head > 0.8 & error.last < 0.2) assert(error.head > 0.8 & error.last < 0.2)
} }
// Test empty RDDs in a stream
test("handling empty RDDs in a stream") {
val model = new StreamingLogisticRegressionWithSGD()
.setInitialWeights(Vectors.dense(-0.1))
.setStepSize(0.01)
.setNumIterations(10)
val numBatches = 10
val emptyInput = Seq.empty[Seq[LabeledPoint]]
val ssc = setupStreams(emptyInput,
(inputDStream: DStream[LabeledPoint]) => {
model.trainOn(inputDStream)
model.predictOnValues(inputDStream.map(x => (x.label, x.features)))
}
)
val output: Seq[Seq[(Double, Double)]] = runStreams(ssc, numBatches, numBatches)
}
} }
...@@ -166,4 +166,22 @@ class StreamingLinearRegressionSuite extends SparkFunSuite with TestSuiteBase { ...@@ -166,4 +166,22 @@ class StreamingLinearRegressionSuite extends SparkFunSuite with TestSuiteBase {
val error = output.map(batch => batch.map(p => math.abs(p._1 - p._2)).sum / nPoints).toList val error = output.map(batch => batch.map(p => math.abs(p._1 - p._2)).sum / nPoints).toList
assert((error.head - error.last) > 2) assert((error.head - error.last) > 2)
} }
// Test empty RDDs in a stream
test("handling empty RDDs in a stream") {
val model = new StreamingLinearRegressionWithSGD()
.setInitialWeights(Vectors.dense(0.0, 0.0))
.setStepSize(0.2)
.setNumIterations(25)
val numBatches = 10
val nPoints = 100
val emptyInput = Seq.empty[Seq[LabeledPoint]]
val ssc = setupStreams(emptyInput,
(inputDStream: DStream[LabeledPoint]) => {
model.trainOn(inputDStream)
model.predictOnValues(inputDStream.map(x => (x.label, x.features)))
}
)
val output: Seq[Seq[(Double, Double)]] = runStreams(ssc, numBatches, numBatches)
}
} }
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment