Skip to content
Snippets Groups Projects
Commit 2bcd5d5c authored by Sean Owen's avatar Sean Owen
Browse files

[SPARK-17193][CORE] HadoopRDD NPE at DEBUG log level when getLocationInfo == null

## What changes were proposed in this pull request?

Handle null from Hadoop getLocationInfo directly instead of catching (and logging) exception

## How was this patch tested?

Jenkins tests

Author: Sean Owen <sowen@cloudera.com>

Closes #14760 from srowen/SPARK-17193.
parent 5f02d2e5
No related branches found
No related tags found
No related merge requests found
......@@ -22,7 +22,6 @@ import java.text.SimpleDateFormat
import java.util.Date
import scala.collection.immutable.Map
import scala.collection.mutable.ListBuffer
import scala.reflect.ClassTag
import org.apache.hadoop.conf.{Configurable, Configuration}
......@@ -317,7 +316,7 @@ class HadoopRDD[K, V](
try {
val lsplit = c.inputSplitWithLocationInfo.cast(hsplit)
val infos = c.getLocationInfo.invoke(lsplit).asInstanceOf[Array[AnyRef]]
Some(HadoopRDD.convertSplitLocationInfo(infos))
HadoopRDD.convertSplitLocationInfo(infos)
} catch {
case e: Exception =>
logDebug("Failed to use InputSplitWithLocations.", e)
......@@ -419,21 +418,20 @@ private[spark] object HadoopRDD extends Logging {
None
}
/**
 * Converts Hadoop split-location info objects (obtained reflectively from
 * `InputSplitWithLocationInfo#getLocationInfo`) into Spark task-location strings.
 *
 * @param infos array of Hadoop `SplitLocationInfo` objects, handled as `AnyRef`
 *              because they are accessed via reflection; may be `null`, since
 *              Hadoop's `getLocationInfo` can return `null` (SPARK-17193)
 * @return `None` when `infos` is `null`; otherwise the task locations for every
 *         non-"localhost" entry — an `HDFSCacheTaskLocation` for splits Hadoop
 *         reports as cached in memory, a plain `HostTaskLocation` otherwise
 */
private[spark] def convertSplitLocationInfo(infos: Array[AnyRef]): Option[Seq[String]] = {
  // Option(null) == None: handle Hadoop returning null directly instead of
  // letting an NPE surface (previously only hit at DEBUG log level).
  Option(infos).map(_.flatMap { loc =>
    val reflections = HadoopRDD.SPLIT_INFO_REFLECTIONS.get
    val locationStr = reflections.getLocation.invoke(loc).asInstanceOf[String]
    if (locationStr != "localhost") {
      if (reflections.isInMemory.invoke(loc).asInstanceOf[Boolean]) {
        logDebug(s"Partition $locationStr is cached by Hadoop.")
        Some(HDFSCacheTaskLocation(locationStr).toString)
      } else {
        Some(HostTaskLocation(locationStr).toString)
      }
    } else {
      // "localhost" carries no useful locality information, so drop it.
      None
    }
  })
}
}
......@@ -255,7 +255,7 @@ class NewHadoopRDD[K, V](
case Some(c) =>
try {
val infos = c.newGetLocationInfo.invoke(split).asInstanceOf[Array[AnyRef]]
Some(HadoopRDD.convertSplitLocationInfo(infos))
HadoopRDD.convertSplitLocationInfo(infos)
} catch {
case e : Exception =>
logDebug("Failed to use InputSplit#getLocationInfo.", e)
......
Loading…
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment