Commit 62fab5be authored by uncleGen's avatar uncleGen Committed by Shixiong Zhu

[SPARK-19407][SS] defaultFS is used via FileSystem.get instead of getting the filesystem from the URI scheme

## What changes were proposed in this pull request?

```
Caused by: java.lang.IllegalArgumentException: Wrong FS: s3a://**************/checkpoint/7b2231a3-d845-4740-bfa3-681850e5987f/metadata, expected: file:///
	at org.apache.hadoop.fs.FileSystem.checkPath(FileSystem.java:649)
	at org.apache.hadoop.fs.RawLocalFileSystem.pathToFile(RawLocalFileSystem.java:82)
	at org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:606)
	at org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:824)
	at org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:601)
	at org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:421)
	at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1426)
	at org.apache.spark.sql.execution.streaming.StreamMetadata$.read(StreamMetadata.scala:51)
	at org.apache.spark.sql.execution.streaming.StreamExecution.<init>(StreamExecution.scala:100)
	at org.apache.spark.sql.streaming.StreamingQueryManager.createQuery(StreamingQueryManager.scala:232)
	at org.apache.spark.sql.streaming.StreamingQueryManager.startQuery(StreamingQueryManager.scala:269)
	at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:262)
```

This can easily be reproduced on a Spark standalone cluster by providing a checkpoint location with any URI scheme other than `file://` while not overriding `fs.defaultFS` in the configuration.

Workaround: pass `--conf spark.hadoop.fs.defaultFS=s3a://somebucket`, or set it in `SparkConf` or `spark-defaults.conf`.
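The failure above happens because `FileSystem.get(hadoopConf)` always resolves against the configured `fs.defaultFS` (which defaults to `file:///`), whereas `Path.getFileSystem(hadoopConf)` resolves from the path's own URI scheme. The two resolution strategies can be sketched with a minimal, Hadoop-free Scala example (the `fs.defaultFS` key and the helper names are illustrative stand-ins, not Hadoop APIs):

```scala
import java.net.URI

// Stand-in for Hadoop's default filesystem setting.
val defaultFS = "file:///"

// Mimics FileSystem.get(conf): always resolves to the configured default scheme,
// ignoring the path entirely.
def schemeFromDefault(conf: Map[String, String]): String =
  new URI(conf.getOrElse("fs.defaultFS", defaultFS)).getScheme

// Mimics Path.getFileSystem(conf): resolves from the path's own URI scheme,
// falling back to the default only when the path carries no scheme.
def schemeFromPath(path: String, conf: Map[String, String]): String = {
  val s = new URI(path).getScheme
  if (s != null) s else schemeFromDefault(conf)
}

val conf = Map.empty[String, String]
val checkpoint = "s3a://somebucket/checkpoint/metadata"
println(schemeFromDefault(conf))           // file  -> wrong FS for an s3a path
println(schemeFromPath(checkpoint, conf))  // s3a   -> matches the checkpoint URI
```

With the buggy code path, the local filesystem is asked to check an `s3a://` path, which triggers the `Wrong FS` exception in the stack trace above; resolving from the path's scheme avoids the mismatch without any `fs.defaultFS` override.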

## How was this patch tested?

Existing unit tests.

Author: uncleGen <hustyugm@gmail.com>

Closes #16815 from uncleGen/SPARK-19407.

(cherry picked from commit 7a0a630e)
Signed-off-by: Shixiong Zhu <shixiong@databricks.com>
parent f55bd4c7
```diff
@@ -47,7 +47,7 @@ object StreamMetadata extends Logging {

   /** Read the metadata from file if it exists */
   def read(metadataFile: Path, hadoopConf: Configuration): Option[StreamMetadata] = {
-    val fs = FileSystem.get(hadoopConf)
+    val fs = metadataFile.getFileSystem(hadoopConf)
     if (fs.exists(metadataFile)) {
       var input: FSDataInputStream = null
       try {
@@ -72,7 +72,7 @@ object StreamMetadata extends Logging {
       hadoopConf: Configuration): Unit = {
     var output: FSDataOutputStream = null
     try {
-      val fs = FileSystem.get(hadoopConf)
+      val fs = metadataFile.getFileSystem(hadoopConf)
       output = fs.create(metadataFile)
       val writer = new OutputStreamWriter(output)
       Serialization.write(metadata, writer)
```