    [SPARK-8389] [STREAMING] [PYSPARK] Expose KafkaRDDs offsetRange in Python · 3ccebf36
    jerryshao authored
    This PR proposes a simple way to expose OffsetRange in Python code. The usage of offsetRanges is similar to the Scala/Java API; in Python, the OffsetRange can be obtained like this:
    
    ```
    dstream.foreachRDD(lambda r: KafkaUtils.offsetRanges(r))
    ```
    
    The reason I didn't follow the approach suggested in SPARK-8389 is that the Python Kafka API has one more step than Scala/Java: decoding the message. As a result, the Python API returns a transformed RDD/DStream rather than a directly wrapped JavaKafkaRDD, so it is hard to backtrack to the original RDD to get the offsetRange.
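
The backtracking problem described above can be illustrated with a toy model in plain Python. This is only a sketch: `KafkaBackedRDD`, `PlainRDD`, the `OffsetRange` dataclass and the topic name `events` are hypothetical stand-ins invented for illustration, not PySpark or Spark classes. The only assumption is that the decode step produces a new object that carries no offset metadata.

```python
# Toy model only: every class here is a hypothetical stand-in, not a PySpark API.
from dataclasses import dataclass


@dataclass(frozen=True)
class OffsetRange:
    topic: str
    partition: int
    from_offset: int
    until_offset: int


class PlainRDD:
    """Stands in for an ordinary RDD: records only, no Kafka metadata."""

    def __init__(self, records):
        self.records = list(records)


class KafkaBackedRDD:
    """Stands in for the wrapped Kafka RDD, which knows its offset ranges."""

    def __init__(self, records, offset_ranges):
        self.records = list(records)
        self._offset_ranges = list(offset_ranges)

    def offset_ranges(self):
        return list(self._offset_ranges)

    def map(self, fn):
        # Like the extra decode step: the result is a new, plain RDD,
        # and the offset ranges are not carried over to it.
        return PlainRDD([fn(r) for r in self.records])


raw = KafkaBackedRDD([b"a", b"b"], [OffsetRange("events", 0, 10, 12)])
decoded = raw.map(lambda b: b.decode("utf-8"))

print(raw.offset_ranges())                 # offsets are available on the source
print(hasattr(decoded, "offset_ranges"))   # False: lost after the transformation
```

In this model the user only ever sees `decoded`, which is why a separate entry point that can recover the ranges from an RDD, rather than a method on the returned object, is the simpler route.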
    
    Author: jerryshao <saisai.shao@intel.com>
    
    Closes #7185 from jerryshao/SPARK-8389 and squashes the following commits:
    
    4c6d320 [jerryshao] Another way to fix subclass deserialization issue
    e6a8011 [jerryshao] Address the comments
    fd13937 [jerryshao] Fix serialization bug
    7debf1c [jerryshao] bug fix
    cff3893 [jerryshao] refactor the code according to the comments
    2aabf9e [jerryshao] Style fix
    848c708 [jerryshao] Add HasOffsetRanges for Python