diff --git a/core/src/main/scala/org/apache/spark/input/WholeTextFileRecordReader.scala b/core/src/main/scala/org/apache/spark/input/WholeTextFileRecordReader.scala index c3dabd2e79995604b0ee3372dbeae4b34d988227..3564ab2e2a162f57bb19711f37617245b13506c4 100644 --- a/core/src/main/scala/org/apache/spark/input/WholeTextFileRecordReader.scala +++ b/core/src/main/scala/org/apache/spark/input/WholeTextFileRecordReader.scala @@ -36,33 +36,31 @@ private[spark] class WholeTextFileRecordReader( index: Integer) extends RecordReader[String, String] { - private val path = split.getPath(index) - private val fs = path.getFileSystem(context.getConfiguration) + private[this] val path = split.getPath(index) + private[this] val fs = path.getFileSystem(context.getConfiguration) // True means the current file has been processed, then skip it. - private var processed = false + private[this] var processed = false - private val key = path.toString - private var value: String = null + private[this] val key = path.toString + private[this] var value: String = null - override def initialize(split: InputSplit, context: TaskAttemptContext) = {} + override def initialize(split: InputSplit, context: TaskAttemptContext): Unit = {} - override def close() = {} + override def close(): Unit = {} - override def getProgress = if (processed) 1.0f else 0.0f + override def getProgress: Float = if (processed) 1.0f else 0.0f - override def getCurrentKey = key + override def getCurrentKey: String = key - override def getCurrentValue = value + override def getCurrentValue: String = value - override def nextKeyValue = { + override def nextKeyValue(): Boolean = { if (!processed) { val fileIn = fs.open(path) val innerBuffer = ByteStreams.toByteArray(fileIn) - value = new Text(innerBuffer).toString Closeables.close(fileIn, false) - processed = true true } else { diff --git a/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala b/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala index 6ef817d0e587e369c56090d65ca388025bc799fc..fd316a89a1a10cd919c1c3caa583d9f6630f45aa 100644 --- a/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala +++ b/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala @@ -63,15 +63,18 @@ import org.apache.spark.metrics.source.Source * * [options] is the specific property of this source or sink. */ -private[spark] class MetricsSystem private (val instance: String, - conf: SparkConf, securityMgr: SecurityManager) extends Logging { +private[spark] class MetricsSystem private ( + val instance: String, + conf: SparkConf, + securityMgr: SecurityManager) + extends Logging { - val confFile = conf.get("spark.metrics.conf", null) - val metricsConfig = new MetricsConfig(Option(confFile)) + private[this] val confFile = conf.get("spark.metrics.conf", null) + private[this] val metricsConfig = new MetricsConfig(Option(confFile)) - val sinks = new mutable.ArrayBuffer[Sink] - val sources = new mutable.ArrayBuffer[Source] - val registry = new MetricRegistry() + private val sinks = new mutable.ArrayBuffer[Sink] + private val sources = new mutable.ArrayBuffer[Source] + private val registry = new MetricRegistry() // Treat MetricsServlet as a special sink as it should be exposed to add handlers to web ui private var metricsServlet: Option[MetricsServlet] = None @@ -91,7 +94,7 @@ private[spark] class MetricsSystem private (val instance: String, sinks.foreach(_.stop) } - def report(): Unit = { + def report() { sinks.foreach(_.report()) } @@ -155,8 +158,8 @@ private[spark] object MetricsSystem { val SINK_REGEX = "^sink\\.(.+)\\.(.+)".r val SOURCE_REGEX = "^source\\.(.+)\\.(.+)".r - val MINIMAL_POLL_UNIT = TimeUnit.SECONDS - val MINIMAL_POLL_PERIOD = 1 + private[this] val MINIMAL_POLL_UNIT = TimeUnit.SECONDS + private[this] val MINIMAL_POLL_PERIOD = 1 def checkMinimalPollingPeriod(pollUnit: TimeUnit, pollPeriod: Int) { val period = MINIMAL_POLL_UNIT.convert(pollPeriod, pollUnit) @@ -166,7 +169,8 @@ private[spark] object MetricsSystem { } } - def createMetricsSystem(instance: String, conf: SparkConf, - securityMgr: SecurityManager): MetricsSystem = + def createMetricsSystem( + instance: String, conf: SparkConf, securityMgr: SecurityManager): MetricsSystem = { new MetricsSystem(instance, conf, securityMgr) + } } diff --git a/core/src/test/scala/org/apache/spark/metrics/MetricsSystemSuite.scala b/core/src/test/scala/org/apache/spark/metrics/MetricsSystemSuite.scala index 96a5a1231813e314560ee187ff6f2977def69010..e42b18119472787dc52ed95ab4154447f9c0c9ba 100644 --- a/core/src/test/scala/org/apache/spark/metrics/MetricsSystemSuite.scala +++ b/core/src/test/scala/org/apache/spark/metrics/MetricsSystemSuite.scala @@ -17,42 +17,47 @@ package org.apache.spark.metrics -import org.scalatest.{BeforeAndAfter, FunSuite} +import org.apache.spark.metrics.source.Source +import org.scalatest.{BeforeAndAfter, FunSuite, PrivateMethodTester} + import org.apache.spark.{SecurityManager, SparkConf} import org.apache.spark.deploy.master.MasterSource -class MetricsSystemSuite extends FunSuite with BeforeAndAfter { +import scala.collection.mutable.ArrayBuffer + + +class MetricsSystemSuite extends FunSuite with BeforeAndAfter with PrivateMethodTester{ var filePath: String = _ var conf: SparkConf = null var securityMgr: SecurityManager = null before { - filePath = getClass.getClassLoader.getResource("test_metrics_system.properties").getFile() + filePath = getClass.getClassLoader.getResource("test_metrics_system.properties").getFile conf = new SparkConf(false).set("spark.metrics.conf", filePath) securityMgr = new SecurityManager(conf) } test("MetricsSystem with default config") { val metricsSystem = MetricsSystem.createMetricsSystem("default", conf, securityMgr) - val sources = metricsSystem.sources - val sinks = metricsSystem.sinks + val sources = PrivateMethod[ArrayBuffer[Source]]('sources) + val sinks = PrivateMethod[ArrayBuffer[Source]]('sinks) - assert(sources.length === 0) - assert(sinks.length === 0) - assert(!metricsSystem.getServletHandlers.isEmpty) + assert(metricsSystem.invokePrivate(sources()).length === 0) + assert(metricsSystem.invokePrivate(sinks()).length === 0) + assert(metricsSystem.getServletHandlers.nonEmpty) } test("MetricsSystem with sources add") { val metricsSystem = MetricsSystem.createMetricsSystem("test", conf, securityMgr) - val sources = metricsSystem.sources - val sinks = metricsSystem.sinks + val sources = PrivateMethod[ArrayBuffer[Source]]('sources) + val sinks = PrivateMethod[ArrayBuffer[Source]]('sinks) - assert(sources.length === 0) - assert(sinks.length === 1) - assert(!metricsSystem.getServletHandlers.isEmpty) + assert(metricsSystem.invokePrivate(sources()).length === 0) + assert(metricsSystem.invokePrivate(sinks()).length === 1) + assert(metricsSystem.getServletHandlers.nonEmpty) val source = new MasterSource(null) metricsSystem.registerSource(source) - assert(sources.length === 1) + assert(metricsSystem.invokePrivate(sources()).length === 1) } } diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala index ebf83748ffa28a0eb512be36623346d2d7457509..655cec1573f58437fb9318cbc884fdd2902e308d 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala @@ -19,18 +19,18 @@ package org.apache.spark.streaming import java.util.concurrent.atomic.AtomicInteger -import scala.language.postfixOps +import org.scalatest.{Assertions, BeforeAndAfter, FunSuite} +import org.scalatest.concurrent.Timeouts +import org.scalatest.concurrent.Eventually._ +import org.scalatest.exceptions.TestFailedDueToTimeoutException +import org.scalatest.time.SpanSugar._ import org.apache.spark.{Logging, SparkConf, SparkContext, SparkException} import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.dstream.DStream import org.apache.spark.streaming.receiver.Receiver import org.apache.spark.util.Utils -import org.scalatest.{Assertions, BeforeAndAfter, FunSuite} -import org.scalatest.concurrent.Timeouts -import org.scalatest.concurrent.Eventually._ -import org.scalatest.exceptions.TestFailedDueToTimeoutException -import org.scalatest.time.SpanSugar._ + class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts with Logging { @@ -68,7 +68,7 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w test("from no conf + spark home + env") { ssc = new StreamingContext(master, appName, batchDuration, sparkHome, Nil, Map(envPair)) - assert(ssc.conf.getExecutorEnv.exists(_ == envPair)) + assert(ssc.conf.getExecutorEnv.contains(envPair)) } test("from conf with settings") { @@ -94,7 +94,7 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w val myConf = SparkContext.updatedConf(new SparkConf(false), master, appName) myConf.set("spark.cleaner.ttl", "10") val ssc1 = new StreamingContext(myConf, batchDuration) - addInputStream(ssc1).register + addInputStream(ssc1).register() ssc1.start() val cp = new Checkpoint(ssc1, Time(1000)) assert(cp.sparkConfPairs.toMap.getOrElse("spark.cleaner.ttl", "-1") === "10") @@ -107,7 +107,7 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w test("start and stop state check") { ssc = new StreamingContext(master, appName, batchDuration) - addInputStream(ssc).register + addInputStream(ssc).register() assert(ssc.state === ssc.StreamingContextState.Initialized) ssc.start() @@ -118,7 +118,7 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w test("start multiple times") { ssc = new StreamingContext(master, appName, batchDuration) - addInputStream(ssc).register + addInputStream(ssc).register() ssc.start() intercept[SparkException] { ssc.start() @@ -127,7 +127,7 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w test("stop multiple times") { ssc = new StreamingContext(master, appName, batchDuration) - addInputStream(ssc).register + addInputStream(ssc).register() ssc.start() ssc.stop() ssc.stop() @@ -135,7 +135,7 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w test("stop before start and start after stop") { ssc = new StreamingContext(master, appName, batchDuration) - addInputStream(ssc).register + addInputStream(ssc).register() ssc.stop() // stop before start should not throw exception ssc.start() ssc.stop() @@ -147,12 +147,12 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w test("stop only streaming context") { ssc = new StreamingContext(master, appName, batchDuration) sc = ssc.sparkContext - addInputStream(ssc).register + addInputStream(ssc).register() ssc.start() - ssc.stop(false) + ssc.stop(stopSparkContext = false) assert(sc.makeRDD(1 to 100).collect().size === 100) ssc = new StreamingContext(sc, batchDuration) - addInputStream(ssc).register + addInputStream(ssc).register() ssc.start() ssc.stop() } @@ -167,11 +167,11 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w var runningCount = 0 TestReceiver.counter.set(1) val input = ssc.receiverStream(new TestReceiver) - input.count.foreachRDD(rdd => { + input.count().foreachRDD { rdd => val count = rdd.first() runningCount += count.toInt logInfo("Count = " + count + ", Running count = " + runningCount) - }) + } ssc.start() ssc.awaitTermination(500) ssc.stop(stopSparkContext = false, stopGracefully = true) @@ -191,7 +191,7 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w test("awaitTermination") { ssc = new StreamingContext(master, appName, batchDuration) val inputStream = addInputStream(ssc) - inputStream.map(x => x).register + inputStream.map(x => x).register() // test whether start() blocks indefinitely or not failAfter(2000 millis) { @@ -215,7 +215,7 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w // test whether wait exits if context is stopped failAfter(10000 millis) { // 10 seconds because spark takes a long time to shutdown new Thread() { - override def run { + override def run() { Thread.sleep(500) ssc.stop() } @@ -239,8 +239,9 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w test("awaitTermination with error in task") { ssc = new StreamingContext(master, appName, batchDuration) val inputStream = addInputStream(ssc) - inputStream.map(x => { throw new TestException("error in map task"); x}) - .foreachRDD(_.count) + inputStream + .map { x => throw new TestException("error in map task"); x } + .foreachRDD(_.count()) val exception = intercept[Exception] { ssc.start() @@ -252,7 +253,7 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w test("awaitTermination with error in job generation") { ssc = new StreamingContext(master, appName, batchDuration) val inputStream = addInputStream(ssc) - inputStream.transform(rdd => { throw new TestException("error in transform"); rdd }).register + inputStream.transform { rdd => throw new TestException("error in transform"); rdd }.register() val exception = intercept[TestException] { ssc.start() ssc.awaitTermination(5000) @@ -265,7 +266,7 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w } def addInputStream(s: StreamingContext): DStream[Int] = { - val input = (1 to 100).map(i => (1 to i)) + val input = (1 to 100).map(i => 1 to i) val inputStream = new TestInputStream(s, input, 1) inputStream }