diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
index fd3a14bd488507303f93811f72596e48d2bca583..4640b5dc2f6542f3fc3ee9a05c679c77f0d52546 100644
--- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -22,7 +22,6 @@ import java.text.SimpleDateFormat
 import java.util.Date
 
 import scala.collection.immutable.Map
-import scala.collection.mutable.ListBuffer
 import scala.reflect.ClassTag
 
 import org.apache.hadoop.conf.{Configurable, Configuration}
@@ -317,7 +316,7 @@ class HadoopRDD[K, V](
       try {
         val lsplit = c.inputSplitWithLocationInfo.cast(hsplit)
         val infos = c.getLocationInfo.invoke(lsplit).asInstanceOf[Array[AnyRef]]
-        Some(HadoopRDD.convertSplitLocationInfo(infos))
+        HadoopRDD.convertSplitLocationInfo(infos)
       } catch {
         case e: Exception =>
           logDebug("Failed to use InputSplitWithLocations.", e)
@@ -419,21 +418,20 @@ private[spark] object HadoopRDD extends Logging {
       None
   }
 
-  private[spark] def convertSplitLocationInfo(infos: Array[AnyRef]): Seq[String] = {
-    val out = ListBuffer[String]()
-    infos.foreach { loc =>
-      val locationStr = HadoopRDD.SPLIT_INFO_REFLECTIONS.get.
-        getLocation.invoke(loc).asInstanceOf[String]
+  private[spark] def convertSplitLocationInfo(infos: Array[AnyRef]): Option[Seq[String]] = {
+    Option(infos).map(_.flatMap { loc =>
+      val reflections = HadoopRDD.SPLIT_INFO_REFLECTIONS.get
+      val locationStr = reflections.getLocation.invoke(loc).asInstanceOf[String]
       if (locationStr != "localhost") {
-        if (HadoopRDD.SPLIT_INFO_REFLECTIONS.get.isInMemory.
-          invoke(loc).asInstanceOf[Boolean]) {
-          logDebug("Partition " + locationStr + " is cached by Hadoop.")
-          out += new HDFSCacheTaskLocation(locationStr).toString
+        if (reflections.isInMemory.invoke(loc).asInstanceOf[Boolean]) {
+          logDebug(s"Partition $locationStr is cached by Hadoop.")
+          Some(HDFSCacheTaskLocation(locationStr).toString)
         } else {
-          out += new HostTaskLocation(locationStr).toString
+          Some(HostTaskLocation(locationStr).toString)
         }
+      } else {
+        None
       }
-    }
-    out.seq
+    })
   }
 }
diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
index be919e65870a93acd6c2b06b162ef19e0352d563..1c7aec919bdc4d1b194ac5cfaf320627ada17c7f 100644
--- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
@@ -255,7 +255,7 @@ class NewHadoopRDD[K, V](
     case Some(c) =>
       try {
         val infos = c.newGetLocationInfo.invoke(split).asInstanceOf[Array[AnyRef]]
-        Some(HadoopRDD.convertSplitLocationInfo(infos))
+        HadoopRDD.convertSplitLocationInfo(infos)
       } catch {
         case e : Exception =>
           logDebug("Failed to use InputSplit#getLocationInfo.", e)
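
For reference, a minimal standalone sketch of the contract change, under simplified assumptions: `lookupLocations` below is a hypothetical stand-in for the reflective `convertSplitLocationInfo` (it matches plain strings rather than invoking `getLocation`/`isInMemory` through `SPLIT_INFO_REFLECTIONS`), but it illustrates the new `Option[Seq[String]]` shape: a null location array maps to `None` rather than a `Some` wrapping an empty buffer, and `"localhost"` entries are dropped, so callers such as `getPreferredLocations` can tell "no location info available" apart from "an empty list of locations".

```scala
// Hypothetical sketch (not Spark's API): illustrates the Option[Seq[String]]
// contract that convertSplitLocationInfo follows after this change.
object SplitLocationSketch {
  // None when no split info exists (null array); Some(locations) otherwise,
  // with "localhost" entries filtered out, mirroring the patched method.
  def lookupLocations(infos: Array[AnyRef]): Option[Seq[String]] =
    Option(infos).map(_.collect {
      case host: String if host != "localhost" => host
    }.toSeq)

  def main(args: Array[String]): Unit = {
    // Before the patch, callers wrapped the result in Some(...) unconditionally,
    // so a missing location array was indistinguishable from an empty one.
    assert(lookupLocations(null).isEmpty)
    assert(lookupLocations(Array[AnyRef]("host1", "localhost")) == Some(Seq("host1")))
  }
}
```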