diff --git a/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala b/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala
index becf3829e7248f198407249f224ee17c4f51766e..5d522189a0c292ccf97b643c588cf4b2cc2cfac1 100644
--- a/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala
@@ -259,7 +259,7 @@ class InputOutputMetricsSuite extends SparkFunSuite with SharedSparkContext
 
   test("output metrics on records written") {
     val file = new File(tmpDir, getClass.getSimpleName)
-    val filePath = "file://" + file.getAbsolutePath
+    val filePath = file.toURI.toURL.toString
 
     val records = runAndReturnRecordsWritten {
       sc.parallelize(1 to numRecords).saveAsTextFile(filePath)
@@ -269,7 +269,7 @@ class InputOutputMetricsSuite extends SparkFunSuite with SharedSparkContext
 
   test("output metrics on records written - new Hadoop API") {
     val file = new File(tmpDir, getClass.getSimpleName)
-    val filePath = "file://" + file.getAbsolutePath
+    val filePath = file.toURI.toURL.toString
 
     val records = runAndReturnRecordsWritten {
       sc.parallelize(1 to numRecords).map(key => (key.toString, key.toString))
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaRelationSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaRelationSuite.scala
index 673d60ff6f87a2838242440ac823a99df5aca997..68bc3e3e2e9a8fe4229abde98884d00795f59280 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaRelationSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaRelationSuite.scala
@@ -24,6 +24,7 @@ import org.scalatest.BeforeAndAfter
 
 import org.apache.spark.sql.QueryTest
 import org.apache.spark.sql.test.SharedSQLContext
+import org.apache.spark.util.Utils
 
 class KafkaRelationSuite extends QueryTest with BeforeAndAfter with SharedSQLContext {
 
@@ -147,6 +148,9 @@ class KafkaRelationSuite extends QueryTest with BeforeAndAfter with SharedSQLCon
   }
 
   test("test late binding start offsets") {
+    // Kafka fails to remove the logs on Windows. See KAFKA-1194.
+    assume(!Utils.isWindows)
+
     var kafkaUtils: KafkaTestUtils = null
     try {
       /**
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
index 4f82b133cb4c8588e974d0a4cbf9c404866a7095..534fb77c9ce18f5bfd25c975dae9f5e447caacce 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
@@ -39,6 +39,7 @@ import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.functions.{count, window}
 import org.apache.spark.sql.streaming.{ProcessingTime, StreamTest}
 import org.apache.spark.sql.test.{SharedSQLContext, TestSparkSession}
+import org.apache.spark.util.Utils
 
 abstract class KafkaSourceTest extends StreamTest with SharedSQLContext {
 
@@ -161,11 +162,12 @@ class KafkaSourceSuite extends KafkaSourceTest {
       // Make sure Spark 2.1.0 will throw an exception when reading the new log
      intercept[java.lang.IllegalArgumentException] {
         // Simulate how Spark 2.1.0 reads the log
-        val in = new FileInputStream(metadataPath.getAbsolutePath + "/0")
-        val length = in.read()
-        val bytes = new Array[Byte](length)
-        in.read(bytes)
-        KafkaSourceOffset(SerializedOffset(new String(bytes, UTF_8)))
+        Utils.tryWithResource(new FileInputStream(metadataPath.getAbsolutePath + "/0")) { in =>
+          val length = in.read()
+          val bytes = new Array[Byte](length)
+          in.read(bytes)
+          KafkaSourceOffset(SerializedOffset(new String(bytes, UTF_8)))
+        }
       }
     }
   }
@@ -181,13 +183,13 @@ class KafkaSourceSuite extends KafkaSourceTest {
         "subscribe" -> topic
       )
 
-      val from = Paths.get(
-        getClass.getResource("/kafka-source-initial-offset-version-2.1.0.bin").getPath)
+      val from = new File(
+        getClass.getResource("/kafka-source-initial-offset-version-2.1.0.bin").toURI).toPath
       val to = Paths.get(s"${metadataPath.getAbsolutePath}/0")
       Files.copy(from, to)
 
-      val source = provider.createSource(spark.sqlContext, metadataPath.getAbsolutePath, None,
-        "", parameters)
+      val source = provider.createSource(
+        spark.sqlContext, metadataPath.toURI.toString, None, "", parameters)
       val deserializedOffset = source.getOffset.get
       val referenceOffset = KafkaSourceOffset((topic, 0, 0L), (topic, 1, 0L), (topic, 2, 0L))
       assert(referenceOffset == deserializedOffset)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index e1a3b247fd4fc543a6e1c790940068a1b065e888..b44f20e367f0ac9f1a21fa2aeb027721a57d5e89 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -1520,7 +1520,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
       val e = intercept[AnalysisException] { sql("CREATE TABLE tab1 USING json") }.getMessage
       assert(e.contains("Unable to infer schema for JSON. It must be specified manually"))
 
-      sql(s"CREATE TABLE tab2 using json location '${tempDir.getCanonicalPath}'")
+      sql(s"CREATE TABLE tab2 using json location '${tempDir.toURI}'")
       checkAnswer(spark.table("tab2"), Row("a", "b"))
     }
   }
@@ -1814,7 +1814,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
         val defaultTablePath = spark.sessionState.catalog
           .getTableMetadata(TableIdentifier("tbl")).storage.locationUri.get
 
-        sql(s"ALTER TABLE tbl SET LOCATION '${dir.getCanonicalPath}'")
+        sql(s"ALTER TABLE tbl SET LOCATION '${dir.toURI}'")
         spark.catalog.refreshTable("tbl")
         // SET LOCATION won't move data from previous table path to new table path.
         assert(spark.table("tbl").count() == 0)
@@ -1836,15 +1836,15 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
   test("insert data to a data source table which has a not existed location should succeed") {
     withTable("t") {
       withTempDir { dir =>
+        val path = dir.toURI.toString.stripSuffix("/")
         spark.sql(
           s"""
              |CREATE TABLE t(a string, b int)
              |USING parquet
-             |OPTIONS(path "$dir")
+             |OPTIONS(path "$path")
            """.stripMargin)
         val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
-        val expectedPath = dir.getAbsolutePath.stripSuffix("/")
-        assert(table.location.stripSuffix("/") == expectedPath)
+        assert(table.location == path)
 
         dir.delete
         val tableLocFile = new File(table.location.stripPrefix("file:"))
@@ -1859,8 +1859,8 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
         assert(tableLocFile.exists)
         checkAnswer(spark.table("t"), Row("c", 1) :: Nil)
 
-        val newDir = dir.getAbsolutePath.stripSuffix("/") + "/x"
-        val newDirFile = new File(newDir)
+        val newDirFile = new File(dir, "x")
+        val newDir = newDirFile.toURI.toString
         spark.sql(s"ALTER TABLE t SET LOCATION '$newDir'")
         spark.sessionState.catalog.refreshTable(TableIdentifier("t"))
 
@@ -1878,16 +1878,16 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
   test("insert into a data source table with no existed partition location should succeed") {
     withTable("t") {
       withTempDir { dir =>
+        val path = dir.toURI.toString.stripSuffix("/")
         spark.sql(
           s"""
              |CREATE TABLE t(a int, b int, c int, d int)
              |USING parquet
              |PARTITIONED BY(a, b)
-             |LOCATION "$dir"
+             |LOCATION "$path"
            """.stripMargin)
         val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
-        val expectedPath = dir.getAbsolutePath.stripSuffix("/")
-        assert(table.location.stripSuffix("/") == expectedPath)
+        assert(table.location == path)
 
         spark.sql("INSERT INTO TABLE t PARTITION(a=1, b=2) SELECT 3, 4")
         checkAnswer(spark.table("t"), Row(3, 4, 1, 2) :: Nil)
@@ -1906,25 +1906,26 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
   test("read data from a data source table which has a not existed location should succeed") {
     withTable("t") {
       withTempDir { dir =>
+        val path = dir.toURI.toString.stripSuffix("/")
         spark.sql(
           s"""
              |CREATE TABLE t(a string, b int)
             |USING parquet
-             |OPTIONS(path "$dir")
+             |OPTIONS(path "$path")
            """.stripMargin)
         val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
-        val expectedPath = dir.getAbsolutePath.stripSuffix("/")
-        assert(table.location.stripSuffix("/") == expectedPath)
+        assert(table.location == path)
 
         dir.delete()
         checkAnswer(spark.table("t"), Nil)
 
-        val newDir = dir.getAbsolutePath.stripSuffix("/") + "/x"
+        val newDirFile = new File(dir, "x")
+        val newDir = newDirFile.toURI.toString
         spark.sql(s"ALTER TABLE t SET LOCATION '$newDir'")
 
         val table1 = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
         assert(table1.location == newDir)
-        assert(!new File(newDir).exists())
+        assert(!newDirFile.exists())
         checkAnswer(spark.table("t"), Nil)
       }
     }
@@ -1938,7 +1939,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
              |CREATE TABLE t(a int, b int, c int, d int)
              |USING parquet
              |PARTITIONED BY(a, b)
-             |LOCATION "$dir"
+             |LOCATION "${dir.toURI}"
            """.stripMargin)
         spark.sql("INSERT INTO TABLE t PARTITION(a=1, b=2) SELECT 3, 4")
         checkAnswer(spark.table("t"), Row(3, 4, 1, 2) :: Nil)
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index c04b9ee0f2cd5ef7d4ab463b7767fd0401e6f5a0..792ac1e259494b1ad6b9f3a16ee6274181cb11f0 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -1570,7 +1570,7 @@ class HiveDDLSuite
       val dataPath = new File(new File(path, "d=1"), "b=1").getCanonicalPath
       Seq(1 -> 1).toDF("a", "c").write.save(dataPath)
 
-      sql(s"CREATE TABLE t3 USING parquet LOCATION '${path.getCanonicalPath}'")
+      sql(s"CREATE TABLE t3 USING parquet LOCATION '${path.toURI}'")
       assert(getTableColumns("t3") == Seq("a", "c", "d", "b"))
     }
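
Background note (an illustration only, not part of the patch): the recurring change above swaps "file://" + getAbsolutePath and getCanonicalPath for java.io.File#toURI. A minimal sketch of the difference, using a hypothetical directory and object name:

import java.io.File

object FileUriSketch {
  def main(args: Array[String]): Unit = {
    // Hypothetical directory; on Windows getAbsolutePath returns a backslash
    // path with a drive letter (e.g. C:\...\target\demo), so prefixing it with
    // "file://" yields a malformed URI whose drive letter lands in the
    // authority component and whose backslashes are illegal in a URI.
    val dir = new File("target", "demo")
    val concatenated = "file://" + dir.getAbsolutePath

    // File#toURI produces a well-formed URI on every platform, e.g.
    // file:/C:/.../target/demo on Windows and file:/.../target/demo on
    // Unix-like systems, a form that Hadoop's Path handling accepts.
    val viaUri = dir.toURI.toString

    println(concatenated)
    println(viaUri)
  }
}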