Commit 8838ad7c authored by Tathagata Das

[SPARK-4196][SPARK-4602][Streaming] Fix serialization issue in PairDStreamFunctions.saveAsNewAPIHadoopFiles

Solves two JIRAs in one shot:
- Makes the ForeachDStream created by saveAsNewAPIHadoopFiles serializable for checkpoints
- Makes the default configuration object used by saveAsNewAPIHadoopFiles Spark's Hadoop configuration

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #3457 from tdas/savefiles-fix and squashes the following commits:

bb4729a [Tathagata Das] Same treatment for saveAsHadoopFiles
b382ea9 [Tathagata Das] Fix serialization issue in PairDStreamFunctions.saveAsNewAPIHadoopFiles.
parent bf1a6aaa
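
For readers skimming the diff below: Hadoop's Configuration/JobConf objects are not java.io.Serializable, so an output closure that captures one directly cannot be written into the streaming checkpoint. The fix wraps the configuration in Spark's SerializableWritable (which serializes it through Hadoop's Writable protocol), so the generated ForeachDStream can be checkpointed. The following sketch shows that wrapping pattern in isolation; it is illustrative only and the variable names are not part of the patch.

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.spark.SerializableWritable

// A Hadoop Configuration is a Writable but not java.io.Serializable, so
// capturing `conf` directly in a closure that must be checkpointed fails
// at serialization time. Wrapping it makes the closure serializable.
val conf = new Configuration()
val serializableConf = new SerializableWritable(conf)

// Illustrative closure: it captures only the serializable wrapper and
// unwraps the Configuration with .value when it actually runs.
val saveFunc = (path: String) => {
  val hadoopConf: Configuration = serializableConf.value
  // ... write output under `path` using hadoopConf ...
}
```

This is exactly the pattern applied to saveAsHadoopFiles and saveAsNewAPIHadoopFiles in the hunks below.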
@@ -17,20 +17,17 @@
 package org.apache.spark.streaming.dstream
 
-import org.apache.spark.streaming.StreamingContext._
-import org.apache.spark.{Partitioner, HashPartitioner}
-import org.apache.spark.SparkContext._
-import org.apache.spark.rdd.RDD
 import scala.collection.mutable.ArrayBuffer
 import scala.reflect.ClassTag
 
-import org.apache.hadoop.mapred.JobConf
-import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat}
-import org.apache.hadoop.mapred.OutputFormat
 import org.apache.hadoop.conf.Configuration
-import org.apache.spark.streaming.{Time, Duration}
+import org.apache.hadoop.mapred.{JobConf, OutputFormat}
+import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat}
+import org.apache.spark.{HashPartitioner, Partitioner, SerializableWritable}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.streaming.{Duration, Time}
+import org.apache.spark.streaming.StreamingContext._
 
 /**
  * Extra functions available on DStream of (key, value) pairs through an implicit conversion.
@@ -671,11 +668,13 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)])
       keyClass: Class[_],
       valueClass: Class[_],
       outputFormatClass: Class[_ <: OutputFormat[_, _]],
-      conf: JobConf = new JobConf
+      conf: JobConf = new JobConf(ssc.sparkContext.hadoopConfiguration)
     ) {
+    // Wrap conf in SerializableWritable so that ForeachDStream can be serialized for checkpoints
+    val serializableConf = new SerializableWritable(conf)
     val saveFunc = (rdd: RDD[(K, V)], time: Time) => {
       val file = rddToFileName(prefix, suffix, time)
-      rdd.saveAsHadoopFile(file, keyClass, valueClass, outputFormatClass, conf)
+      rdd.saveAsHadoopFile(file, keyClass, valueClass, outputFormatClass, serializableConf.value)
     }
     self.foreachRDD(saveFunc)
   }
@@ -702,11 +701,14 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)])
       keyClass: Class[_],
       valueClass: Class[_],
       outputFormatClass: Class[_ <: NewOutputFormat[_, _]],
-      conf: Configuration = new Configuration
+      conf: Configuration = ssc.sparkContext.hadoopConfiguration
     ) {
+    // Wrap conf in SerializableWritable so that ForeachDStream can be serialized for checkpoints
+    val serializableConf = new SerializableWritable(conf)
     val saveFunc = (rdd: RDD[(K, V)], time: Time) => {
       val file = rddToFileName(prefix, suffix, time)
-      rdd.saveAsNewAPIHadoopFile(file, keyClass, valueClass, outputFormatClass, conf)
+      rdd.saveAsNewAPIHadoopFile(
+        file, keyClass, valueClass, outputFormatClass, serializableConf.value)
     }
     self.foreachRDD(saveFunc)
   }
@@ -22,9 +22,14 @@ import java.nio.charset.Charset
 import scala.collection.mutable.ArrayBuffer
 import scala.reflect.ClassTag
 
 import com.google.common.io.Files
-import org.apache.hadoop.fs.{Path, FileSystem}
 import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hadoop.io.{IntWritable, Text}
+import org.apache.hadoop.mapred.TextOutputFormat
+import org.apache.hadoop.mapreduce.lib.output.{TextOutputFormat => NewTextOutputFormat}
 
 import org.apache.spark.streaming.StreamingContext._
 import org.apache.spark.streaming.dstream.{DStream, FileInputDStream}
 import org.apache.spark.streaming.util.ManualClock
@@ -205,6 +210,51 @@ class CheckpointSuite extends TestSuiteBase {
     testCheckpointedOperation(input, operation, output, 7)
   }
 
+  test("recovery with saveAsHadoopFiles operation") {
+    val tempDir = Files.createTempDir()
+    try {
+      testCheckpointedOperation(
+        Seq(Seq("a", "a", "b"), Seq("", ""), Seq(), Seq("a", "a", "b"), Seq("", ""), Seq()),
+        (s: DStream[String]) => {
+          val output = s.map(x => (x, 1)).reduceByKey(_ + _)
+          output.saveAsHadoopFiles(
+            tempDir.toURI.toString,
+            "result",
+            classOf[Text],
+            classOf[IntWritable],
+            classOf[TextOutputFormat[Text, IntWritable]])
+          output
+        },
+        Seq(Seq(("a", 2), ("b", 1)), Seq(("", 2)), Seq(), Seq(("a", 2), ("b", 1)), Seq(("", 2)), Seq()),
+        3
+      )
+    } finally {
+      Utils.deleteRecursively(tempDir)
+    }
+  }
+
+  test("recovery with saveAsNewAPIHadoopFiles operation") {
+    val tempDir = Files.createTempDir()
+    try {
+      testCheckpointedOperation(
+        Seq(Seq("a", "a", "b"), Seq("", ""), Seq(), Seq("a", "a", "b"), Seq("", ""), Seq()),
+        (s: DStream[String]) => {
+          val output = s.map(x => (x, 1)).reduceByKey(_ + _)
+          output.saveAsNewAPIHadoopFiles(
+            tempDir.toURI.toString,
+            "result",
+            classOf[Text],
+            classOf[IntWritable],
+            classOf[NewTextOutputFormat[Text, IntWritable]])
+          output
+        },
+        Seq(Seq(("a", 2), ("b", 1)), Seq(("", 2)), Seq(), Seq(("a", 2), ("b", 1)), Seq(("", 2)), Seq()),
+        3
+      )
+    } finally {
+      Utils.deleteRecursively(tempDir)
+    }
+  }
+
   // This tests whether the StateDStream's RDD checkpoints works correctly such
   // that the system can recover from a master failure. This assumes as reliable,
@@ -391,7 +441,9 @@
     logInfo("Manual clock after advancing = " + clock.time)
     Thread.sleep(batchDuration.milliseconds)
 
-    val outputStream = ssc.graph.getOutputStreams.head.asInstanceOf[TestOutputStreamWithPartitions[V]]
+    val outputStream = ssc.graph.getOutputStreams.filter { dstream =>
+      dstream.isInstanceOf[TestOutputStreamWithPartitions[V]]
+    }.head.asInstanceOf[TestOutputStreamWithPartitions[V]]
     outputStream.output.map(_.flatten)
   }
 }