Commit 1f4718c4 authored by Tathagata Das

Changed SparkConf to not be serializable, and fixed unit-test log paths in the log4j.properties files of external modules.
parent f8bd828c
Showing 66 additions and 23 deletions
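The Spark Core hunks below all follow one pattern: classes whose instances can end up inside serialized task closures no longer keep a SparkConf field (SparkConf is no longer Serializable), and instead read the values they need through SparkEnv.get.conf at the point of use. A minimal sketch of that before/after shape, using a made-up ShuffleSettings class rather than the actual Spark sources:

  import org.apache.spark.SparkEnv

  // Before (illustrative): holding the conf on the instance means the whole conf
  // is dragged along whenever the instance is serialized into a task.
  class ShuffleSettingsBefore {
    private val sparkConf = SparkEnv.get.conf
    private val externalSorting = sparkConf.getBoolean("spark.shuffle.spill", true)
  }

  // After (illustrative): only the primitive value is stored; the conf is fetched
  // from the per-JVM SparkEnv and never becomes part of the serialized state.
  class ShuffleSettingsAfter {
    private val externalSorting =
      SparkEnv.get.conf.getBoolean("spark.shuffle.spill", true)
  }
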
@@ -31,8 +31,7 @@ case class Aggregator[K, V, C] (
     mergeValue: (C, V) => C,
     mergeCombiners: (C, C) => C) {
-  private val sparkConf = SparkEnv.get.conf
-  private val externalSorting = sparkConf.getBoolean("spark.shuffle.spill", true)
+  private val externalSorting = SparkEnv.get.conf.getBoolean("spark.shuffle.spill", true)
   def combineValuesByKey(iter: Iterator[_ <: Product2[K, V]],
       context: TaskContext) : Iterator[(K, C)] = {

@@ -4,6 +4,7 @@ import scala.collection.JavaConverters._
 import scala.collection.mutable.HashMap
 import com.typesafe.config.ConfigFactory
+import java.io.{ObjectInputStream, ObjectOutputStream, IOException}
 /**
  * Configuration for a Spark application. Used to set various Spark parameters as key-value pairs.
@@ -24,7 +25,7 @@ import com.typesafe.config.ConfigFactory
  *
  * @param loadDefaults whether to load values from the system properties and classpath
  */
-class SparkConf(loadDefaults: Boolean) extends Serializable with Cloneable with Logging {
+class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
   /** Create a SparkConf that loads defaults from system properties and the classpath */
   def this() = this(true)

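With Serializable dropped from the class declaration above, Java-serializing a SparkConf (for example by accidentally capturing one in a closure) now fails immediately instead of shipping the whole configuration with every task. A quick illustrative check of that behaviour (not part of the commit; the object name is made up):

  import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}
  import org.apache.spark.SparkConf

  object SparkConfSerializationCheck {
    def main(args: Array[String]): Unit = {
      val conf = new SparkConf(false).setAppName("demo").setMaster("local")
      val oos = new ObjectOutputStream(new ByteArrayOutputStream())
      try {
        oos.writeObject(conf)   // SparkConf no longer extends Serializable
        println("unexpectedly serialized")
      } catch {
        case _: NotSerializableException =>
          println("SparkConf is not serializable, as this commit intends")
      } finally {
        oos.close()
      }
    }
  }
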
@@ -66,7 +66,6 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part:
   private type CoGroupValue = (Any, Int) // Int is dependency number
   private type CoGroupCombiner = Seq[CoGroup]
-  private val sparkConf = SparkEnv.get.conf
   private var serializerClass: String = null
   def setSerializer(cls: String): CoGroupedRDD[K] = {
@@ -106,7 +105,7 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part:
   override val partitioner = Some(part)
   override def compute(s: Partition, context: TaskContext): Iterator[(K, CoGroupCombiner)] = {
+    val sparkConf = SparkEnv.get.conf
     val externalSorting = sparkConf.getBoolean("spark.shuffle.externalSorting", true)
     val split = s.asInstanceOf[CoGroupPartition]
     val numRdds = split.deps.size

@@ -20,7 +20,7 @@ log4j.rootCategory=INFO, file
 # log4j.appender.file=org.apache.log4j.FileAppender
 log4j.appender.file=org.apache.log4j.FileAppender
 log4j.appender.file.append=false
-log4j.appender.file.file=streaming/target/unit-tests.log
+log4j.appender.file.file=external/flume/target/unit-tests.log
 log4j.appender.file.layout=org.apache.log4j.PatternLayout
 log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %p %c{1}: %m%n

@@ -20,7 +20,7 @@ log4j.rootCategory=INFO, file
 # log4j.appender.file=org.apache.log4j.FileAppender
 log4j.appender.file=org.apache.log4j.FileAppender
 log4j.appender.file.append=false
-log4j.appender.file.file=streaming/target/unit-tests.log
+log4j.appender.file.file=external/kafka/target/unit-tests.log
 log4j.appender.file.layout=org.apache.log4j.PatternLayout
 log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %p %c{1}: %m%n

@@ -35,5 +35,6 @@ class KafkaStreamSuite extends TestSuiteBase {
       ssc, kafkaParams, topics, StorageLevel.MEMORY_AND_DISK_SER_2)
     // TODO: Actually test receiving data
+    ssc.stop()
   }
 }

@@ -20,7 +20,7 @@ log4j.rootCategory=INFO, file
 # log4j.appender.file=org.apache.log4j.FileAppender
 log4j.appender.file=org.apache.log4j.FileAppender
 log4j.appender.file.append=false
-log4j.appender.file.file=streaming/target/unit-tests.log
+log4j.appender.file.file=external/mqtt/target/unit-tests.log
 log4j.appender.file.layout=org.apache.log4j.PatternLayout
 log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %p %c{1}: %m%n

@@ -32,5 +32,6 @@ class MQTTStreamSuite extends TestSuiteBase {
     val test2 = MQTTUtils.createStream(ssc, brokerUrl, topic, StorageLevel.MEMORY_AND_DISK_SER_2)
     // TODO: Actually test receiving data
+    ssc.stop()
   }
 }

@@ -20,7 +20,7 @@ log4j.rootCategory=INFO, file
 # log4j.appender.file=org.apache.log4j.FileAppender
 log4j.appender.file=org.apache.log4j.FileAppender
 log4j.appender.file.append=false
-log4j.appender.file.file=streaming/target/unit-tests.log
+log4j.appender.file.file=external/twitter/target/unit-tests.log
 log4j.appender.file.layout=org.apache.log4j.PatternLayout
 log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %p %c{1}: %m%n

@@ -39,5 +39,6 @@ class TwitterStreamSuite extends TestSuiteBase {
     // Note that actually testing the data receiving is hard as authentication keys are
     // necessary for accessing Twitter live stream
+    ssc.stop()
   }
 }

@@ -20,7 +20,7 @@ log4j.rootCategory=INFO, file
 # log4j.appender.file=org.apache.log4j.FileAppender
 log4j.appender.file=org.apache.log4j.FileAppender
 log4j.appender.file.append=false
-log4j.appender.file.file=streaming/target/unit-tests.log
+log4j.appender.file.file=external/zeromq/target/unit-tests.log
 log4j.appender.file.layout=org.apache.log4j.PatternLayout
 log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %p %c{1}: %m%n

@@ -40,5 +40,6 @@ class ZeroMQStreamSuite extends TestSuiteBase {
       StorageLevel.MEMORY_AND_DISK_SER_2, SupervisorStrategy.defaultStrategy)
     // TODO: Actually test data receiving
+    ssc.stop()
   }
 }

@@ -42,11 +42,13 @@ class Checkpoint(@transient ssc: StreamingContext, val checkpointTime: Time)
   val checkpointDuration = ssc.checkpointDuration
   val pendingTimes = ssc.scheduler.getPendingTimes().toArray
   val delaySeconds = MetadataCleaner.getDelaySeconds(ssc.conf)
-  val sparkConf = ssc.conf
+  val sparkConfPairs = ssc.conf.getAll
 
-  // These should be unset when a checkpoint is deserialized,
-  // otherwise the SparkContext won't initialize correctly.
-  sparkConf.remove("spark.driver.host").remove("spark.driver.port")
+  def sparkConf = {
+    new SparkConf(false).setAll(sparkConfPairs)
+      .remove("spark.driver.host")
+      .remove("spark.driver.port")
+  }
 
   def validate() {
     assert(master != null, "Checkpoint.master is null")

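The Checkpoint change works because ssc.conf.getAll yields plain (String, String) pairs, which serialize fine even though SparkConf itself no longer does, and a fresh SparkConf is rebuilt from those pairs on demand with the driver-specific keys stripped. A rough stand-alone sketch of the same round trip (ConfSnapshot is a hypothetical name, not Spark code):

  import java.io._
  import org.apache.spark.SparkConf

  // Hypothetical holder: keeps only serializable key-value pairs, never the SparkConf itself.
  class ConfSnapshot(conf: SparkConf) extends Serializable {
    private val pairs: Array[(String, String)] = conf.getAll
    // Rebuild a fresh SparkConf (without loading system-property defaults) on each call.
    def toSparkConf: SparkConf = new SparkConf(false).setAll(pairs)
  }

  object ConfSnapshotExample {
    def main(args: Array[String]): Unit = {
      val original = new SparkConf(false).setMaster("local").setAppName("demo")
      val bos = new ByteArrayOutputStream()
      val oos = new ObjectOutputStream(bos)
      oos.writeObject(new ConfSnapshot(original))   // works: only the pairs are written
      oos.close()
      val restored = new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray))
        .readObject().asInstanceOf[ConfSnapshot]
        .toSparkConf
      println(restored.get("spark.app.name"))       // prints "demo"
    }
  }
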
@@ -163,8 +163,10 @@ final private[streaming] class DStreamGraph extends Serializable with Logging {
     logDebug("DStreamGraph.writeObject used")
     this.synchronized {
       checkpointInProgress = true
+      logDebug("Enabled checkpoint mode")
       oos.defaultWriteObject()
       checkpointInProgress = false
+      logDebug("Disabled checkpoint mode")
     }
   }

@@ -19,7 +19,7 @@ package org.apache.spark.streaming.dstream
 import scala.collection.mutable.HashMap
 import scala.reflect.ClassTag
-import java.io.{ObjectInputStream, IOException}
+import java.io.{ObjectOutputStream, ObjectInputStream, IOException}
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.fs.FileSystem
 import org.apache.spark.Logging
@@ -117,8 +117,32 @@ class DStreamCheckpointData[T: ClassTag] (dstream: DStream[T])
     "[\n" + currentCheckpointFiles.size + " checkpoint files \n" + currentCheckpointFiles.mkString("\n") + "\n]"
   }
 
+  @throws(classOf[IOException])
+  private def writeObject(oos: ObjectOutputStream) {
+    logDebug(this.getClass().getSimpleName + ".writeObject used")
+    if (dstream.context.graph != null) {
+      dstream.context.graph.synchronized {
+        if (dstream.context.graph.checkpointInProgress) {
+          oos.defaultWriteObject()
+        } else {
+          val msg = "Object of " + this.getClass.getName + " is being serialized " +
+            " possibly as a part of closure of an RDD operation. This is because " +
+            " the DStream object is being referred to from within the closure. " +
+            " Please rewrite the RDD operation inside this DStream to avoid this. " +
+            " This has been enforced to avoid bloating of Spark tasks " +
+            " with unnecessary objects."
+          throw new java.io.NotSerializableException(msg)
+        }
+      }
+    } else {
+      throw new java.io.NotSerializableException(
+        "Graph is unexpectedly null when DStream is being serialized.")
+    }
+  }
+
   @throws(classOf[IOException])
   private def readObject(ois: ObjectInputStream) {
+    logDebug(this.getClass().getSimpleName + ".readObject used")
     ois.defaultReadObject()
     timeToOldestCheckpointFileTime = new HashMap[Time, Time]
     timeToCheckpointFile = new HashMap[Time, String]

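The NotSerializableException introduced above fires when a DStream, and with it this checkpoint data, gets pulled into an RDD closure outside of graph checkpointing. A short sketch of the anti-pattern it guards against and the usual fix (the lines stream is a placeholder, not from the commit):

  import org.apache.spark.streaming.dstream.DStream

  // Illustrative only: 'lines' stands for any DStream[String] in a streaming job.
  def example(lines: DStream[String]): Unit = {
    // Anti-pattern: referencing the DStream inside the closure forces Spark to
    // serialize the DStream (and its DStreamCheckpointData) into every task;
    // the new writeObject now rejects this with the message above.
    val bad = lines.map(line => (line, lines.slideDuration))

    // Fix: copy the needed value into a local val outside the closure.
    val duration = lines.slideDuration
    val good = lines.map(line => (line, duration))
  }
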
@@ -151,17 +151,29 @@ class CheckpointSuite extends TestSuiteBase {
     val value = "myvalue"
     System.setProperty(key, value)
     ssc = new StreamingContext(master, framework, batchDuration)
+    val originalConf = ssc.conf
     val cp = new Checkpoint(ssc, Time(1000))
-    assert(!cp.sparkConf.contains("spark.driver.host"))
-    assert(!cp.sparkConf.contains("spark.driver.port"))
-    assert(!cp.sparkConf.contains("spark.hostPort"))
-    assert(cp.sparkConf.get(key) === value)
+    val cpConf = cp.sparkConf
+    assert(cpConf.get("spark.master") === originalConf.get("spark.master"))
+    assert(cpConf.get("spark.app.name") === originalConf.get("spark.app.name"))
+    assert(cpConf.get(key) === value)
     ssc.stop()
     // Serialize/deserialize to simulate write to storage and reading it back
     val newCp = Utils.deserialize[Checkpoint](Utils.serialize(cp))
-    assert(!newCp.sparkConf.contains("spark.driver.host"))
-    assert(!newCp.sparkConf.contains("spark.driver.port"))
-    assert(!newCp.sparkConf.contains("spark.hostPort"))
-    assert(newCp.sparkConf.get(key) === value)
+    val newCpConf = newCp.sparkConf
+    assert(newCpConf.get("spark.master") === originalConf.get("spark.master"))
+    assert(newCpConf.get("spark.app.name") === originalConf.get("spark.app.name"))
+    assert(newCpConf.get(key) === value)
+    assert(!newCpConf.contains("spark.driver.host"))
+    assert(!newCpConf.contains("spark.driver.port"))
+    // Check if all the parameters have been restored
+    ssc = new StreamingContext(null, newCp, null)
+    val restoredConf = ssc.conf
+    assert(restoredConf.get(key) === value)
   }