  1. May 19, 2017
  2. May 05, 2017
  3. Apr 28, 2017
  4. Apr 25, 2017
  5. Apr 14, 2017
  6. Mar 28, 2017
  7. Mar 21, 2017
  8. Mar 17, 2017
    • Liwei Lin's avatar
      [SPARK-19721][SS][BRANCH-2.1] Good error message for version mismatch in log files · 710b5554
      Liwei Lin authored
      ## Problem
      There are several places where we write out version identifiers in various logs for structured streaming (usually `v1`). However, in the places where we check for this, we throw a confusing error message.
      ## What changes were proposed in this pull request?
      This patch made two major changes:
      1. added a `parseVersion(...)` method, and based on this method, fixed the following places the way they did version checking (no other place needed to do this checking):
        - CompactibleFileStreamLog  ------------> fixed with this patch
          - FileStreamSourceLog  ---------------> inherited the fix of `CompactibleFileStreamLog`
          - FileStreamSinkLog  -----------------> inherited the fix of `CompactibleFileStreamLog`
        - OffsetSeqLog  ------------------------> fixed with this patch
        - anonymous subclass in KafkaSource  ---> fixed with this patch
      2. changed the type of `FileStreamSinkLog.VERSION`, `FileStreamSourceLog.VERSION` etc. from `String` to `Int`, so that we can identify newer versions via `version > 1` instead of `version != "v1"`
          - note this didn't break any backwards compatibility -- we are still writing out `"v1"` and reading back `"v1"`
      ## Exception message with this patch
      java.lang.IllegalStateException: Failed to read log file /private/var/folders/nn/82rmvkk568sd8p3p8tb33trw0000gn/T/spark-86867b65-0069-4ef1-b0eb-d8bd258ff5b8/0. UnsupportedLogVersion: maximum supported log version is v1, but encountered v99. The log file was produced by a newer version of Spark and cannot be read by this version. Please upgrade.
      	at org.apache.spark.sql.execution.streaming.HDFSMetadataLog.get(HDFSMetadataLog.scala:202)
      	at org.apache.spark.sql.execution.streaming.OffsetSeqLogSuite$$anonfun$3$$anonfun$apply$mcV$sp$2.apply(OffsetSeqLogSuite.scala:78)
      	at org.apache.spark.sql.execution.streaming.OffsetSeqLogSuite$$anonfun$3$$anonfun$apply$mcV$sp$2.apply(OffsetSeqLogSuite.scala:75)
      	at org.apache.spark.sql.test.SQLTestUtils$class.withTempDir(SQLTestUtils.scala:133)
      	at org.apache.spark.sql.execution.streaming.OffsetSeqLogSuite.withTempDir(OffsetSeqLogSuite.scala:26)
      	at org.apache.spark.sql.execution.streaming.OffsetSeqLogSuite$$anonfun$3.apply$mcV$sp(OffsetSeqLogSuite.scala:75)
      	at org.apache.spark.sql.execution.streaming.OffsetSeqLogSuite$$anonfun$3.apply(OffsetSeqLogSuite.scala:75)
      	at org.apache.spark.sql.execution.streaming.OffsetSeqLogSuite$$anonfun$3.apply(OffsetSeqLogSuite.scala:75)
      	at org.scalatest.Transformer$$anonfun$apply$1.apply$mcV$sp(Transformer.scala:22)
      	at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
      ## How was this patch tested?
      unit tests
      Author: Liwei Lin <>
      Closes #17327 from lw-lin/good-msg-2.1.
  9. Mar 12, 2017
    • uncleGen's avatar
      [SPARK-19853][SS] uppercase kafka topics fail when startingOffsets are SpecificOffsets · 8c460804
      uncleGen authored
      When using the KafkaSource with Structured Streaming, consumer assignments are not what the user expects if startingOffsets is set to an explicit set of topics/partitions in JSON where the topic(s) happen to have uppercase characters. When StartingOffsets is constructed, the original string value from options is transformed toLowerCase to make matching on "earliest" and "latest" case insensitive. However, the toLowerCase JSON is passed to SpecificOffsets for the terminal condition, so topic names may not be what the user intended by the time assignments are made with the underlying KafkaConsumer.
      val startingOffsets = caseInsensitiveParams.get(STARTING_OFFSETS_OPTION_KEY).map(_.trim.toLowerCase) match {
          case Some("latest") => LatestOffsets
          case Some("earliest") => EarliestOffsets
          case Some(json) => SpecificOffsets(JsonUtils.partitionOffsets(json))
          case None => LatestOffsets
      Thank cbowden for reporting.
      Author: uncleGen <>
      Closes #17209 from uncleGen/SPARK-19853.
      (cherry picked from commit 0a4d06a7)
      Signed-off-by: default avatarShixiong Zhu <>
  10. Mar 09, 2017
  11. Mar 06, 2017
    • Tyson Condie's avatar
      [SPARK-19719][SS] Kafka writer for both structured streaming and batch queires · fd6c6d5c
      Tyson Condie authored
      ## What changes were proposed in this pull request?
      Add a new Kafka Sink and Kafka Relation for writing streaming and batch queries, respectively, to Apache Kafka.
      ### Streaming Kafka Sink
      - When addBatch is called
      -- If batchId is great than the last written batch
      --- Write batch to Kafka
      ---- Topic will be taken from the record, if present, or from a topic option, which overrides topic in record.
      -- Else ignore
      ### Batch Kafka Sink
      - KafkaSourceProvider will implement CreatableRelationProvider
      - CreatableRelationProvider#createRelation will write the passed in Dataframe to a Kafka
      - Topic will be taken from the record, if present, or from topic option, which overrides topic in record.
      - Save modes Append and ErrorIfExist supported under identical semantics. Other save modes result in an AnalysisException
      tdas zsxwing
      ## How was this patch tested?
      ### The following unit tests will be included
      - write to stream with topic field: valid stream write with data that includes an existing topic in the schema
      - write structured streaming aggregation w/o topic field, with default topic: valid stream write with data that does not include a topic field, but the configuration includes a default topic
      - write data with bad schema: various cases of writing data that does not conform to a proper schema e.g., 1. no topic field or default topic, and 2. no value field
      - write data with valid schema but wrong types: data with a complete schema but wrong types e.g., key and value types are integers.
      - write to non-existing topic: write a stream to a topic that does not exist in Kafka, which has been configured to not auto-create topics.
      - write batch to kafka: simple write batch to Kafka, which goes through the same code path as streaming scenario, so validity checks will not be redone here.
      ### Examples
      // Structured Streaming
      val writer = => s.get(0).toString.getBytes()).toDF("value")
       .selectExpr("value as key", "value as value")
       .option("checkpointLocation", checkpointDir)
       .option("kafka.bootstrap.servers", brokerAddress)
       .option("topic", topic)
      // Batch
      val df = spark
       .parallelize(Seq("1", "2", "3", "4", "5"))
       .map(v => (topic, v))
       .toDF("topic", "value")
       .option("topic", topic)
      Please review before opening a pull request.
      Author: Tyson Condie <>
      Closes #17043 from tcondie/kafka-writer.
  12. Feb 17, 2017
    • Roberto Agostino Vitillo's avatar
      [SPARK-19517][SS] KafkaSource fails to initialize partition offsets · b083ec51
      Roberto Agostino Vitillo authored
      ## What changes were proposed in this pull request?
      This patch fixes a bug in `KafkaSource` with the (de)serialization of the length of the JSON string that contains the initial partition offsets.
      ## How was this patch tested?
      I ran the test suite for spark-sql-kafka-0-10.
      Author: Roberto Agostino Vitillo <>
      Closes #16857 from vitillo/kafka_source_fix.
  13. Feb 13, 2017
    • Liwei Lin's avatar
      [SPARK-19564][SPARK-19559][SS][KAFKA] KafkaOffsetReader's consumers should not be in the same group · fe4fcc57
      Liwei Lin authored
      ## What changes were proposed in this pull request?
      In `KafkaOffsetReader`, when error occurs, we abort the existing consumer and create a new consumer. In our current implementation, the first consumer and the second consumer would be in the same group (which leads to SPARK-19559), **_violating our intention of the two consumers not being in the same group._**
      The cause is that, in our current implementation, the first consumer is created before `groupId` and `nextId` are initialized in the constructor. Then even if `groupId` and `nextId` are increased during the creation of that first consumer, `groupId` and `nextId` would still be initialized to default values in the constructor for the second consumer.
      We should make sure that `groupId` and `nextId` are initialized before any consumer is created.
      ## How was this patch tested?
      Ran 100 times of `KafkaSourceSuite`; all passed
      Author: Liwei Lin <>
      Closes #16902 from lw-lin/SPARK-19564-.
      (cherry picked from commit 2bdbc870)
      Signed-off-by: default avatarShixiong Zhu <>
  14. Feb 07, 2017
    • Tyson Condie's avatar
      [SPARK-18682][SS] Batch Source for Kafka · e642a07d
      Tyson Condie authored
      Today, you can start a stream that reads from kafka. However, given kafka's configurable retention period, it seems like sometimes you might just want to read all of the data that is available now. As such we should add a version that works with as well.
      The options should be the same as the streaming kafka source, with the following differences:
      startingOffsets should default to earliest, and should not allow latest (which would always be empty).
      endingOffsets should also be allowed and should default to latest. the same assign json format as startingOffsets should also be accepted.
      It would be really good, if things like .limit(n) were enough to prevent all the data from being read (this might just work).
      KafkaRelationSuite was added for testing batch queries via KafkaUtils.
      Author: Tyson Condie <>
      Closes #16686 from tcondie/SPARK-18682.
      (cherry picked from commit 8df44440)
      Signed-off-by: default avatarShixiong Zhu <>
  15. Dec 22, 2016
  16. Dec 21, 2016
    • Shixiong Zhu's avatar
      [SPARK-18588][SS][KAFKA] Create a new KafkaConsumer when error happens to fix the flaky test · 17ef57fe
      Shixiong Zhu authored
      ## What changes were proposed in this pull request?
      When KafkaSource fails on Kafka errors, we should create a new consumer to retry rather than using the existing broken one because it's possible that the broken one will fail again.
      This PR also assigns a new group id to the new created consumer for a possible race condition:  the broken consumer cannot talk with the Kafka cluster in `close` but the new consumer can talk to Kafka cluster. I'm not sure if this will happen or not. Just for safety to avoid that the Kafka cluster thinks there are two consumers with the same group id in a short time window. (Note: CachedKafkaConsumer doesn't need this fix since `assign` never uses the group id.)
      ## How was this patch tested?
       , it ran this flaky test 120 times and all passed.
      Author: Shixiong Zhu <>
      Closes #16282 from zsxwing/kafka-fix.
      (cherry picked from commit 95efc895)
      Signed-off-by: default avatarTathagata Das <>
  17. Dec 15, 2016
  18. Dec 13, 2016
  19. Dec 08, 2016
    • Tathagata Das's avatar
      [SPARK-18776][SS] Make Offset for FileStreamSource corrected formatted in json · fcd22e53
      Tathagata Das authored
      ## What changes were proposed in this pull request?
      - Changed FileStreamSource to use new FileStreamSourceOffset rather than LongOffset. The field is named as `logOffset` to make it more clear that this is a offset in the file stream log.
      - Fixed bug in FileStreamSourceLog, the field endId in the FileStreamSourceLog.get(startId, endId) was not being used at all. No test caught it earlier. Only my updated tests caught it.
      Other minor changes
      - Dont use batchId in the FileStreamSource, as calling it batch id is extremely miss leading. With multiple sources, it may happen that a new batch has no new data from a file source. So offset of FileStreamSource != batchId after that batch.
      ## How was this patch tested?
      Updated unit test.
      Author: Tathagata Das <>
      Closes #16205 from tdas/SPARK-18776.
      (cherry picked from commit 458fa332)
      Signed-off-by: default avatarTathagata Das <>
    • Patrick Wendell's avatar
    • Patrick Wendell's avatar
      Preparing Spark release v2.1.0-rc2 · 08071749
      Patrick Wendell authored
  20. Dec 07, 2016
    • Michael Armbrust's avatar
      [SPARK-18754][SS] Rename recentProgresses to recentProgress · 1c641971
      Michael Armbrust authored
      Based on an informal survey, users find this option easier to understand / remember.
      Author: Michael Armbrust <>
      Closes #16182 from marmbrus/renameRecentProgress.
      (cherry picked from commit 70b2bf71)
      Signed-off-by: default avatarTathagata Das <>
    • Shixiong Zhu's avatar
      [SPARK-18588][TESTS] Fix flaky test: KafkaSourceStressForDontFailOnDataLossSuite · e9b3afac
      Shixiong Zhu authored
      ## What changes were proposed in this pull request?
      Fixed the following failures:
      org.scalatest.exceptions.TestFailedDueToTimeoutException: The code passed to eventually never returned normally. Attempted 3745 times over 1.0000790851666665 minutes. Last failure message: assertion failed: failOnDataLoss-0 not deleted after timeout.
      sbt.ForkMain$ForkError: org.apache.spark.sql.streaming.StreamingQueryException: Query query-66 terminated with exception: null
      	at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$
      Caused by: sbt.ForkMain$ForkError: java.lang.NullPointerException: null
      	at java.util.ArrayList.addAll(
      	at org.apache.kafka.clients.Metadata.getClusterForCurrentTopics(
      	at org.apache.kafka.clients.Metadata.update(
      	at org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.handleResponse(
      	at org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeHandleCompletedReceive(
      	at org.apache.kafka.clients.NetworkClient.handleCompletedReceives(
      	at org.apache.kafka.clients.NetworkClient.poll(
      	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(
      	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(
      	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(
      	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitPendingRequests(
      	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(
      	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.ensurePartitionAssignment(
      	at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(
      	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(
      ## How was this patch tested?
      Tested in #16048 by running many times.
      Author: Shixiong Zhu <>
      Closes #16109 from zsxwing/fix-kafka-flaky-test.
      (cherry picked from commit edc87e18)
      Signed-off-by: default avatarTathagata Das <>
  21. Dec 06, 2016
    • Tathagata Das's avatar
      [SPARK-18671][SS][TEST-MAVEN] Follow up PR to fix test for Maven · 3750c6e9
      Tathagata Das authored
      ## What changes were proposed in this pull request?
      Maven compilation seem to not allow resource is sql/test to be easily referred to in kafka-0-10-sql tests. So moved the kafka-source-offset-version-2.1.0 from sql test resources to kafka-0-10-sql test resources.
      ## How was this patch tested?
      Manually ran maven test
      Author: Tathagata Das <>
      Closes #16183 from tdas/SPARK-18671-1.
      (cherry picked from commit 5c6bcdbd)
      Signed-off-by: default avatarTathagata Das <>
    • Tathagata Das's avatar
      [SPARK-18671][SS][TEST] Added tests to ensure stability of that all Structured... · d20e0d6b
      Tathagata Das authored
      [SPARK-18671][SS][TEST] Added tests to ensure stability of that all Structured Streaming log formats
      ## What changes were proposed in this pull request?
      To be able to restart StreamingQueries across Spark version, we have already made the logs (offset log, file source log, file sink log) use json. We should added tests with actual json files in the Spark such that any incompatible changes in reading the logs is immediately caught. This PR add tests for FileStreamSourceLog, FileStreamSinkLog, and OffsetSeqLog.
      ## How was this patch tested?
      new unit tests
      Author: Tathagata Das <>
      Closes #16128 from tdas/SPARK-18671.
      (cherry picked from commit 1ef6b296)
      Signed-off-by: default avatarShixiong Zhu <>
  22. Nov 29, 2016
    • Tathagata Das's avatar
      [SPARK-18516][SQL] Split state and progress in streaming · 28b57c8a
      Tathagata Das authored
      This PR separates the status of a `StreamingQuery` into two separate APIs:
       - `status` - describes the status of a `StreamingQuery` at this moment, including what phase of processing is currently happening and if data is available.
       - `recentProgress` - an array of statistics about the most recent microbatches that have executed.
      A recent progress contains the following information:
        "id" : "2be8670a-fce1-4859-a530-748f29553bb6",
        "name" : "query-29",
        "timestamp" : 1479705392724,
        "inputRowsPerSecond" : 230.76923076923077,
        "processedRowsPerSecond" : 10.869565217391303,
        "durationMs" : {
          "triggerExecution" : 276,
          "queryPlanning" : 3,
          "getBatch" : 5,
          "getOffset" : 3,
          "addBatch" : 234,
          "walCommit" : 30
        "currentWatermark" : 0,
        "stateOperators" : [ ],
        "sources" : [ {
          "description" : "KafkaSource[Subscribe[topic-14]]",
          "startOffset" : {
            "topic-14" : {
              "2" : 0,
              "4" : 1,
              "1" : 0,
              "3" : 0,
              "0" : 0
          "endOffset" : {
            "topic-14" : {
              "2" : 1,
              "4" : 2,
              "1" : 0,
              "3" : 0,
              "0" : 1
          "numRecords" : 3,
          "inputRowsPerSecond" : 230.76923076923077,
          "processedRowsPerSecond" : 10.869565217391303
        } ]
      Additionally, in order to make it possible to correlate progress updates across restarts, we change the `id` field from an integer that is unique with in the JVM to a `UUID` that is globally unique.
      Author: Tathagata Das <>
      Author: Michael Armbrust <>
      Closes #15954 from marmbrus/queryProgress.
      (cherry picked from commit c3d08e2f)
      Signed-off-by: default avatarMichael Armbrust <>
    • hyukjinkwon's avatar
      [SPARK-3359][DOCS] Make javadoc8 working for unidoc/genjavadoc compatibility... · 84b2af22
      hyukjinkwon authored
      [SPARK-3359][DOCS] Make javadoc8 working for unidoc/genjavadoc compatibility in Java API documentation
      ## What changes were proposed in this pull request?
      This PR make `sbt unidoc` complete with Java 8.
      This PR roughly includes several fixes as below:
      - Fix unrecognisable class and method links in javadoc by changing it from `[[..]]` to `` `...` ``
        - * A column that will be computed based on the data in a [[DataFrame]].
        + * A column that will be computed based on the data in a `DataFrame`.
      - Fix throws annotations so that they are recognisable in javadoc
      - Fix URL links to `<a href="http..."></a>`.
        - * [[ Decision tree]] model for regression.
        + * <a href="">
        + * Decision tree (Wikipedia)</a> model for regression.
        -   * see
        +   * see <a href="">
        +   * Receiver operating characteristic (Wikipedia)</a>
      - Fix < to > to
        - `greater than`/`greater than or equal to` or `less than`/`less than or equal to` where applicable.
        - Wrap it with `{{{...}}}` to print them in javadoc or use `{code ...}` or `{literal ..}`. Please refer
      - Fix `</p>` complaint
      ## How was this patch tested?
      Manually tested by `jekyll build` with Java 7 and 8
      java version "1.7.0_80"
      Java(TM) SE Runtime Environment (build 1.7.0_80-b15)
      Java HotSpot(TM) 64-Bit Server VM (build 24.80-b11, mixed mode)
      java version "1.8.0_45"
      Java(TM) SE Runtime Environment (build 1.8.0_45-b14)
      Java HotSpot(TM) 64-Bit Server VM (build 25.45-b02, mixed mode)
      Author: hyukjinkwon <>
      Closes #16013 from HyukjinKwon/SPARK-3359-errors-more.
      (cherry picked from commit f830bb91)
      Signed-off-by: default avatarSean Owen <>
  23. Nov 28, 2016