diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
index 234e6bb8443aff42d64cd93dc5036c9cc4c20b86..c38b6e8c61d8a5ee1b4f20f1064e739a5c2ab187 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.parquet
 import java.io.IOException
 import java.lang.{Double => JDouble, Float => JFloat, Long => JLong}
 import java.math.{BigDecimal => JBigDecimal}
+import java.net.URI
 import java.text.SimpleDateFormat
 import java.util.{Date, List => JList}
 
@@ -244,11 +245,10 @@ private[sql] case class ParquetRelation2(
      * Refreshes `FileStatus`es, footers, partition spec, and table schema.
      */
     def refresh(): Unit = {
-      val fs = FileSystem.get(sparkContext.hadoopConfiguration)
-
       // Support either reading a collection of raw Parquet part-files, or a collection of folders
       // containing Parquet files (e.g. partitioned Parquet table).
       val baseStatuses = paths.distinct.map { p =>
+        val fs = FileSystem.get(URI.create(p), sparkContext.hadoopConfiguration)
         val qualified = fs.makeQualified(new Path(p))
 
         if (!fs.exists(qualified) && maybeSchema.isDefined) {
@@ -262,6 +262,7 @@ private[sql] case class ParquetRelation2(
 
       // Lists `FileStatus`es of all leaf nodes (files) under all base directories.
       val leaves = baseStatuses.flatMap { f =>
+        val fs = FileSystem.get(f.getPath.toUri, sparkContext.hadoopConfiguration)
         SparkHadoopUtil.get.listLeafStatuses(fs, f.getPath).filter { f =>
           isSummaryFile(f.getPath) ||
             !(f.getPath.getName.startsWith("_") || f.getPath.getName.startsWith("."))