diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index b2199fdf90e5c530389828d9997b7b5f0ba98fd6..c1f8b2b3d9605bf57cb11a6e617b0393d5a127b2 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -132,13 +132,6 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
     }
   }
 
-  private def makeQualifiedPath(path: String): URI = {
-    // copy-paste from SessionCatalog
-    val hadoopPath = new Path(path)
-    val fs = hadoopPath.getFileSystem(sparkContext.hadoopConfiguration)
-    fs.makeQualified(hadoopPath).toUri
-  }
-
   test("Create Database using Default Warehouse Path") {
     val catalog = spark.sessionState.catalog
     val dbName = "db1"
@@ -2086,9 +2079,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
       Seq(1).toDF("a").write.saveAsTable("t")
       val tblloc = new File(loc, "t")
       val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
-      val tblPath = new Path(tblloc.getAbsolutePath)
-      val fs = tblPath.getFileSystem(spark.sessionState.newHadoopConf())
-      assert(table.location == fs.makeQualified(tblPath).toUri)
+      assert(table.location == makeQualifiedPath(tblloc.getAbsolutePath))
       assert(tblloc.listFiles().nonEmpty)
     }
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
index d4afb9d8af6f85a1d15fb004c7092c071d2a2898..9201954b66d10cfcf5ff9cdb366e23ba86a4a3a4 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
@@ -18,13 +18,14 @@
 package org.apache.spark.sql.test
 
 import java.io.File
+import java.net.URI
 import java.util.UUID
 
 import scala.language.implicitConversions
 import scala.util.Try
 import scala.util.control.NonFatal
 
-import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
 import org.scalatest.BeforeAndAfterAll
 
 import org.apache.spark.SparkFunSuite
@@ -294,6 +295,17 @@ private[sql] trait SQLTestUtils
       test(name) { runOnThread() }
     }
   }
+
+  /**
+   * Makes the given path qualified. When a path does not contain a scheme,
+   * qualifying it here pins the result so that it does not change if the
+   * default FileSystem is later changed.
+   */
+  def makeQualifiedPath(path: String): URI = {
+    val hadoopPath = new Path(path)
+    val fs = hadoopPath.getFileSystem(spark.sessionState.newHadoopConf())
+    fs.makeQualified(hadoopPath).toUri
+  }
 }
 
 private[sql] object SQLTestUtils {
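The consolidated makeQualifiedPath above is a thin wrapper over Hadoop's FileSystem.makeQualified. Below is a minimal standalone sketch of its behavior, with a bare Hadoop Configuration standing in for spark.sessionState.newHadoopConf(); the object name and /tmp paths are illustrative only, not part of the patch:

import java.net.URI

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

object MakeQualifiedPathSketch {
  // Same logic as the SQLTestUtils helper, against a default Configuration.
  def makeQualifiedPath(path: String): URI = {
    val hadoopPath = new Path(path)
    val fs = hadoopPath.getFileSystem(new Configuration())
    fs.makeQualified(hadoopPath).toUri
  }

  def main(args: Array[String]): Unit = {
    // A scheme-less path picks up the default FileSystem's scheme...
    println(makeQualifiedPath("/tmp/spark-test"))      // file:/tmp/spark-test
    // ...while a path that already carries a scheme passes through unchanged.
    println(makeQualifiedPath("file:/tmp/spark-test")) // file:/tmp/spark-test
  }
}

With the local FileSystem as the default, both calls print the same qualified URI, which is why the tests can compare catalog locations against this helper regardless of how the temp directory was spelled.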
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index df2c1cee942b099b1019920ad6d0d07f91531300..10d929a4a0ef8f8ad8e35a7d9ee81fd4762164ad 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -1654,10 +1654,8 @@ class HiveDDLSuite
              |LOCATION '$dir'
              |AS SELECT 3 as a, 4 as b, 1 as c, 2 as d
            """.stripMargin)
-        val dirPath = new Path(dir.getAbsolutePath)
-        val fs = dirPath.getFileSystem(spark.sessionState.newHadoopConf())
         val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
-        assert(new Path(table.location) == fs.makeQualified(dirPath))
+        assert(table.location == makeQualifiedPath(dir.getAbsolutePath))
 
         checkAnswer(spark.table("t"), Row(3, 4, 1, 2))
       }
@@ -1675,10 +1673,8 @@ class HiveDDLSuite
              |LOCATION '$dir'
              |AS SELECT 3 as a, 4 as b, 1 as c, 2 as d
            """.stripMargin)
-        val dirPath = new Path(dir.getAbsolutePath)
-        val fs = dirPath.getFileSystem(spark.sessionState.newHadoopConf())
         val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t1"))
-        assert(new Path(table.location) == fs.makeQualified(dirPath))
+        assert(table.location == makeQualifiedPath(dir.getAbsolutePath))
 
         val partDir = new File(dir, "a=3")
         assert(partDir.exists())
@@ -1792,9 +1788,7 @@ class HiveDDLSuite
           """.stripMargin)
 
         val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
-        val path = new Path(loc.getAbsolutePath)
-        val fs = path.getFileSystem(spark.sessionState.newHadoopConf())
-        assert(table.location == fs.makeQualified(path).toUri)
+        assert(table.location == makeQualifiedPath(loc.getAbsolutePath))
         assert(new Path(table.location).toString.contains(specialChars))
 
         assert(loc.listFiles().isEmpty)
@@ -1822,9 +1816,7 @@ class HiveDDLSuite
           """.stripMargin)
 
         val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t1"))
-        val path = new Path(loc.getAbsolutePath)
-        val fs = path.getFileSystem(spark.sessionState.newHadoopConf())
-        assert(table.location == fs.makeQualified(path).toUri)
+        assert(table.location == makeQualifiedPath(loc.getAbsolutePath))
         assert(new Path(table.location).toString.contains(specialChars))
 
         assert(loc.listFiles().isEmpty)
@@ -1871,7 +1863,5 @@ class HiveDDLSuite
         val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
-        val tblPath = new Path(tblloc.getAbsolutePath)
-        val fs = tblPath.getFileSystem(spark.sessionState.newHadoopConf())
-        assert(table.location == fs.makeQualified(tblPath).toUri)
+        assert(table.location == makeQualifiedPath(tblloc.getAbsolutePath))
         assert(tblloc.listFiles().nonEmpty)
       }
     }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcHadoopFsRelationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcHadoopFsRelationSuite.scala
index 4f771caa1db27cdce72fb6421fe0e6b1809c7adf..ba0a7605da71c4f595987c2e9377b040e268c7ab 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcHadoopFsRelationSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcHadoopFsRelationSuite.scala
@@ -19,10 +19,10 @@ package org.apache.spark.sql.hive.orc
 
 import java.io.File
 
-import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hadoop.fs.Path
 
-import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.catalog.CatalogUtils
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources.HadoopFsRelationTest
 import org.apache.spark.sql.types._
@@ -42,12 +42,9 @@ class OrcHadoopFsRelationSuite extends HadoopFsRelationTest {
 
   test("save()/load() - partitioned table - simple queries - partition columns in data") {
     withTempDir { file =>
-      val basePath = new Path(file.getCanonicalPath)
-      val fs = basePath.getFileSystem(SparkHadoopUtil.get.conf)
-      val qualifiedBasePath = fs.makeQualified(basePath)
-
       for (p1 <- 1 to 2; p2 <- Seq("foo", "bar")) {
-        val partitionDir = new Path(qualifiedBasePath, s"p1=$p1/p2=$p2")
+        val partitionDir = new Path(
+          CatalogUtils.URIToString(makeQualifiedPath(file.getCanonicalPath)), s"p1=$p1/p2=$p2")
         sparkContext
           .parallelize(for (i <- 1 to 3) yield (i, s"val_$i", p1))
           .toDF("a", "b", "p1")
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/JsonHadoopFsRelationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/JsonHadoopFsRelationSuite.scala
index d79edee5b1a4c166da1197821b8fa567b16494ae..49be30435ad2f2d1be2b3be04eef96b2374ee9bd 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/JsonHadoopFsRelationSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/JsonHadoopFsRelationSuite.scala
@@ -21,8 +21,8 @@ import java.math.BigDecimal
 
 import org.apache.hadoop.fs.Path
 
-import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.catalog.CatalogUtils
 import org.apache.spark.sql.types._
 
 class JsonHadoopFsRelationSuite extends HadoopFsRelationTest {
@@ -38,12 +38,9 @@ class JsonHadoopFsRelationSuite extends HadoopFsRelationTest {
 
   test("save()/load() - partitioned table - simple queries - partition columns in data") {
     withTempDir { file =>
-      val basePath = new Path(file.getCanonicalPath)
-      val fs = basePath.getFileSystem(SparkHadoopUtil.get.conf)
-      val qualifiedBasePath = fs.makeQualified(basePath)
-
       for (p1 <- 1 to 2; p2 <- Seq("foo", "bar")) {
-        val partitionDir = new Path(qualifiedBasePath, s"p1=$p1/p2=$p2")
+        val partitionDir = new Path(
+          CatalogUtils.URIToString(makeQualifiedPath(file.getCanonicalPath)), s"p1=$p1/p2=$p2")
         sparkContext
           .parallelize(for (i <- 1 to 3) yield s"""{"a":$i,"b":"val_$i"}""")
           .saveAsTextFile(partitionDir.toString)
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala
index 03207ab869d1259f0cb61c55b8f77e1449c3bacb..dce5bb7ddba66b97bcfcfc402369ef893cddc48b 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala
@@ -23,8 +23,8 @@ import com.google.common.io.Files
 import org.apache.hadoop.fs.Path
 import org.apache.parquet.hadoop.ParquetOutputFormat
 
-import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.catalog.CatalogUtils
 import org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
@@ -44,12 +44,9 @@ class ParquetHadoopFsRelationSuite extends HadoopFsRelationTest {
 
   test("save()/load() - partitioned table - simple queries - partition columns in data") {
     withTempDir { file =>
-      val basePath = new Path(file.getCanonicalPath)
-      val fs = basePath.getFileSystem(SparkHadoopUtil.get.conf)
-      val qualifiedBasePath = fs.makeQualified(basePath)
-
       for (p1 <- 1 to 2; p2 <- Seq("foo", "bar")) {
-        val partitionDir = new Path(qualifiedBasePath, s"p1=$p1/p2=$p2")
+        val partitionDir = new Path(
+          CatalogUtils.URIToString(makeQualifiedPath(file.getCanonicalPath)), s"p1=$p1/p2=$p2")
         sparkContext
           .parallelize(for (i <- 1 to 3) yield (i, s"val_$i", p1))
           .toDF("a", "b", "p1")
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextHadoopFsRelationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextHadoopFsRelationSuite.scala
index a47a2246ddc3c098eb0d2415022d18ba594895cf..2ec593b95c9b60e81b545eb9c8d55265cf53dc9f 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextHadoopFsRelationSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextHadoopFsRelationSuite.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.sources
 
 import org.apache.hadoop.fs.Path
 
-import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.sql.catalyst.catalog.CatalogUtils
 import org.apache.spark.sql.catalyst.expressions.PredicateHelper
 import org.apache.spark.sql.types._
 
@@ -45,12 +45,9 @@ class SimpleTextHadoopFsRelationSuite extends HadoopFsRelationTest with Predicat
 
   test("save()/load() - partitioned table - simple queries - partition columns in data") {
     withTempDir { file =>
-      val basePath = new Path(file.getCanonicalPath)
-      val fs = basePath.getFileSystem(SparkHadoopUtil.get.conf)
-      val qualifiedBasePath = fs.makeQualified(basePath)
-
       for (p1 <- 1 to 2; p2 <- Seq("foo", "bar")) {
-        val partitionDir = new Path(qualifiedBasePath, s"p1=$p1/p2=$p2")
+        val partitionDir = new Path(
+          CatalogUtils.URIToString(makeQualifiedPath(file.getCanonicalPath)), s"p1=$p1/p2=$p2")
         sparkContext
           .parallelize(for (i <- 1 to 3) yield s"$i,val_$i,$p1")
           .saveAsTextFile(partitionDir.toString)
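All four HadoopFsRelation suites receive the same rewrite: the basePath/fs/qualifiedBasePath boilerplate collapses into a single call to the shared helper, stringified through CatalogUtils.URIToString so it can seed a Path. A minimal sketch of the resulting partition layout, assuming CatalogUtils.URIToString amounts to uri.toString and using a hypothetical temp directory in place of the suites' withTempDir:

import java.net.URI

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

object PartitionDirSketch {
  // Same qualification step as the shared test helper.
  private def makeQualifiedPath(path: String): URI = {
    val p = new Path(path)
    p.getFileSystem(new Configuration()).makeQualified(p).toUri
  }

  def main(args: Array[String]): Unit = {
    val base = "/tmp/rel-test" // stand-in for the suite's temp directory
    for (p1 <- 1 to 2; p2 <- Seq("foo", "bar")) {
      // The qualified base, rendered as a string, seeds new Path(parent, child).
      val partitionDir = new Path(makeQualifiedPath(base).toString, s"p1=$p1/p2=$p2")
      println(partitionDir) // file:/tmp/rel-test/p1=1/p2=foo, and so on
    }
  }
}

Each iteration yields a fully qualified directory such as file:/tmp/rel-test/p1=1/p2=foo, matching the p1=.../p2=... layout the suites write to and then read back as partition columns.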