From 947f4f25273161dc4719419a35613a71c2e2a150 Mon Sep 17 00:00:00 2001
From: jerryshao <sshao@hortonworks.com>
Date: Thu, 20 Oct 2016 10:50:34 -0700
Subject: [PATCH] [SPARK-17999][KAFKA][SQL] Add getPreferredLocations for
 KafkaSourceRDD

## What changes were proposed in this pull request?

The newly implemented Structured Streaming `KafkaSource` did calculate the preferred locations for each topic partition, but didn't offer this information through RDD's `getPreferredLocations` method. So here propose to add this method in `KafkaSourceRDD`.

## How was this patch tested?

Manual verification.

Author: jerryshao <sshao@hortonworks.com>

Closes #15545 from jerryshao/SPARK-17999.
---
 .../scala/org/apache/spark/sql/kafka010/KafkaSourceRDD.scala | 5 +++++
 1 file changed, 5 insertions(+)

diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceRDD.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceRDD.scala
index 496af7e39a..802dd040ae 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceRDD.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceRDD.scala
@@ -112,6 +112,11 @@ private[kafka010] class KafkaSourceRDD(
     buf.toArray
   }
 
+  override def getPreferredLocations(split: Partition): Seq[String] = {
+    val part = split.asInstanceOf[KafkaSourceRDDPartition]
+    part.offsetRange.preferredLoc.map(Seq(_)).getOrElse(Seq.empty)
+  }
+
   override def compute(
       thePart: Partition,
       context: TaskContext): Iterator[ConsumerRecord[Array[Byte], Array[Byte]]] = {
-- 
GitLab