From 3618d70b2a8a66e9a17a7e2efc8c97a22243073a Mon Sep 17 00:00:00 2001
From: Tathagata Das <tathagata.das1565@gmail.com>
Date: Thu, 26 Dec 2013 12:45:40 -0800
Subject: [PATCH] Added warning if filestream adds files with no data in them (file RDDs have 0 partitions).

---
 .../apache/spark/streaming/dstream/FileInputDStream.scala | 7 +++++++
 1 file changed, 7 insertions(+)

diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
index 2bb6d91d04..95224282f6 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
@@ -114,6 +114,13 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
   /** Generate one RDD from an array of files */
   private def filesToRDD(files: Seq[String]): RDD[(K, V)] = {
     val fileRDDs = files.map(file => context.sparkContext.newAPIHadoopFile[K, V, F](file))
+    files.zip(fileRDDs).foreach { case (file, rdd) => {
+      if (rdd.partitions.size == 0) {
+        logWarning("File " + file + " has no data in it. Are you sure you are following " +
+          "the move-based method of adding input files? Refer to the programming guide " +
+          "for more details.")
+      }
+    }}
     new UnionRDD(context.sparkContext, fileRDDs)
   }
 
-- 
GitLab