diff --git a/core/src/test/scala/org/apache/spark/ShuffleSuite.scala b/core/src/test/scala/org/apache/spark/ShuffleSuite.scala index dc3a28e27cd4e65000755ef095c0af975f2b213c..e626ed3621d6048d4ff0c927dcfbe0454f34b5b6 100644 --- a/core/src/test/scala/org/apache/spark/ShuffleSuite.scala +++ b/core/src/test/scala/org/apache/spark/ShuffleSuite.scala @@ -51,10 +51,7 @@ abstract class ShuffleSuite extends SparkFunSuite with Matchers with LocalSparkC assert(valuesFor2.toList.sorted === List(1)) } - // Some tests using `local-cluster` here are failed on Windows due to the failure of initiating - // executors by the path length limitation. See SPARK-18718. test("shuffle non-zero block size") { - assume(!Utils.isWindows) sc = new SparkContext("local-cluster[2,1,1024]", "test", conf) val NUM_BLOCKS = 3 @@ -80,7 +77,6 @@ abstract class ShuffleSuite extends SparkFunSuite with Matchers with LocalSparkC } test("shuffle serializer") { - assume(!Utils.isWindows) // Use a local cluster with 2 processes to make sure there are both local and remote blocks sc = new SparkContext("local-cluster[2,1,1024]", "test", conf) val a = sc.parallelize(1 to 10, 2) @@ -97,7 +93,6 @@ abstract class ShuffleSuite extends SparkFunSuite with Matchers with LocalSparkC } test("zero sized blocks") { - assume(!Utils.isWindows) // Use a local cluster with 2 processes to make sure there are both local and remote blocks sc = new SparkContext("local-cluster[2,1,1024]", "test", conf) @@ -125,7 +120,6 @@ abstract class ShuffleSuite extends SparkFunSuite with Matchers with LocalSparkC } test("zero sized blocks without kryo") { - assume(!Utils.isWindows) // Use a local cluster with 2 processes to make sure there are both local and remote blocks sc = new SparkContext("local-cluster[2,1,1024]", "test", conf) @@ -151,7 +145,6 @@ abstract class ShuffleSuite extends SparkFunSuite with Matchers with LocalSparkC } test("shuffle on mutable pairs") { - assume(!Utils.isWindows) // Use a local cluster with 2 processes to make sure there are both local and remote blocks sc = new SparkContext("local-cluster[2,1,1024]", "test", conf) def p[T1, T2](_1: T1, _2: T2): MutablePair[T1, T2] = MutablePair(_1, _2) @@ -164,7 +157,6 @@ abstract class ShuffleSuite extends SparkFunSuite with Matchers with LocalSparkC } test("sorting on mutable pairs") { - assume(!Utils.isWindows) // This is not in SortingSuite because of the local cluster setup. // Use a local cluster with 2 processes to make sure there are both local and remote blocks sc = new SparkContext("local-cluster[2,1,1024]", "test", conf) @@ -180,7 +172,6 @@ abstract class ShuffleSuite extends SparkFunSuite with Matchers with LocalSparkC } test("cogroup using mutable pairs") { - assume(!Utils.isWindows) // Use a local cluster with 2 processes to make sure there are both local and remote blocks sc = new SparkContext("local-cluster[2,1,1024]", "test", conf) def p[T1, T2](_1: T1, _2: T2): MutablePair[T1, T2] = MutablePair(_1, _2) @@ -208,7 +199,6 @@ abstract class ShuffleSuite extends SparkFunSuite with Matchers with LocalSparkC } test("subtract mutable pairs") { - assume(!Utils.isWindows) // Use a local cluster with 2 processes to make sure there are both local and remote blocks sc = new SparkContext("local-cluster[2,1,1024]", "test", conf) def p[T1, T2](_1: T1, _2: T2): MutablePair[T1, T2] = MutablePair(_1, _2) @@ -223,7 +213,6 @@ abstract class ShuffleSuite extends SparkFunSuite with Matchers with LocalSparkC } test("sort with Java non serializable class - Kryo") { - assume(!Utils.isWindows) // Use a local cluster with 2 processes to make sure there are both local and remote blocks val myConf = conf.clone().set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") sc = new SparkContext("local-cluster[2,1,1024]", "test", myConf) @@ -238,7 +227,6 @@ abstract class ShuffleSuite extends SparkFunSuite with Matchers with LocalSparkC } test("sort with Java non serializable class - Java") { - assume(!Utils.isWindows) // Use a local cluster with 2 processes to make sure there are both local and remote blocks sc = new SparkContext("local-cluster[2,1,1024]", "test", conf) val a = sc.parallelize(1 to 10, 2) diff --git a/launcher/src/main/java/org/apache/spark/launcher/AbstractCommandBuilder.java b/launcher/src/main/java/org/apache/spark/launcher/AbstractCommandBuilder.java index ba43659d96f55994503080f27902e445aa862c20..0622fef17c8d6a6d4bb21ca6367b11cdf886b0dc 100644 --- a/launcher/src/main/java/org/apache/spark/launcher/AbstractCommandBuilder.java +++ b/launcher/src/main/java/org/apache/spark/launcher/AbstractCommandBuilder.java @@ -26,9 +26,11 @@ import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; +import java.util.LinkedHashSet; import java.util.List; import java.util.Map; import java.util.Properties; +import java.util.Set; import java.util.regex.Pattern; import static org.apache.spark.launcher.CommandBuilderUtils.*; @@ -135,7 +137,7 @@ abstract class AbstractCommandBuilder { List<String> buildClassPath(String appClassPath) throws IOException { String sparkHome = getSparkHome(); - List<String> cp = new ArrayList<>(); + Set<String> cp = new LinkedHashSet<>(); addToClassPath(cp, getenv("SPARK_CLASSPATH")); addToClassPath(cp, appClassPath); @@ -201,7 +203,7 @@ abstract class AbstractCommandBuilder { addToClassPath(cp, getenv("HADOOP_CONF_DIR")); addToClassPath(cp, getenv("YARN_CONF_DIR")); addToClassPath(cp, getenv("SPARK_DIST_CLASSPATH")); - return cp; + return new ArrayList<>(cp); } /** @@ -210,7 +212,7 @@ abstract class AbstractCommandBuilder { * @param cp List to which the new entries are appended. * @param entries New classpath entries (separated by File.pathSeparator). */ - private void addToClassPath(List<String> cp, String entries) { + private void addToClassPath(Set<String> cp, String entries) { if (isEmpty(entries)) { return; } diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index fdc33c77fe292d1bba65819f746592010de9c738..74edd537f528fc29a7afe7d119dc84a89e097d5c 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -824,7 +824,8 @@ object TestSettings { // launched by the tests have access to the correct test-time classpath. envVars in Test ++= Map( "SPARK_DIST_CLASSPATH" -> - (fullClasspath in Test).value.files.map(_.getAbsolutePath).mkString(":").stripSuffix(":"), + (fullClasspath in Test).value.files.map(_.getAbsolutePath) + .mkString(File.pathSeparator).stripSuffix(File.pathSeparator), "SPARK_PREPEND_CLASSES" -> "1", "SPARK_SCALA_VERSION" -> scalaBinaryVersion, "SPARK_TESTING" -> "1", diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/BroadcastJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/BroadcastJoinSuite.scala index 07839359a019c0f720c677b76d5ff85362b4ce3b..119d6e25df3b65eea89264409a4a6a7b468493f2 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/BroadcastJoinSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/BroadcastJoinSuite.scala @@ -86,39 +86,31 @@ class BroadcastJoinSuite extends QueryTest with SQLTestUtils { plan } - // The tests here are failed on Windows due to the failure of initiating executors - // by the path length limitation. See SPARK-18718. test("unsafe broadcast hash join updates peak execution memory") { - assume(!Utils.isWindows) testBroadcastJoinPeak[BroadcastHashJoinExec]("unsafe broadcast hash join", "inner") } test("unsafe broadcast hash outer join updates peak execution memory") { - assume(!Utils.isWindows) testBroadcastJoinPeak[BroadcastHashJoinExec]("unsafe broadcast hash outer join", "left_outer") } test("unsafe broadcast left semi join updates peak execution memory") { - assume(!Utils.isWindows) testBroadcastJoinPeak[BroadcastHashJoinExec]("unsafe broadcast left semi join", "leftsemi") } test("broadcast hint isn't bothered by authBroadcastJoinThreshold set to low values") { - assume(!Utils.isWindows) withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "0") { testBroadcastJoin[BroadcastHashJoinExec]("inner", true) } } test("broadcast hint isn't bothered by a disabled authBroadcastJoinThreshold") { - assume(!Utils.isWindows) withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") { testBroadcastJoin[BroadcastHashJoinExec]("inner", true) } } test("broadcast hint isn't propagated after a join") { - assume(!Utils.isWindows) withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") { val df1 = spark.createDataFrame(Seq((1, "4"), (2, "2"))).toDF("key", "value") val df2 = spark.createDataFrame(Seq((1, "1"), (2, "2"))).toDF("key", "value") @@ -146,7 +138,6 @@ class BroadcastJoinSuite extends QueryTest with SQLTestUtils { } test("broadcast hint is propagated correctly") { - assume(!Utils.isWindows) withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") { val df2 = spark.createDataFrame(Seq((1, "1"), (2, "2"), (3, "2"))).toDF("key", "value") val broadcasted = broadcast(df2) @@ -167,7 +158,6 @@ class BroadcastJoinSuite extends QueryTest with SQLTestUtils { } test("join key rewritten") { - assume(!Utils.isWindows) val l = Literal(1L) val i = Literal(2) val s = Literal.create(3, ShortType)