From 242b4f02df7f71ebcfa86a85c9ed39e40750a7fd Mon Sep 17 00:00:00 2001 From: freeman <the.freeman.lab@gmail.com> Date: Tue, 3 Feb 2015 22:24:30 -0800 Subject: [PATCH] [SPARK-4969][STREAMING][PYTHON] Add binaryRecords to streaming In Spark 1.2 we added a `binaryRecords` input method for loading flat binary data. This format is useful for numerical array data, e.g. in scientific computing applications. This PR adds support for the same format in Streaming applications, where it is similarly useful, especially for streaming time series or sensor data. Summary of additions - adding `binaryRecordsStream` to Spark Streaming - exposing `binaryRecordsStream` in the new PySpark Streaming - new unit tests in Scala and Python This required adding an optional Hadoop configuration param to `fileStream` and `FileInputStream`, but was otherwise straightforward. tdas davies Author: freeman <the.freeman.lab@gmail.com> Closes #3803 from freeman-lab/streaming-binary-records and squashes the following commits: b676534 [freeman] Clarify note 5ff1b75 [freeman] Add note to java streaming context eba925c [freeman] Simplify notes c4237b8 [freeman] Add experimental tag 30eba67 [freeman] Add filter and newFilesOnly alongside conf c2cfa6d [freeman] Expose new version of fileStream with conf in java 34d20ef [freeman] Add experimental tag 14bca9a [freeman] Add experimental tag b85bffc [freeman] Formatting 47560f4 [freeman] Space formatting 9a3715a [freeman] Refactor to reflect changes to FileInputSuite 7373f73 [freeman] Add note and defensive assertion for byte length 3ceb684 [freeman] Merge remote-tracking branch 'upstream/master' into streaming-binary-records 317b6d1 [freeman] Make test inline fcb915c [freeman] Formatting becb344 [freeman] Formatting d3e75b2 [freeman] Add tests in python a4324a3 [freeman] Line length 029d49c [freeman] Formatting 1c739aa [freeman] Simpler default arg handling 94d90d0 [freeman] Spelling 2843e9d [freeman] Add params to docstring 8b70fbc [freeman] Reorganization 28bff9b [freeman] Fix missing arg 9398bcb [freeman] Expose optional hadoop configuration 23dd69f [freeman] Tests for binaryRecordsStream 36cb0fd [freeman] Add binaryRecordsStream to scala fe4e803 [freeman] Add binaryRecordStream to Java API ecef0eb [freeman] Add binaryRecordsStream to python 8550c26 [freeman] Expose additional argument combination --- .../scala/org/apache/spark/SparkContext.scala | 9 ++- python/pyspark/streaming/context.py | 16 ++++- python/pyspark/streaming/tests.py | 15 +++++ .../spark/streaming/StreamingContext.scala | 59 ++++++++++++++++++- .../api/java/JavaStreamingContext.scala | 52 +++++++++++++++- .../streaming/dstream/FileInputDStream.scala | 17 +++++- .../spark/streaming/InputStreamsSuite.scala | 51 ++++++++++++++++ 7 files changed, 212 insertions(+), 7 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 16c6fdbe52..eecfdd4222 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -657,6 +657,9 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli * * Load data from a flat binary file, assuming the length of each record is constant. * + * '''Note:''' We ensure that the byte array for each record in the resulting RDD + * has the provided record length. + * * @param path Directory to the input data files * @param recordLength The length at which to split the records * @return An RDD of data with values, represented as byte arrays @@ -671,7 +674,11 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli classOf[LongWritable], classOf[BytesWritable], conf=conf) - val data = br.map{ case (k, v) => v.getBytes} + val data = br.map { case (k, v) => + val bytes = v.getBytes + assert(bytes.length == recordLength, "Byte array does not have correct length") + bytes + } data } diff --git a/python/pyspark/streaming/context.py b/python/pyspark/streaming/context.py index d48f3598e3..18aaae93b0 100644 --- a/python/pyspark/streaming/context.py +++ b/python/pyspark/streaming/context.py @@ -21,7 +21,7 @@ from py4j.java_collections import ListConverter from py4j.java_gateway import java_import, JavaObject from pyspark import RDD, SparkConf -from pyspark.serializers import UTF8Deserializer, CloudPickleSerializer +from pyspark.serializers import NoOpSerializer, UTF8Deserializer, CloudPickleSerializer from pyspark.context import SparkContext from pyspark.storagelevel import StorageLevel from pyspark.streaming.dstream import DStream @@ -251,6 +251,20 @@ class StreamingContext(object): """ return DStream(self._jssc.textFileStream(directory), self, UTF8Deserializer()) + def binaryRecordsStream(self, directory, recordLength): + """ + Create an input stream that monitors a Hadoop-compatible file system + for new files and reads them as flat binary files with records of + fixed length. Files must be written to the monitored directory by "moving" + them from another location within the same file system. + File names starting with . are ignored. + + @param directory: Directory to load data from + @param recordLength: Length of each record in bytes + """ + return DStream(self._jssc.binaryRecordsStream(directory, recordLength), self, + NoOpSerializer()) + def _check_serializers(self, rdds): # make sure they have same serializer if len(set(rdd._jrdd_deserializer for rdd in rdds)) > 1: diff --git a/python/pyspark/streaming/tests.py b/python/pyspark/streaming/tests.py index a8d876d0fa..608f8e2647 100644 --- a/python/pyspark/streaming/tests.py +++ b/python/pyspark/streaming/tests.py @@ -21,6 +21,7 @@ import time import operator import unittest import tempfile +import struct from pyspark.context import SparkConf, SparkContext, RDD from pyspark.streaming.context import StreamingContext @@ -455,6 +456,20 @@ class StreamingContextTests(PySparkStreamingTestCase): self.wait_for(result, 2) self.assertEqual([range(10), range(10)], result) + def test_binary_records_stream(self): + d = tempfile.mkdtemp() + self.ssc = StreamingContext(self.sc, self.duration) + dstream = self.ssc.binaryRecordsStream(d, 10).map( + lambda v: struct.unpack("10b", str(v))) + result = self._collect(dstream, 2, block=False) + self.ssc.start() + for name in ('a', 'b'): + time.sleep(1) + with open(os.path.join(d, name), "wb") as f: + f.write(bytearray(range(10))) + self.wait_for(result, 2) + self.assertEqual([range(10), range(10)], map(lambda v: list(v[0]), result)) + def test_union(self): input = [range(i + 1) for i in range(3)] dstream = self.ssc.queueStream(input) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala index 8ef0787137..ddc435cf1a 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala @@ -27,10 +27,12 @@ import scala.reflect.ClassTag import akka.actor.{Props, SupervisorStrategy} import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path -import org.apache.hadoop.io.{LongWritable, Text} +import org.apache.hadoop.io.{BytesWritable, LongWritable, Text} import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat} import org.apache.hadoop.mapreduce.lib.input.TextInputFormat import org.apache.spark._ +import org.apache.spark.annotation.Experimental +import org.apache.spark.input.FixedLengthBinaryInputFormat import org.apache.spark.rdd.RDD import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.dstream._ @@ -359,6 +361,30 @@ class StreamingContext private[streaming] ( new FileInputDStream[K, V, F](this, directory, filter, newFilesOnly) } + /** + * Create a input stream that monitors a Hadoop-compatible filesystem + * for new files and reads them using the given key-value types and input format. + * Files must be written to the monitored directory by "moving" them from another + * location within the same file system. File names starting with . are ignored. + * @param directory HDFS directory to monitor for new file + * @param filter Function to filter paths to process + * @param newFilesOnly Should process only new files and ignore existing files in the directory + * @param conf Hadoop configuration + * @tparam K Key type for reading HDFS file + * @tparam V Value type for reading HDFS file + * @tparam F Input format for reading HDFS file + */ + def fileStream[ + K: ClassTag, + V: ClassTag, + F <: NewInputFormat[K, V]: ClassTag + ] (directory: String, + filter: Path => Boolean, + newFilesOnly: Boolean, + conf: Configuration): InputDStream[(K, V)] = { + new FileInputDStream[K, V, F](this, directory, filter, newFilesOnly, Option(conf)) + } + /** * Create a input stream that monitors a Hadoop-compatible filesystem * for new files and reads them as text files (using key as LongWritable, value @@ -371,6 +397,37 @@ class StreamingContext private[streaming] ( fileStream[LongWritable, Text, TextInputFormat](directory).map(_._2.toString) } + /** + * :: Experimental :: + * + * Create an input stream that monitors a Hadoop-compatible filesystem + * for new files and reads them as flat binary files, assuming a fixed length per record, + * generating one byte array per record. Files must be written to the monitored directory + * by "moving" them from another location within the same file system. File names + * starting with . are ignored. + * + * '''Note:''' We ensure that the byte array for each record in the + * resulting RDDs of the DStream has the provided record length. + * + * @param directory HDFS directory to monitor for new file + * @param recordLength length of each record in bytes + */ + @Experimental + def binaryRecordsStream( + directory: String, + recordLength: Int): DStream[Array[Byte]] = { + val conf = sc_.hadoopConfiguration + conf.setInt(FixedLengthBinaryInputFormat.RECORD_LENGTH_PROPERTY, recordLength) + val br = fileStream[LongWritable, BytesWritable, FixedLengthBinaryInputFormat]( + directory, FileInputDStream.defaultFilter : Path => Boolean, newFilesOnly=true, conf) + val data = br.map { case (k, v) => + val bytes = v.getBytes + assert(bytes.length == recordLength, "Byte array does not have correct length") + bytes + } + data + } + /** * Create an input stream from a queue of RDDs. In each batch, * it will process either one or all of the RDDs returned by the queue. diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala index 9a2254bcdc..0f7ae7a1c7 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala @@ -29,6 +29,7 @@ import org.apache.hadoop.fs.Path import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat} import org.apache.spark.{SparkConf, SparkContext} +import org.apache.spark.annotation.Experimental import org.apache.spark.api.java.{JavaPairRDD, JavaRDD, JavaSparkContext} import org.apache.spark.api.java.function.{Function => JFunction, Function2 => JFunction2} import org.apache.spark.rdd.RDD @@ -177,7 +178,7 @@ class JavaStreamingContext(val ssc: StreamingContext) extends Closeable { /** * Create an input stream from network source hostname:port. Data is received using - * a TCP socket and the receive bytes it interepreted as object using the given + * a TCP socket and the receive bytes it interpreted as object using the given * converter. * @param hostname Hostname to connect to for receiving data * @param port Port to connect to for receiving data @@ -209,6 +210,24 @@ class JavaStreamingContext(val ssc: StreamingContext) extends Closeable { ssc.textFileStream(directory) } + /** + * :: Experimental :: + * + * Create an input stream that monitors a Hadoop-compatible filesystem + * for new files and reads them as flat binary files with fixed record lengths, + * yielding byte arrays + * + * '''Note:''' We ensure that the byte array for each record in the + * resulting RDDs of the DStream has the provided record length. + * + * @param directory HDFS directory to monitor for new files + * @param recordLength The length at which to split the records + */ + @Experimental + def binaryRecordsStream(directory: String, recordLength: Int): JavaDStream[Array[Byte]] = { + ssc.binaryRecordsStream(directory, recordLength) + } + /** * Create an input stream from network source hostname:port, where data is received * as serialized blocks (serialized using the Spark's serializer) that can be directly @@ -298,6 +317,37 @@ class JavaStreamingContext(val ssc: StreamingContext) extends Closeable { ssc.fileStream[K, V, F](directory, fn, newFilesOnly) } + /** + * Create an input stream that monitors a Hadoop-compatible filesystem + * for new files and reads them using the given key-value types and input format. + * Files must be written to the monitored directory by "moving" them from another + * location within the same file system. File names starting with . are ignored. + * @param directory HDFS directory to monitor for new file + * @param kClass class of key for reading HDFS file + * @param vClass class of value for reading HDFS file + * @param fClass class of input format for reading HDFS file + * @param filter Function to filter paths to process + * @param newFilesOnly Should process only new files and ignore existing files in the directory + * @param conf Hadoop configuration + * @tparam K Key type for reading HDFS file + * @tparam V Value type for reading HDFS file + * @tparam F Input format for reading HDFS file + */ + def fileStream[K, V, F <: NewInputFormat[K, V]]( + directory: String, + kClass: Class[K], + vClass: Class[V], + fClass: Class[F], + filter: JFunction[Path, JBoolean], + newFilesOnly: Boolean, + conf: Configuration): JavaPairInputDStream[K, V] = { + implicit val cmk: ClassTag[K] = ClassTag(kClass) + implicit val cmv: ClassTag[V] = ClassTag(vClass) + implicit val cmf: ClassTag[F] = ClassTag(fClass) + def fn = (x: Path) => filter.call(x).booleanValue() + ssc.fileStream[K, V, F](directory, fn, newFilesOnly, conf) + } + /** * Create an input stream with any arbitrary user implemented actor receiver. * @param props Props object defining creation of the actor diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala index e7c5639a63..6379b88527 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala @@ -23,6 +23,7 @@ import java.util.concurrent.ConcurrentHashMap import scala.collection.mutable import scala.reflect.ClassTag +import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileSystem, Path, PathFilter} import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat} @@ -68,11 +69,13 @@ import org.apache.spark.util.{TimeStampedHashMap, Utils} * processing semantics are undefined. */ private[streaming] -class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : ClassTag]( +class FileInputDStream[K, V, F <: NewInputFormat[K,V]]( @transient ssc_ : StreamingContext, directory: String, filter: Path => Boolean = FileInputDStream.defaultFilter, - newFilesOnly: Boolean = true) + newFilesOnly: Boolean = true, + conf: Option[Configuration] = None) + (implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F]) extends InputDStream[(K, V)](ssc_) { // This is a def so that it works during checkpoint recovery: @@ -237,7 +240,15 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas /** Generate one RDD from an array of files */ private def filesToRDD(files: Seq[String]): RDD[(K, V)] = { val fileRDDs = files.map(file =>{ - val rdd = context.sparkContext.newAPIHadoopFile[K, V, F](file) + val rdd = conf match { + case Some(config) => context.sparkContext.newAPIHadoopFile( + file, + fm.runtimeClass.asInstanceOf[Class[F]], + km.runtimeClass.asInstanceOf[Class[K]], + vm.runtimeClass.asInstanceOf[Class[V]], + config) + case None => context.sparkContext.newAPIHadoopFile[K, V, F](file) + } if (rdd.partitions.size == 0) { logError("File " + file + " has no data in it. Spark Streaming can only ingest " + "files that have been \"moved\" to the directory assigned to the file stream. " + diff --git a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala index bddf51e130..01084a457d 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala @@ -95,6 +95,57 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { } } + test("binary records stream") { + val testDir: File = null + try { + val batchDuration = Seconds(2) + val testDir = Utils.createTempDir() + // Create a file that exists before the StreamingContext is created: + val existingFile = new File(testDir, "0") + Files.write("0\n", existingFile, Charset.forName("UTF-8")) + assert(existingFile.setLastModified(10000) && existingFile.lastModified === 10000) + + // Set up the streaming context and input streams + withStreamingContext(new StreamingContext(conf, batchDuration)) { ssc => + val clock = ssc.scheduler.clock.asInstanceOf[ManualClock] + // This `setTime` call ensures that the clock is past the creation time of `existingFile` + clock.setTime(existingFile.lastModified + batchDuration.milliseconds) + val batchCounter = new BatchCounter(ssc) + val fileStream = ssc.binaryRecordsStream(testDir.toString, 1) + val outputBuffer = new ArrayBuffer[Seq[Array[Byte]]] + with SynchronizedBuffer[Seq[Array[Byte]]] + val outputStream = new TestOutputStream(fileStream, outputBuffer) + outputStream.register() + ssc.start() + + // Advance the clock so that the files are created after StreamingContext starts, but + // not enough to trigger a batch + clock.addToTime(batchDuration.milliseconds / 2) + + val input = Seq(1, 2, 3, 4, 5) + input.foreach { i => + Thread.sleep(batchDuration.milliseconds) + val file = new File(testDir, i.toString) + Files.write(Array[Byte](i.toByte), file) + assert(file.setLastModified(clock.currentTime())) + assert(file.lastModified === clock.currentTime) + logInfo("Created file " + file) + // Advance the clock after creating the file to avoid a race when + // setting its modification time + clock.addToTime(batchDuration.milliseconds) + eventually(eventuallyTimeout) { + assert(batchCounter.getNumCompletedBatches === i) + } + } + + val expectedOutput = input.map(i => i.toByte) + val obtainedOutput = outputBuffer.flatten.toList.map(i => i(0).toByte) + assert(obtainedOutput === expectedOutput) + } + } finally { + if (testDir != null) Utils.deleteRecursively(testDir) + } + } test("file input stream - newFilesOnly = true") { testFileStream(newFilesOnly = true) -- GitLab