Skip to content
Snippets Groups Projects
Commit 947f4f25 authored by jerryshao's avatar jerryshao Committed by Shixiong Zhu
Browse files

[SPARK-17999][KAFKA][SQL] Add getPreferredLocations for KafkaSourceRDD

## What changes were proposed in this pull request?

The newly implemented Structured Streaming `KafkaSource` did calculate the preferred locations for each topic partition, but didn't offer this information through RDD's `getPreferredLocations` method. So here propose to add this method in `KafkaSourceRDD`.

## How was this patch tested?

Manual verification.

Author: jerryshao <sshao@hortonworks.com>

Closes #15545 from jerryshao/SPARK-17999.
parent 84b245f2
No related branches found
No related tags found
No related merge requests found
......@@ -112,6 +112,11 @@ private[kafka010] class KafkaSourceRDD(
buf.toArray
}
override def getPreferredLocations(split: Partition): Seq[String] = {
val part = split.asInstanceOf[KafkaSourceRDDPartition]
part.offsetRange.preferredLoc.map(Seq(_)).getOrElse(Seq.empty)
}
override def compute(
thePart: Partition,
context: TaskContext): Iterator[ConsumerRecord[Array[Byte], Array[Byte]]] = {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment