Commit 17db4bfe authored by Liwei Lin, committed by Sean Owen

[SPARK-14687][CORE][SQL][MLLIB] Call path.getFileSystem(conf) instead of calling FileSystem.get(conf)

## What changes were proposed in this pull request?

- replaced `FileSystem.get(conf)` calls with `path.getFileSystem(conf)`, so the filesystem is resolved from each path's own URI scheme rather than from the cluster's default filesystem (`fs.defaultFS`); the two can differ, e.g. an `s3a://` path on a cluster whose default filesystem is HDFS (see the sketch below)
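
For context, `FileSystem.get(conf)` always returns the client for the default filesystem, whereas `path.getFileSystem(conf)` resolves the filesystem from the path's own URI scheme, so the former yields the wrong client for any path that lives outside the default filesystem. A minimal sketch of the difference; the configuration values and the `s3a://` path are illustrative assumptions, not part of this patch:

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// Illustrative setup (assumption): a cluster whose default filesystem is HDFS.
val conf = new Configuration()
conf.set("fs.defaultFS", "hdfs://namenode:8020")

// A path on a different filesystem (hypothetical bucket; actually opening it
// would additionally require hadoop-aws on the classpath).
val eventLog = new Path("s3a://some-bucket/eventlogs/app-1234")

// Resolves fs.defaultFS and returns an HDFS client; calling fs.open(eventLog)
// on it fails with a "Wrong FS" error because the path's scheme doesn't match.
val defaultFs = FileSystem.get(conf)

// Resolves the filesystem from the path's own scheme (s3a); this is the
// pattern the patch switches to.
val pathFs = eventLog.getFileSystem(conf)
```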

## How was this patch tested?

N/A

Author: Liwei Lin <lwlin7@gmail.com>

Closes #12450 from lw-lin/fix-fs-get.
parent a3451119
@@ -353,7 +353,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
    * the name of the file being compressed.
    */
   def zipFileToStream(file: Path, entryName: String, outputStream: ZipOutputStream): Unit = {
-    val fs = FileSystem.get(hadoopConf)
+    val fs = file.getFileSystem(hadoopConf)
     val inputStream = fs.open(file, 1 * 1024 * 1024) // 1MB Buffer
     try {
       outputStream.putNextEntry(new ZipEntry(entryName))

@@ -372,7 +372,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
       attempt.attemptId.isEmpty || attemptId.isEmpty || attempt.attemptId.get == attemptId.get
     }.foreach { attempt =>
       val logPath = new Path(logDir, attempt.logPath)
-      zipFileToStream(new Path(logDir, attempt.logPath), attempt.logPath, zipStream)
+      zipFileToStream(logPath, attempt.logPath, zipStream)
     }
   } finally {
     zipStream.close()

@@ -640,7 +640,8 @@ object ALS extends DefaultParamsReadable[ALS] with Logging {
     val deletePreviousCheckpointFile: () => Unit = () =>
       previousCheckpointFile.foreach { file =>
         try {
-          FileSystem.get(sc.hadoopConfiguration).delete(new Path(file), true)
+          val checkpointFile = new Path(file)
+          checkpointFile.getFileSystem(sc.hadoopConfiguration).delete(checkpointFile, true)
         } catch {
           case e: IOException =>
             logWarning(s"Cannot delete checkpoint file $file:", e)

@@ -273,8 +273,9 @@ class LDASuite extends SparkFunSuite with MLlibTestSparkContext with DefaultRead
     // There should be 1 checkpoint remaining.
     assert(model.getCheckpointFiles.length === 1)
-    val fs = FileSystem.get(sqlContext.sparkContext.hadoopConfiguration)
-    assert(fs.exists(new Path(model.getCheckpointFiles.head)))
+    val checkpointFile = new Path(model.getCheckpointFiles.head)
+    val fs = checkpointFile.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
+    assert(fs.exists(checkpointFile))
     model.deleteCheckpointFiles()
     assert(model.getCheckpointFiles.isEmpty)
   }

@@ -178,7 +178,7 @@ public abstract class SpecificParquetRecordReaderBase<T> extends RecordReader<Vo
     config.set("spark.sql.parquet.writeLegacyFormat", "false");
     this.file = new Path(path);
-    long length = FileSystem.get(config).getFileStatus(this.file).getLen();
+    long length = this.file.getFileSystem(config).getFileStatus(this.file).getLen();
     ParquetMetadata footer = readFooter(config, file, range(0, length));
     List<BlockMetaData> blocks = footer.getBlocks();

@@ -39,7 +39,7 @@ class FileStreamSource(
     providerName: String,
     dataFrameBuilder: Array[String] => DataFrame) extends Source with Logging {

-  private val fs = FileSystem.get(sqlContext.sparkContext.hadoopConfiguration)
+  private val fs = new Path(path).getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
   private val metadataLog = new HDFSMetadataLog[Seq[String]](sqlContext, metadataPath)
   private var maxBatchId = metadataLog.getLatest().map(_._1).getOrElse(-1L)