[SPARK-17346][SQL] Add Kafka source for Structured Streaming
    ## What changes were proposed in this pull request?
    
This PR adds a new project `external/kafka-0-10-sql` for the Structured Streaming Kafka source.
    
    It's based on the design doc: https://docs.google.com/document/d/19t2rWe51x7tq2e5AOfrsM9qb8_m7BRuv9fel9i0PqR8/edit?usp=sharing
    
tdas did most of the work, and part of it was inspired by koeninger's work.
    
    ### Introduction
    
The Kafka source is a Structured Streaming data source that polls data from Kafka. The schema of the data it reads is as follows:
    
    Column | Type
    ---- | ----
    key | binary
    value | binary
    topic | string
    partition | int
    offset | long
    timestamp | long
    timestampType | int
    
The source can handle topic deletion. However, the user should make sure no Spark job is processing the data when a topic is deleted.
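
Since `key` and `value` arrive as binary, a typical first step is to cast them to strings before further processing. Below is a minimal sketch; the topic name and server address are placeholders:

```Scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("KafkaSourceExample").getOrCreate()

// Read from Kafka and cast the binary key/value columns to strings.
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host:port")
  .option("subscribe", "topic1")
  .load()
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
```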
    
    ### Configuration
    
    The user can use `DataStreamReader.option` to set the following configurations.
    
    Kafka Source's options | value | default | meaning
    ------ | ------- | ------ | -----
startingOffset | ["earliest", "latest"] | "latest" | The start point when a query is started, either "earliest", which reads from the earliest offset, or "latest", which reads only from the latest offset. Note: this only applies when a new streaming query is started; resuming always picks up from where the query left off.
failOnDataLoss | [true, false] | true | Whether to fail the query when it is possible that data has been lost (e.g., topics are deleted, or offsets are out of range). This may be a false alarm; you can disable it when it does not work as expected.
subscribe | A comma-separated list of topics | (none) | The list of topics to subscribe to. Only one of "subscribe" and "subscribePattern" can be specified for the Kafka source.
subscribePattern | Java regex string | (none) | The pattern used to subscribe to topics. Only one of "subscribe" and "subscribePattern" can be specified for the Kafka source.
kafkaConsumer.pollTimeoutMs | long | 512 | The timeout in milliseconds to poll data from Kafka in executors.
fetchOffset.numRetries | int | 3 | Number of times to retry before giving up on fetching the latest Kafka offsets.
fetchOffset.retryIntervalMs | long | 10 | Milliseconds to wait before retrying to fetch Kafka offsets.
    
Kafka's own configurations can be set via `DataStreamReader.option` with the `kafka.` prefix, e.g., `stream.option("kafka.bootstrap.servers", "host:port")`.
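
To illustrate how the source options and the Kafka-prefixed options combine, here is a hedged sketch (the server address, topic name, and option values are placeholders, not recommendations):

```Scala
val df = spark
  .readStream
  .format("kafka")
  // Kafka's own configuration, passed through via the "kafka." prefix
  .option("kafka.bootstrap.servers", "host:port")
  // Source options from the table above
  .option("subscribe", "topic1")
  .option("startingOffset", "earliest")
  .option("failOnDataLoss", "false")
  .load()
```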
    
    ### Usage
    
    * Subscribe to 1 topic
    ```Scala
    spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "host:port")
      .option("subscribe", "topic1")
      .load()
    ```
    
    * Subscribe to multiple topics
    ```Scala
    spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "host:port")
      .option("subscribe", "topic1,topic2")
      .load()
    ```
    
    * Subscribe to a pattern
    ```Scala
    spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "host:port")
      .option("subscribePattern", "topic.*")
      .load()
    ```
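
To run a query end to end, the loaded DataFrame can be written to any Structured Streaming sink. A minimal sketch using the console sink purely for illustration:

```Scala
val query = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host:port")
  .option("subscribe", "topic1")
  .load()
  .selectExpr("CAST(value AS STRING)")   // decode the binary payload
  .writeStream
  .format("console")                     // print each micro-batch for inspection
  .start()

query.awaitTermination()                 // block until the query stops
```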
    
    ## How was this patch tested?
    
New unit tests.
    
    Author: Shixiong Zhu <shixiong@databricks.com>
    Author: Tathagata Das <tathagata.das1565@gmail.com>
    Author: Shixiong Zhu <zsxwing@gmail.com>
    Author: cody koeninger <cody@koeninger.org>
    
    Closes #15102 from zsxwing/kafka-source.