Skip to content
Snippets Groups Projects
Commit 436a7730 authored by Reynold Xin's avatar Reynold Xin
Browse files

Minor cleanup to tighten visibility and remove compilation warning.

Author: Reynold Xin <rxin@apache.org>

Closes #2555 from rxin/cleanup and squashes the following commits:

6add199 [Reynold Xin] Minor cleanup to tighten visibility and remove compilation warning.
parent 2d972fd8
No related branches found
No related tags found
No related merge requests found
...@@ -36,33 +36,31 @@ private[spark] class WholeTextFileRecordReader( ...@@ -36,33 +36,31 @@ private[spark] class WholeTextFileRecordReader(
index: Integer) index: Integer)
extends RecordReader[String, String] { extends RecordReader[String, String] {
private val path = split.getPath(index) private[this] val path = split.getPath(index)
private val fs = path.getFileSystem(context.getConfiguration) private[this] val fs = path.getFileSystem(context.getConfiguration)
// True means the current file has been processed, then skip it. // True means the current file has been processed, then skip it.
private var processed = false private[this] var processed = false
private val key = path.toString private[this] val key = path.toString
private var value: String = null private[this] var value: String = null
override def initialize(split: InputSplit, context: TaskAttemptContext) = {} override def initialize(split: InputSplit, context: TaskAttemptContext): Unit = {}
override def close() = {} override def close(): Unit = {}
override def getProgress = if (processed) 1.0f else 0.0f override def getProgress: Float = if (processed) 1.0f else 0.0f
override def getCurrentKey = key override def getCurrentKey: String = key
override def getCurrentValue = value override def getCurrentValue: String = value
override def nextKeyValue = { override def nextKeyValue(): Boolean = {
if (!processed) { if (!processed) {
val fileIn = fs.open(path) val fileIn = fs.open(path)
val innerBuffer = ByteStreams.toByteArray(fileIn) val innerBuffer = ByteStreams.toByteArray(fileIn)
value = new Text(innerBuffer).toString value = new Text(innerBuffer).toString
Closeables.close(fileIn, false) Closeables.close(fileIn, false)
processed = true processed = true
true true
} else { } else {
......
...@@ -63,15 +63,18 @@ import org.apache.spark.metrics.source.Source ...@@ -63,15 +63,18 @@ import org.apache.spark.metrics.source.Source
* *
* [options] is the specific property of this source or sink. * [options] is the specific property of this source or sink.
*/ */
private[spark] class MetricsSystem private (val instance: String, private[spark] class MetricsSystem private (
conf: SparkConf, securityMgr: SecurityManager) extends Logging { val instance: String,
conf: SparkConf,
securityMgr: SecurityManager)
extends Logging {
val confFile = conf.get("spark.metrics.conf", null) private[this] val confFile = conf.get("spark.metrics.conf", null)
val metricsConfig = new MetricsConfig(Option(confFile)) private[this] val metricsConfig = new MetricsConfig(Option(confFile))
val sinks = new mutable.ArrayBuffer[Sink] private val sinks = new mutable.ArrayBuffer[Sink]
val sources = new mutable.ArrayBuffer[Source] private val sources = new mutable.ArrayBuffer[Source]
val registry = new MetricRegistry() private val registry = new MetricRegistry()
// Treat MetricsServlet as a special sink as it should be exposed to add handlers to web ui // Treat MetricsServlet as a special sink as it should be exposed to add handlers to web ui
private var metricsServlet: Option[MetricsServlet] = None private var metricsServlet: Option[MetricsServlet] = None
...@@ -91,7 +94,7 @@ private[spark] class MetricsSystem private (val instance: String, ...@@ -91,7 +94,7 @@ private[spark] class MetricsSystem private (val instance: String,
sinks.foreach(_.stop) sinks.foreach(_.stop)
} }
def report(): Unit = { def report() {
sinks.foreach(_.report()) sinks.foreach(_.report())
} }
...@@ -155,8 +158,8 @@ private[spark] object MetricsSystem { ...@@ -155,8 +158,8 @@ private[spark] object MetricsSystem {
val SINK_REGEX = "^sink\\.(.+)\\.(.+)".r val SINK_REGEX = "^sink\\.(.+)\\.(.+)".r
val SOURCE_REGEX = "^source\\.(.+)\\.(.+)".r val SOURCE_REGEX = "^source\\.(.+)\\.(.+)".r
val MINIMAL_POLL_UNIT = TimeUnit.SECONDS private[this] val MINIMAL_POLL_UNIT = TimeUnit.SECONDS
val MINIMAL_POLL_PERIOD = 1 private[this] val MINIMAL_POLL_PERIOD = 1
def checkMinimalPollingPeriod(pollUnit: TimeUnit, pollPeriod: Int) { def checkMinimalPollingPeriod(pollUnit: TimeUnit, pollPeriod: Int) {
val period = MINIMAL_POLL_UNIT.convert(pollPeriod, pollUnit) val period = MINIMAL_POLL_UNIT.convert(pollPeriod, pollUnit)
...@@ -166,7 +169,8 @@ private[spark] object MetricsSystem { ...@@ -166,7 +169,8 @@ private[spark] object MetricsSystem {
} }
} }
def createMetricsSystem(instance: String, conf: SparkConf, def createMetricsSystem(
securityMgr: SecurityManager): MetricsSystem = instance: String, conf: SparkConf, securityMgr: SecurityManager): MetricsSystem = {
new MetricsSystem(instance, conf, securityMgr) new MetricsSystem(instance, conf, securityMgr)
}
} }
...@@ -17,42 +17,47 @@ ...@@ -17,42 +17,47 @@
package org.apache.spark.metrics package org.apache.spark.metrics
import org.scalatest.{BeforeAndAfter, FunSuite} import org.apache.spark.metrics.source.Source
import org.scalatest.{BeforeAndAfter, FunSuite, PrivateMethodTester}
import org.apache.spark.{SecurityManager, SparkConf} import org.apache.spark.{SecurityManager, SparkConf}
import org.apache.spark.deploy.master.MasterSource import org.apache.spark.deploy.master.MasterSource
class MetricsSystemSuite extends FunSuite with BeforeAndAfter { import scala.collection.mutable.ArrayBuffer
class MetricsSystemSuite extends FunSuite with BeforeAndAfter with PrivateMethodTester{
var filePath: String = _ var filePath: String = _
var conf: SparkConf = null var conf: SparkConf = null
var securityMgr: SecurityManager = null var securityMgr: SecurityManager = null
before { before {
filePath = getClass.getClassLoader.getResource("test_metrics_system.properties").getFile() filePath = getClass.getClassLoader.getResource("test_metrics_system.properties").getFile
conf = new SparkConf(false).set("spark.metrics.conf", filePath) conf = new SparkConf(false).set("spark.metrics.conf", filePath)
securityMgr = new SecurityManager(conf) securityMgr = new SecurityManager(conf)
} }
test("MetricsSystem with default config") { test("MetricsSystem with default config") {
val metricsSystem = MetricsSystem.createMetricsSystem("default", conf, securityMgr) val metricsSystem = MetricsSystem.createMetricsSystem("default", conf, securityMgr)
val sources = metricsSystem.sources val sources = PrivateMethod[ArrayBuffer[Source]]('sources)
val sinks = metricsSystem.sinks val sinks = PrivateMethod[ArrayBuffer[Source]]('sinks)
assert(sources.length === 0) assert(metricsSystem.invokePrivate(sources()).length === 0)
assert(sinks.length === 0) assert(metricsSystem.invokePrivate(sinks()).length === 0)
assert(!metricsSystem.getServletHandlers.isEmpty) assert(metricsSystem.getServletHandlers.nonEmpty)
} }
test("MetricsSystem with sources add") { test("MetricsSystem with sources add") {
val metricsSystem = MetricsSystem.createMetricsSystem("test", conf, securityMgr) val metricsSystem = MetricsSystem.createMetricsSystem("test", conf, securityMgr)
val sources = metricsSystem.sources val sources = PrivateMethod[ArrayBuffer[Source]]('sources)
val sinks = metricsSystem.sinks val sinks = PrivateMethod[ArrayBuffer[Source]]('sinks)
assert(sources.length === 0) assert(metricsSystem.invokePrivate(sources()).length === 0)
assert(sinks.length === 1) assert(metricsSystem.invokePrivate(sinks()).length === 1)
assert(!metricsSystem.getServletHandlers.isEmpty) assert(metricsSystem.getServletHandlers.nonEmpty)
val source = new MasterSource(null) val source = new MasterSource(null)
metricsSystem.registerSource(source) metricsSystem.registerSource(source)
assert(sources.length === 1) assert(metricsSystem.invokePrivate(sources()).length === 1)
} }
} }
...@@ -19,18 +19,18 @@ package org.apache.spark.streaming ...@@ -19,18 +19,18 @@ package org.apache.spark.streaming
import java.util.concurrent.atomic.AtomicInteger import java.util.concurrent.atomic.AtomicInteger
import scala.language.postfixOps import org.scalatest.{Assertions, BeforeAndAfter, FunSuite}
import org.scalatest.concurrent.Timeouts
import org.scalatest.concurrent.Eventually._
import org.scalatest.exceptions.TestFailedDueToTimeoutException
import org.scalatest.time.SpanSugar._
import org.apache.spark.{Logging, SparkConf, SparkContext, SparkException} import org.apache.spark.{Logging, SparkConf, SparkContext, SparkException}
import org.apache.spark.storage.StorageLevel import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.DStream import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.receiver.Receiver import org.apache.spark.streaming.receiver.Receiver
import org.apache.spark.util.Utils import org.apache.spark.util.Utils
import org.scalatest.{Assertions, BeforeAndAfter, FunSuite}
import org.scalatest.concurrent.Timeouts
import org.scalatest.concurrent.Eventually._
import org.scalatest.exceptions.TestFailedDueToTimeoutException
import org.scalatest.time.SpanSugar._
class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts with Logging { class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts with Logging {
...@@ -68,7 +68,7 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w ...@@ -68,7 +68,7 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
test("from no conf + spark home + env") { test("from no conf + spark home + env") {
ssc = new StreamingContext(master, appName, batchDuration, ssc = new StreamingContext(master, appName, batchDuration,
sparkHome, Nil, Map(envPair)) sparkHome, Nil, Map(envPair))
assert(ssc.conf.getExecutorEnv.exists(_ == envPair)) assert(ssc.conf.getExecutorEnv.contains(envPair))
} }
test("from conf with settings") { test("from conf with settings") {
...@@ -94,7 +94,7 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w ...@@ -94,7 +94,7 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
val myConf = SparkContext.updatedConf(new SparkConf(false), master, appName) val myConf = SparkContext.updatedConf(new SparkConf(false), master, appName)
myConf.set("spark.cleaner.ttl", "10") myConf.set("spark.cleaner.ttl", "10")
val ssc1 = new StreamingContext(myConf, batchDuration) val ssc1 = new StreamingContext(myConf, batchDuration)
addInputStream(ssc1).register addInputStream(ssc1).register()
ssc1.start() ssc1.start()
val cp = new Checkpoint(ssc1, Time(1000)) val cp = new Checkpoint(ssc1, Time(1000))
assert(cp.sparkConfPairs.toMap.getOrElse("spark.cleaner.ttl", "-1") === "10") assert(cp.sparkConfPairs.toMap.getOrElse("spark.cleaner.ttl", "-1") === "10")
...@@ -107,7 +107,7 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w ...@@ -107,7 +107,7 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
test("start and stop state check") { test("start and stop state check") {
ssc = new StreamingContext(master, appName, batchDuration) ssc = new StreamingContext(master, appName, batchDuration)
addInputStream(ssc).register addInputStream(ssc).register()
assert(ssc.state === ssc.StreamingContextState.Initialized) assert(ssc.state === ssc.StreamingContextState.Initialized)
ssc.start() ssc.start()
...@@ -118,7 +118,7 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w ...@@ -118,7 +118,7 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
test("start multiple times") { test("start multiple times") {
ssc = new StreamingContext(master, appName, batchDuration) ssc = new StreamingContext(master, appName, batchDuration)
addInputStream(ssc).register addInputStream(ssc).register()
ssc.start() ssc.start()
intercept[SparkException] { intercept[SparkException] {
ssc.start() ssc.start()
...@@ -127,7 +127,7 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w ...@@ -127,7 +127,7 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
test("stop multiple times") { test("stop multiple times") {
ssc = new StreamingContext(master, appName, batchDuration) ssc = new StreamingContext(master, appName, batchDuration)
addInputStream(ssc).register addInputStream(ssc).register()
ssc.start() ssc.start()
ssc.stop() ssc.stop()
ssc.stop() ssc.stop()
...@@ -135,7 +135,7 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w ...@@ -135,7 +135,7 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
test("stop before start and start after stop") { test("stop before start and start after stop") {
ssc = new StreamingContext(master, appName, batchDuration) ssc = new StreamingContext(master, appName, batchDuration)
addInputStream(ssc).register addInputStream(ssc).register()
ssc.stop() // stop before start should not throw exception ssc.stop() // stop before start should not throw exception
ssc.start() ssc.start()
ssc.stop() ssc.stop()
...@@ -147,12 +147,12 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w ...@@ -147,12 +147,12 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
test("stop only streaming context") { test("stop only streaming context") {
ssc = new StreamingContext(master, appName, batchDuration) ssc = new StreamingContext(master, appName, batchDuration)
sc = ssc.sparkContext sc = ssc.sparkContext
addInputStream(ssc).register addInputStream(ssc).register()
ssc.start() ssc.start()
ssc.stop(false) ssc.stop(stopSparkContext = false)
assert(sc.makeRDD(1 to 100).collect().size === 100) assert(sc.makeRDD(1 to 100).collect().size === 100)
ssc = new StreamingContext(sc, batchDuration) ssc = new StreamingContext(sc, batchDuration)
addInputStream(ssc).register addInputStream(ssc).register()
ssc.start() ssc.start()
ssc.stop() ssc.stop()
} }
...@@ -167,11 +167,11 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w ...@@ -167,11 +167,11 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
var runningCount = 0 var runningCount = 0
TestReceiver.counter.set(1) TestReceiver.counter.set(1)
val input = ssc.receiverStream(new TestReceiver) val input = ssc.receiverStream(new TestReceiver)
input.count.foreachRDD(rdd => { input.count().foreachRDD { rdd =>
val count = rdd.first() val count = rdd.first()
runningCount += count.toInt runningCount += count.toInt
logInfo("Count = " + count + ", Running count = " + runningCount) logInfo("Count = " + count + ", Running count = " + runningCount)
}) }
ssc.start() ssc.start()
ssc.awaitTermination(500) ssc.awaitTermination(500)
ssc.stop(stopSparkContext = false, stopGracefully = true) ssc.stop(stopSparkContext = false, stopGracefully = true)
...@@ -191,7 +191,7 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w ...@@ -191,7 +191,7 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
test("awaitTermination") { test("awaitTermination") {
ssc = new StreamingContext(master, appName, batchDuration) ssc = new StreamingContext(master, appName, batchDuration)
val inputStream = addInputStream(ssc) val inputStream = addInputStream(ssc)
inputStream.map(x => x).register inputStream.map(x => x).register()
// test whether start() blocks indefinitely or not // test whether start() blocks indefinitely or not
failAfter(2000 millis) { failAfter(2000 millis) {
...@@ -215,7 +215,7 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w ...@@ -215,7 +215,7 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
// test whether wait exits if context is stopped // test whether wait exits if context is stopped
failAfter(10000 millis) { // 10 seconds because spark takes a long time to shutdown failAfter(10000 millis) { // 10 seconds because spark takes a long time to shutdown
new Thread() { new Thread() {
override def run { override def run() {
Thread.sleep(500) Thread.sleep(500)
ssc.stop() ssc.stop()
} }
...@@ -239,8 +239,9 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w ...@@ -239,8 +239,9 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
test("awaitTermination with error in task") { test("awaitTermination with error in task") {
ssc = new StreamingContext(master, appName, batchDuration) ssc = new StreamingContext(master, appName, batchDuration)
val inputStream = addInputStream(ssc) val inputStream = addInputStream(ssc)
inputStream.map(x => { throw new TestException("error in map task"); x}) inputStream
.foreachRDD(_.count) .map { x => throw new TestException("error in map task"); x }
.foreachRDD(_.count())
val exception = intercept[Exception] { val exception = intercept[Exception] {
ssc.start() ssc.start()
...@@ -252,7 +253,7 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w ...@@ -252,7 +253,7 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
test("awaitTermination with error in job generation") { test("awaitTermination with error in job generation") {
ssc = new StreamingContext(master, appName, batchDuration) ssc = new StreamingContext(master, appName, batchDuration)
val inputStream = addInputStream(ssc) val inputStream = addInputStream(ssc)
inputStream.transform(rdd => { throw new TestException("error in transform"); rdd }).register inputStream.transform { rdd => throw new TestException("error in transform"); rdd }.register()
val exception = intercept[TestException] { val exception = intercept[TestException] {
ssc.start() ssc.start()
ssc.awaitTermination(5000) ssc.awaitTermination(5000)
...@@ -265,7 +266,7 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w ...@@ -265,7 +266,7 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
} }
def addInputStream(s: StreamingContext): DStream[Int] = { def addInputStream(s: StreamingContext): DStream[Int] = {
val input = (1 to 100).map(i => (1 to i)) val input = (1 to 100).map(i => 1 to i)
val inputStream = new TestInputStream(s, input, 1) val inputStream = new TestInputStream(s, input, 1)
inputStream inputStream
} }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment