-
- Downloads
[SPARK-19721][SS][BRANCH-2.1] Good error message for version mismatch in log files
## Problem There are several places where we write out version identifiers in various logs for structured streaming (usually `v1`). However, in the places where we check for this, we throw a confusing error message. ## What changes were proposed in this pull request? This patch made two major changes: 1. added a `parseVersion(...)` method, and based on this method, fixed the following places the way they did version checking (no other place needed to do this checking): ``` HDFSMetadataLog - CompactibleFileStreamLog ------------> fixed with this patch - FileStreamSourceLog ---------------> inherited the fix of `CompactibleFileStreamLog` - FileStreamSinkLog -----------------> inherited the fix of `CompactibleFileStreamLog` - OffsetSeqLog ------------------------> fixed with this patch - anonymous subclass in KafkaSource ---> fixed with this patch ``` 2. changed the type of `FileStreamSinkLog.VERSION`, `FileStreamSourceLog.VERSION` etc. from `String` to `Int`, so that we can identify newer versions via `version > 1` instead of `version != "v1"` - note this didn't break any backwards compatibility -- we are still writing out `"v1"` and reading back `"v1"` ## Exception message with this patch ``` java.lang.IllegalStateException: Failed to read log file /private/var/folders/nn/82rmvkk568sd8p3p8tb33trw0000gn/T/spark-86867b65-0069-4ef1-b0eb-d8bd258ff5b8/0. UnsupportedLogVersion: maximum supported log version is v1, but encountered v99. The log file was produced by a newer version of Spark and cannot be read by this version. Please upgrade. at org.apache.spark.sql.execution.streaming.HDFSMetadataLog.get(HDFSMetadataLog.scala:202) at org.apache.spark.sql.execution.streaming.OffsetSeqLogSuite$$anonfun$3$$anonfun$apply$mcV$sp$2.apply(OffsetSeqLogSuite.scala:78) at org.apache.spark.sql.execution.streaming.OffsetSeqLogSuite$$anonfun$3$$anonfun$apply$mcV$sp$2.apply(OffsetSeqLogSuite.scala:75) at org.apache.spark.sql.test.SQLTestUtils$class.withTempDir(SQLTestUtils.scala:133) at org.apache.spark.sql.execution.streaming.OffsetSeqLogSuite.withTempDir(OffsetSeqLogSuite.scala:26) at org.apache.spark.sql.execution.streaming.OffsetSeqLogSuite$$anonfun$3.apply$mcV$sp(OffsetSeqLogSuite.scala:75) at org.apache.spark.sql.execution.streaming.OffsetSeqLogSuite$$anonfun$3.apply(OffsetSeqLogSuite.scala:75) at org.apache.spark.sql.execution.streaming.OffsetSeqLogSuite$$anonfun$3.apply(OffsetSeqLogSuite.scala:75) at org.scalatest.Transformer$$anonfun$apply$1.apply$mcV$sp(Transformer.scala:22) at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85) ``` ## How was this patch tested? unit tests Author: Liwei Lin <lwlin7@gmail.com> Closes #17327 from lw-lin/good-msg-2.1.
Showing
- external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala 7 additions, 7 deletions...ain/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
- external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala 7 additions, 2 deletions...cala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
- sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala 3 additions, 6 deletions...rk/sql/execution/streaming/CompactibleFileStreamLog.scala
- sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala 2 additions, 2 deletions...che/spark/sql/execution/streaming/FileStreamSinkLog.scala
- sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceLog.scala 2 additions, 2 deletions...e/spark/sql/execution/streaming/FileStreamSourceLog.scala
- sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala 36 additions, 0 deletions...pache/spark/sql/execution/streaming/HDFSMetadataLog.scala
- sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLog.scala 4 additions, 6 deletions...g/apache/spark/sql/execution/streaming/OffsetSeqLog.scala
- sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLogSuite.scala 34 additions, 6 deletions...l/execution/streaming/CompactibleFileStreamLogSuite.scala
- sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala 4 additions, 4 deletions...park/sql/execution/streaming/FileStreamSinkLogSuite.scala
- sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala 27 additions, 0 deletions.../spark/sql/execution/streaming/HDFSMetadataLogSuite.scala
- sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLogSuite.scala 17 additions, 0 deletions...che/spark/sql/execution/streaming/OffsetSeqLogSuite.scala
Loading
Please register or sign in to comment