diff --git a/core/src/test/scala/org/apache/spark/deploy/RPackageUtilsSuite.scala b/core/src/test/scala/org/apache/spark/deploy/RPackageUtilsSuite.scala
index 5e0bf6d438dc8488b70f6c711950f0dbbf755353..32dd3ecc2f02743fbd66132f1df3bc560a2c9e69 100644
--- a/core/src/test/scala/org/apache/spark/deploy/RPackageUtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/RPackageUtilsSuite.scala
@@ -137,9 +137,10 @@ class RPackageUtilsSuite
     IvyTestUtils.withRepository(main, None, None) { repo =>
       val jar = IvyTestUtils.packJar(new File(new URI(repo)), dep1, Nil,
         useIvyLayout = false, withR = false, None)
-      val jarFile = new JarFile(jar)
-      assert(jarFile.getManifest == null, "jar file should have null manifest")
-      assert(!RPackageUtils.checkManifestForR(jarFile), "null manifest should return false")
+      Utils.tryWithResource(new JarFile(jar)) { jarFile =>
+        assert(jarFile.getManifest == null, "jar file should have null manifest")
+        assert(!RPackageUtils.checkManifestForR(jarFile), "null manifest should return false")
+      }
     }
   }

diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
index 724096d4abf343d9e177ffb577204268c8e677bb..7400ceb802d14a6a3ddb6513943b1afa1635b91b 100644
--- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
@@ -831,7 +831,7 @@ class SparkSubmitSuite
     val hadoopConf = new Configuration()
     val tmpDir = Files.createTempDirectory("tmp").toFile
     updateConfWithFakeS3Fs(hadoopConf)
-    val sourcePath = s"s3a://${jarFile.getAbsolutePath}"
+    val sourcePath = s"s3a://${jarFile.toURI.getPath}"
     val outputPath = DependencyUtils.downloadFile(sourcePath, tmpDir, sparkConf, hadoopConf,
       new SecurityManager(sparkConf))
     checkDownloadedFile(sourcePath, outputPath)
@@ -847,7 +847,7 @@ class SparkSubmitSuite
     val hadoopConf = new Configuration()
     val tmpDir = Files.createTempDirectory("tmp").toFile
     updateConfWithFakeS3Fs(hadoopConf)
-    val sourcePaths = Seq("/local/file", s"s3a://${jarFile.getAbsolutePath}")
+    val sourcePaths = Seq("/local/file", s"s3a://${jarFile.toURI.getPath}")
     val outputPaths = DependencyUtils
       .downloadFileList(sourcePaths.mkString(","), tmpDir, sparkConf, hadoopConf,
         new SecurityManager(sparkConf))
diff --git a/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala
index 88a68af6b647d6028b720290df9632a76f39d673..d17e3864854a8a9b64261369fadec02ce091f8e4 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala
@@ -21,6 +21,7 @@ import java.io._
 import java.net.URI
 import java.util.concurrent.atomic.AtomicInteger

+import org.apache.hadoop.fs.Path
 import org.json4s.jackson.JsonMethods._
 import org.scalatest.BeforeAndAfter

@@ -84,24 +85,23 @@ class ReplayListenerSuite extends SparkFunSuite with BeforeAndAfter with LocalSp
     val buffered = new ByteArrayOutputStream
     val codec = new LZ4CompressionCodec(new SparkConf())
     val compstream = codec.compressedOutputStream(buffered)
-    val writer = new PrintWriter(compstream)
+    Utils.tryWithResource(new PrintWriter(compstream)) { writer =>

-    val applicationStart = SparkListenerApplicationStart("AppStarts", None,
-      125L, "Mickey", None)
-    val applicationEnd = SparkListenerApplicationEnd(1000L)
+      val applicationStart = SparkListenerApplicationStart("AppStarts", None,
+        125L, "Mickey", None)
"Mickey", None) + val applicationEnd = SparkListenerApplicationEnd(1000L) - // scalastyle:off println - writer.println(compact(render(JsonProtocol.sparkEventToJson(applicationStart)))) - writer.println(compact(render(JsonProtocol.sparkEventToJson(applicationEnd)))) - // scalastyle:on println - writer.close() + // scalastyle:off println + writer.println(compact(render(JsonProtocol.sparkEventToJson(applicationStart)))) + writer.println(compact(render(JsonProtocol.sparkEventToJson(applicationEnd)))) + // scalastyle:on println + } val logFilePath = Utils.getFilePath(testDir, "events.lz4.inprogress") - val fstream = fileSystem.create(logFilePath) val bytes = buffered.toByteArray - - fstream.write(bytes, 0, buffered.size) - fstream.close + Utils.tryWithResource(fileSystem.create(logFilePath)) { fstream => + fstream.write(bytes, 0, buffered.size) + } // Read the compressed .inprogress file and verify only first event was parsed. val conf = EventLoggingListenerSuite.getLoggingConf(logFilePath) @@ -112,17 +112,19 @@ class ReplayListenerSuite extends SparkFunSuite with BeforeAndAfter with LocalSp // Verify the replay returns the events given the input maybe truncated. val logData = EventLoggingListener.openEventLog(logFilePath, fileSystem) - val failingStream = new EarlyEOFInputStream(logData, buffered.size - 10) - replayer.replay(failingStream, logFilePath.toString, true) + Utils.tryWithResource(new EarlyEOFInputStream(logData, buffered.size - 10)) { failingStream => + replayer.replay(failingStream, logFilePath.toString, true) - assert(eventMonster.loggedEvents.size === 1) - assert(failingStream.didFail) + assert(eventMonster.loggedEvents.size === 1) + assert(failingStream.didFail) + } // Verify the replay throws the EOF exception since the input may not be truncated. val logData2 = EventLoggingListener.openEventLog(logFilePath, fileSystem) - val failingStream2 = new EarlyEOFInputStream(logData2, buffered.size - 10) - intercept[EOFException] { - replayer.replay(failingStream2, logFilePath.toString, false) + Utils.tryWithResource(new EarlyEOFInputStream(logData2, buffered.size - 10)) { failingStream2 => + intercept[EOFException] { + replayer.replay(failingStream2, logFilePath.toString, false) + } } } @@ -151,7 +153,10 @@ class ReplayListenerSuite extends SparkFunSuite with BeforeAndAfter with LocalSp * assumption that the event logging behavior is correct (tested in a separate suite). */ private def testApplicationReplay(codecName: Option[String] = None) { - val logDirPath = Utils.getFilePath(testDir, "test-replay") + val logDir = new File(testDir.getAbsolutePath, "test-replay") + // Here, it creates `Path` from the URI instead of the absolute path for the explicit file + // scheme so that the string representation of this `Path` has leading file scheme correctly. 
+ val logDirPath = new Path(logDir.toURI) fileSystem.mkdirs(logDirPath) val conf = EventLoggingListenerSuite.getLoggingConf(logDirPath, codecName) @@ -221,12 +226,14 @@ class ReplayListenerSuite extends SparkFunSuite with BeforeAndAfter with LocalSp def didFail: Boolean = countDown.get == 0 @throws[IOException] - def read: Int = { + override def read(): Int = { if (countDown.get == 0) { throw new EOFException("Stream ended prematurely") } countDown.decrementAndGet() - in.read + in.read() } + + override def close(): Unit = in.close() } } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala index dc6140756d519df21d130da6dc11f9dfff35e22a..03e50e4119f6fbe7413986d8d01c8cd3c4c145ea 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala @@ -203,7 +203,7 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto sql(s"INSERT INTO TABLE $tableName PARTITION (ds='$ds') SELECT * FROM src") } - sql(s"ALTER TABLE $tableName SET LOCATION '$path'") + sql(s"ALTER TABLE $tableName SET LOCATION '${path.toURI}'") sql(s"ANALYZE TABLE $tableName COMPUTE STATISTICS noscan") @@ -222,7 +222,7 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto s""" |CREATE TABLE $sourceTableName (key STRING, value STRING) |PARTITIONED BY (ds STRING) - |LOCATION '$path' + |LOCATION '${path.toURI}' """.stripMargin) val partitionDates = List("2010-01-01", "2010-01-02", "2010-01-03") @@ -239,7 +239,7 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto s""" |CREATE TABLE $tableName (key STRING, value STRING) |PARTITIONED BY (ds STRING) - |LOCATION '$path' + |LOCATION '${path.toURI}' """.stripMargin) // Register only one of the partitions found on disk
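
Note on the two patterns this patch applies repeatedly, with a minimal Scala sketch. Assumptions are flagged inline: the helper below is an illustrative reimplementation of what `org.apache.spark.util.Utils.tryWithResource` provides, not the upstream source, and the object and path names are hypothetical.

import java.io.{Closeable, File}
import org.apache.hadoop.fs.Path

// Hypothetical object name, for illustration only.
object PatternsSketch {

  // Loan pattern: create the resource, lend it to `f`, and close it even if
  // `f` throws. This is the shape behind the `Utils.tryWithResource` calls
  // that replace the bare `new JarFile(jar)` / `writer.close()` pairs above;
  // a leaked handle keeps the file open, which on Windows also blocks
  // deletion of the temp directories these tests create.
  def tryWithResource[R <: Closeable, T](createResource: => R)(f: R => T): T = {
    val resource = createResource
    try f(resource) finally resource.close()
  }

  def main(args: Array[String]): Unit = {
    val dir = new File("/tmp/test-replay") // hypothetical path
    // Built from the absolute path, the Path string carries no scheme:
    println(new Path(dir.getAbsolutePath)) // /tmp/test-replay
    // Built from the URI, the Path keeps an explicit leading file scheme,
    // and Windows drive letters and backslashes are normalized into a URI
    // path, which is also why the s3a:// tests switch to jarFile.toURI.getPath:
    println(new Path(dir.toURI)) // file:/tmp/test-replay
  }
}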