diff --git a/core/src/main/java/org/apache/spark/api/java/JavaSparkContextVarargsWorkaround.java b/core/src/main/java/org/apache/spark/api/java/JavaSparkContextVarargsWorkaround.java index 2090efd3b9990ae59dd3e102fdce90f82b173ed0..d4c42b38ac2248a1ff861fd579bad66d56b94d5a 100644 --- a/core/src/main/java/org/apache/spark/api/java/JavaSparkContextVarargsWorkaround.java +++ b/core/src/main/java/org/apache/spark/api/java/JavaSparkContextVarargsWorkaround.java @@ -23,11 +23,13 @@ import java.util.List; // See // http://scala-programming-language.1934581.n4.nabble.com/Workaround-for-implementing-java-varargs-in-2-7-2-final-tp1944767p1944772.html abstract class JavaSparkContextVarargsWorkaround { - public <T> JavaRDD<T> union(JavaRDD<T>... rdds) { + + @SafeVarargs + public final <T> JavaRDD<T> union(JavaRDD<T>... rdds) { if (rdds.length == 0) { throw new IllegalArgumentException("Union called on empty list"); } - ArrayList<JavaRDD<T>> rest = new ArrayList<JavaRDD<T>>(rdds.length - 1); + List<JavaRDD<T>> rest = new ArrayList<>(rdds.length - 1); for (int i = 1; i < rdds.length; i++) { rest.add(rdds[i]); } @@ -38,18 +40,19 @@ abstract class JavaSparkContextVarargsWorkaround { if (rdds.length == 0) { throw new IllegalArgumentException("Union called on empty list"); } - ArrayList<JavaDoubleRDD> rest = new ArrayList<JavaDoubleRDD>(rdds.length - 1); + List<JavaDoubleRDD> rest = new ArrayList<>(rdds.length - 1); for (int i = 1; i < rdds.length; i++) { rest.add(rdds[i]); } return union(rdds[0], rest); } - public <K, V> JavaPairRDD<K, V> union(JavaPairRDD<K, V>... rdds) { + @SafeVarargs + public final <K, V> JavaPairRDD<K, V> union(JavaPairRDD<K, V>... rdds) { if (rdds.length == 0) { throw new IllegalArgumentException("Union called on empty list"); } - ArrayList<JavaPairRDD<K, V>> rest = new ArrayList<JavaPairRDD<K, V>>(rdds.length - 1); + List<JavaPairRDD<K, V>> rest = new ArrayList<>(rdds.length - 1); for (int i = 1; i < rdds.length; i++) { rest.add(rdds[i]); } @@ -57,7 +60,7 @@ abstract class JavaSparkContextVarargsWorkaround { } // These methods take separate "first" and "rest" elements to avoid having the same type erasure - abstract public <T> JavaRDD<T> union(JavaRDD<T> first, List<JavaRDD<T>> rest); - abstract public JavaDoubleRDD union(JavaDoubleRDD first, List<JavaDoubleRDD> rest); - abstract public <K, V> JavaPairRDD<K, V> union(JavaPairRDD<K, V> first, List<JavaPairRDD<K, V>> rest); + public abstract <T> JavaRDD<T> union(JavaRDD<T> first, List<JavaRDD<T>> rest); + public abstract JavaDoubleRDD union(JavaDoubleRDD first, List<JavaDoubleRDD> rest); + public abstract <K, V> JavaPairRDD<K, V> union(JavaPairRDD<K, V> first, List<JavaPairRDD<K, V>> rest); } diff --git a/core/src/main/scala/org/apache/spark/storage/TachyonBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/TachyonBlockManager.scala index b53c86e89a2737aea08c31673615d2e59186ec88..ebad5bc5ab28d174e9f0896c2105733d099e04f7 100644 --- a/core/src/main/scala/org/apache/spark/storage/TachyonBlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/TachyonBlockManager.scala @@ -27,9 +27,10 @@ import scala.util.control.NonFatal import com.google.common.io.ByteStreams import tachyon.client.{ReadType, WriteType, TachyonFS, TachyonFile} +import tachyon.conf.TachyonConf import tachyon.TachyonURI -import org.apache.spark.{SparkException, SparkConf, Logging} +import org.apache.spark.Logging import org.apache.spark.executor.ExecutorExitCode import org.apache.spark.util.Utils @@ -60,7 +61,11 @@ private[spark] class TachyonBlockManager() extends ExternalBlockManager with Log rootDirs = s"$storeDir/$appFolderName/$executorId" master = blockManager.conf.get(ExternalBlockStore.MASTER_URL, "tachyon://localhost:19998") - client = if (master != null && master != "") TachyonFS.get(new TachyonURI(master)) else null + client = if (master != null && master != "") { + TachyonFS.get(new TachyonURI(master), new TachyonConf()) + } else { + null + } // original implementation call System.exit, we change it to run without extblkstore support if (client == null) { logError("Failed to connect to the Tachyon as the master address is not configured") diff --git a/core/src/test/scala/org/apache/spark/deploy/master/PersistenceEngineSuite.scala b/core/src/test/scala/org/apache/spark/deploy/master/PersistenceEngineSuite.scala index 11e87bd1dd8eb06b87cf59719e4952ae7d7bc397..34775577de8a34eb546b9d1ee391d44b56681c9d 100644 --- a/core/src/test/scala/org/apache/spark/deploy/master/PersistenceEngineSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/master/PersistenceEngineSuite.scala @@ -73,11 +73,11 @@ class PersistenceEngineSuite extends SparkFunSuite { assert(persistenceEngine.read[String]("test_").isEmpty) // Test deserializing objects that contain RpcEndpointRef - val rpcEnv = RpcEnv.create("test", "localhost", 12345, conf, new SecurityManager(conf)) + val testRpcEnv = RpcEnv.create("test", "localhost", 12345, conf, new SecurityManager(conf)) try { // Create a real endpoint so that we can test RpcEndpointRef deserialization - val workerEndpoint = rpcEnv.setupEndpoint("worker", new RpcEndpoint { - override val rpcEnv: RpcEnv = rpcEnv + val workerEndpoint = testRpcEnv.setupEndpoint("worker", new RpcEndpoint { + override val rpcEnv: RpcEnv = testRpcEnv }) val workerToPersist = new WorkerInfo( @@ -93,7 +93,8 @@ class PersistenceEngineSuite extends SparkFunSuite { persistenceEngine.addWorker(workerToPersist) - val (storedApps, storedDrivers, storedWorkers) = persistenceEngine.readPersistedData(rpcEnv) + val (storedApps, storedDrivers, storedWorkers) = + persistenceEngine.readPersistedData(testRpcEnv) assert(storedApps.isEmpty) assert(storedDrivers.isEmpty) @@ -110,8 +111,8 @@ class PersistenceEngineSuite extends SparkFunSuite { assert(workerToPersist.webUiPort === recoveryWorkerInfo.webUiPort) assert(workerToPersist.publicAddress === recoveryWorkerInfo.publicAddress) } finally { - rpcEnv.shutdown() - rpcEnv.awaitTermination() + testRpcEnv.shutdown() + testRpcEnv.awaitTermination() } } diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtilsSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtilsSuite.scala index b354914b6ffd0139f7c136949952f293cfcc2820..2eb43b73133813e2c7d6158a0907e42c1d898537 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtilsSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtilsSuite.scala @@ -17,10 +17,13 @@ package org.apache.spark.scheduler.cluster.mesos +import scala.language.reflectiveCalls + import org.apache.mesos.Protos.Value import org.mockito.Mockito._ import org.scalatest._ import org.scalatest.mock.MockitoSugar + import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite} class MesosSchedulerUtilsSuite extends SparkFunSuite with Matchers with MockitoSugar { diff --git a/examples/src/main/java/org/apache/spark/examples/ml/JavaOneVsRestExample.java b/examples/src/main/java/org/apache/spark/examples/ml/JavaOneVsRestExample.java index 75063dbf800d895a590af55418c4c64c6a6488d3..e7f2f6f615070be9a733540e7267f7e3562df71a 100644 --- a/examples/src/main/java/org/apache/spark/examples/ml/JavaOneVsRestExample.java +++ b/examples/src/main/java/org/apache/spark/examples/ml/JavaOneVsRestExample.java @@ -178,6 +178,7 @@ public class JavaOneVsRestExample { return params; } + @SuppressWarnings("static") private static Options generateCommandlineOptions() { Option input = OptionBuilder.withArgName("input") .hasArg() diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaStatefulNetworkWordCount.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaStatefulNetworkWordCount.java index dbf2ef02d7b762aed48de5e1edf2938f9c55be3c..02f58f48b07ab3e86715abf0ed3c6e0e7c8244bb 100644 --- a/examples/src/main/java/org/apache/spark/examples/streaming/JavaStatefulNetworkWordCount.java +++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaStatefulNetworkWordCount.java @@ -85,7 +85,7 @@ public class JavaStatefulNetworkWordCount { @SuppressWarnings("unchecked") List<Tuple2<String, Integer>> tuples = Arrays.asList(new Tuple2<String, Integer>("hello", 1), new Tuple2<String, Integer>("world", 1)); - JavaPairRDD<String, Integer> initialRDD = ssc.sc().parallelizePairs(tuples); + JavaPairRDD<String, Integer> initialRDD = ssc.sparkContext().parallelizePairs(tuples); JavaReceiverInputDStream<String> lines = ssc.socketTextStream( args[0], Integer.parseInt(args[1]), StorageLevels.MEMORY_AND_DISK_SER_2); @@ -107,7 +107,7 @@ public class JavaStatefulNetworkWordCount { // This will give a Dstream made of state (which is the cumulative count of the words) JavaPairDStream<String, Integer> stateDstream = wordsDstream.updateStateByKey(updateFunction, - new HashPartitioner(ssc.sc().defaultParallelism()), initialRDD); + new HashPartitioner(ssc.sparkContext().defaultParallelism()), initialRDD); stateDstream.print(); ssc.start(); diff --git a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java index 02cd24a35906f9d179366c73fbedf261c34b4d44..9db07d0507fea4223474d50e4ba7672d1cb7af2c 100644 --- a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java +++ b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java @@ -70,7 +70,7 @@ public class JavaDirectKafkaStreamSuite implements Serializable { final String topic1 = "topic1"; final String topic2 = "topic2"; // hold a reference to the current offset ranges, so it can be used downstream - final AtomicReference<OffsetRange[]> offsetRanges = new AtomicReference(); + final AtomicReference<OffsetRange[]> offsetRanges = new AtomicReference<>(); String[] topic1data = createTopicAndSendData(topic1); String[] topic2data = createTopicAndSendData(topic2); diff --git a/mllib/src/test/java/org/apache/spark/mllib/evaluation/JavaRankingMetricsSuite.java b/mllib/src/test/java/org/apache/spark/mllib/evaluation/JavaRankingMetricsSuite.java index effc8a1a6dabc2754119a0202094db1eb7337c30..fa4d334801ce45d93fd77629e47ce777b6919234 100644 --- a/mllib/src/test/java/org/apache/spark/mllib/evaluation/JavaRankingMetricsSuite.java +++ b/mllib/src/test/java/org/apache/spark/mllib/evaluation/JavaRankingMetricsSuite.java @@ -18,12 +18,12 @@ package org.apache.spark.mllib.evaluation; import java.io.Serializable; -import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; import scala.Tuple2; import scala.Tuple2$; -import com.google.common.collect.Lists; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -34,18 +34,18 @@ import org.apache.spark.api.java.JavaSparkContext; public class JavaRankingMetricsSuite implements Serializable { private transient JavaSparkContext sc; - private transient JavaRDD<Tuple2<ArrayList<Integer>, ArrayList<Integer>>> predictionAndLabels; + private transient JavaRDD<Tuple2<List<Integer>, List<Integer>>> predictionAndLabels; @Before public void setUp() { sc = new JavaSparkContext("local", "JavaRankingMetricsSuite"); - predictionAndLabels = sc.parallelize(Lists.newArrayList( + predictionAndLabels = sc.parallelize(Arrays.asList( Tuple2$.MODULE$.apply( - Lists.newArrayList(1, 6, 2, 7, 8, 3, 9, 10, 4, 5), Lists.newArrayList(1, 2, 3, 4, 5)), + Arrays.asList(1, 6, 2, 7, 8, 3, 9, 10, 4, 5), Arrays.asList(1, 2, 3, 4, 5)), Tuple2$.MODULE$.apply( - Lists.newArrayList(4, 1, 5, 6, 2, 7, 3, 8, 9, 10), Lists.newArrayList(1, 2, 3)), + Arrays.asList(4, 1, 5, 6, 2, 7, 3, 8, 9, 10), Arrays.asList(1, 2, 3)), Tuple2$.MODULE$.apply( - Lists.newArrayList(1, 2, 3, 4, 5), Lists.<Integer>newArrayList())), 2); + Arrays.asList(1, 2, 3, 4, 5), Arrays.<Integer>asList())), 2); } @After diff --git a/mllib/src/test/scala/org/apache/spark/ml/classification/NaiveBayesSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/classification/NaiveBayesSuite.scala index aea3d9b694490a3fdbba3aa55ffcb96a2af0e1ed..98bc9511163e76b42afd8058eac2af8318f194e2 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/classification/NaiveBayesSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/classification/NaiveBayesSuite.scala @@ -21,7 +21,7 @@ import breeze.linalg.{Vector => BV} import org.apache.spark.SparkFunSuite import org.apache.spark.ml.param.ParamsSuite -import org.apache.spark.mllib.classification.NaiveBayes +import org.apache.spark.mllib.classification.NaiveBayes.{Multinomial, Bernoulli} import org.apache.spark.mllib.linalg._ import org.apache.spark.mllib.util.MLlibTestSparkContext import org.apache.spark.mllib.util.TestingUtils._ @@ -31,8 +31,6 @@ import org.apache.spark.sql.Row class NaiveBayesSuite extends SparkFunSuite with MLlibTestSparkContext { - import NaiveBayes.{Multinomial, Bernoulli} - def validatePrediction(predictionAndLabels: DataFrame): Unit = { val numOfErrorPredictions = predictionAndLabels.collect().count { case Row(prediction: Double, label: Double) => diff --git a/network/common/src/main/java/org/apache/spark/network/protocol/ChunkFetchFailure.java b/network/common/src/main/java/org/apache/spark/network/protocol/ChunkFetchFailure.java index f76bb49e874fcdd9f3035e727f701482eb5c7d9d..f0363830b61ac42f8403050a9385bb788d5630a8 100644 --- a/network/common/src/main/java/org/apache/spark/network/protocol/ChunkFetchFailure.java +++ b/network/common/src/main/java/org/apache/spark/network/protocol/ChunkFetchFailure.java @@ -52,6 +52,11 @@ public final class ChunkFetchFailure implements ResponseMessage { return new ChunkFetchFailure(streamChunkId, errorString); } + @Override + public int hashCode() { + return Objects.hashCode(streamChunkId, errorString); + } + @Override public boolean equals(Object other) { if (other instanceof ChunkFetchFailure) { diff --git a/network/common/src/main/java/org/apache/spark/network/protocol/ChunkFetchRequest.java b/network/common/src/main/java/org/apache/spark/network/protocol/ChunkFetchRequest.java index 980947cf13f6b601ebcb7e19f42b903d4bf8501c..5a173af54f618e773c7a795b33cc089d1416e797 100644 --- a/network/common/src/main/java/org/apache/spark/network/protocol/ChunkFetchRequest.java +++ b/network/common/src/main/java/org/apache/spark/network/protocol/ChunkFetchRequest.java @@ -48,6 +48,11 @@ public final class ChunkFetchRequest implements RequestMessage { return new ChunkFetchRequest(StreamChunkId.decode(buf)); } + @Override + public int hashCode() { + return streamChunkId.hashCode(); + } + @Override public boolean equals(Object other) { if (other instanceof ChunkFetchRequest) { diff --git a/network/common/src/main/java/org/apache/spark/network/protocol/ChunkFetchSuccess.java b/network/common/src/main/java/org/apache/spark/network/protocol/ChunkFetchSuccess.java index ff4936470c697013e19cc050102cac6c36c6f9b4..c962fb7ecf76d61cf8996a9277ed2d41e6903b0c 100644 --- a/network/common/src/main/java/org/apache/spark/network/protocol/ChunkFetchSuccess.java +++ b/network/common/src/main/java/org/apache/spark/network/protocol/ChunkFetchSuccess.java @@ -61,6 +61,11 @@ public final class ChunkFetchSuccess implements ResponseMessage { return new ChunkFetchSuccess(streamChunkId, managedBuf); } + @Override + public int hashCode() { + return Objects.hashCode(streamChunkId, buffer); + } + @Override public boolean equals(Object other) { if (other instanceof ChunkFetchSuccess) { diff --git a/network/common/src/main/java/org/apache/spark/network/protocol/RpcFailure.java b/network/common/src/main/java/org/apache/spark/network/protocol/RpcFailure.java index 6b991375fc486062c11afdea36e62bf592e7f24e..2dfc7876ba328ad3024e1e448737a2a7749c8e6b 100644 --- a/network/common/src/main/java/org/apache/spark/network/protocol/RpcFailure.java +++ b/network/common/src/main/java/org/apache/spark/network/protocol/RpcFailure.java @@ -50,6 +50,11 @@ public final class RpcFailure implements ResponseMessage { return new RpcFailure(requestId, errorString); } + @Override + public int hashCode() { + return Objects.hashCode(requestId, errorString); + } + @Override public boolean equals(Object other) { if (other instanceof RpcFailure) { diff --git a/network/common/src/main/java/org/apache/spark/network/protocol/RpcRequest.java b/network/common/src/main/java/org/apache/spark/network/protocol/RpcRequest.java index cdee0b0e0316b6c374a9f8e9b48f2a11267dec18..745039db742fa1d4211855221afea0e13c2791f3 100644 --- a/network/common/src/main/java/org/apache/spark/network/protocol/RpcRequest.java +++ b/network/common/src/main/java/org/apache/spark/network/protocol/RpcRequest.java @@ -59,6 +59,11 @@ public final class RpcRequest implements RequestMessage { return new RpcRequest(requestId, message); } + @Override + public int hashCode() { + return Objects.hashCode(requestId, Arrays.hashCode(message)); + } + @Override public boolean equals(Object other) { if (other instanceof RpcRequest) { diff --git a/network/common/src/main/java/org/apache/spark/network/protocol/RpcResponse.java b/network/common/src/main/java/org/apache/spark/network/protocol/RpcResponse.java index 0a62e09a8115c77153e9079c560bf0fa2c748dba..1671cd444f03907b5275e87652b50880b9001dbb 100644 --- a/network/common/src/main/java/org/apache/spark/network/protocol/RpcResponse.java +++ b/network/common/src/main/java/org/apache/spark/network/protocol/RpcResponse.java @@ -50,6 +50,11 @@ public final class RpcResponse implements ResponseMessage { return new RpcResponse(requestId, response); } + @Override + public int hashCode() { + return Objects.hashCode(requestId, Arrays.hashCode(response)); + } + @Override public boolean equals(Object other) { if (other instanceof RpcResponse) { diff --git a/network/common/src/test/java/org/apache/spark/network/TestManagedBuffer.java b/network/common/src/test/java/org/apache/spark/network/TestManagedBuffer.java index 38113a918f795361a37d0bc536c19656342af2ef..83c90f9eff2b1d37d5d90c435260e6fc83b8e395 100644 --- a/network/common/src/test/java/org/apache/spark/network/TestManagedBuffer.java +++ b/network/common/src/test/java/org/apache/spark/network/TestManagedBuffer.java @@ -80,6 +80,11 @@ public class TestManagedBuffer extends ManagedBuffer { return underlying.convertToNetty(); } + @Override + public int hashCode() { + return underlying.hashCode(); + } + @Override public boolean equals(Object other) { if (other instanceof ManagedBuffer) { diff --git a/network/common/src/test/java/org/apache/spark/network/sasl/SparkSaslSuite.java b/network/common/src/test/java/org/apache/spark/network/sasl/SparkSaslSuite.java index be6632bb8cf49f6cfe1d975ca8da8eed70156057..8104004847a24d3a2472ec1433658442c1946fea 100644 --- a/network/common/src/test/java/org/apache/spark/network/sasl/SparkSaslSuite.java +++ b/network/common/src/test/java/org/apache/spark/network/sasl/SparkSaslSuite.java @@ -17,11 +17,11 @@ package org.apache.spark.network.sasl; -import static com.google.common.base.Charsets.UTF_8; import static org.junit.Assert.*; import static org.mockito.Mockito.*; import java.io.File; +import java.nio.charset.StandardCharsets; import java.util.Arrays; import java.util.List; import java.util.Random; @@ -138,8 +138,8 @@ public class SparkSaslSuite { public Void answer(InvocationOnMock invocation) { byte[] message = (byte[]) invocation.getArguments()[1]; RpcResponseCallback cb = (RpcResponseCallback) invocation.getArguments()[2]; - assertEquals("Ping", new String(message, UTF_8)); - cb.onSuccess("Pong".getBytes(UTF_8)); + assertEquals("Ping", new String(message, StandardCharsets.UTF_8)); + cb.onSuccess("Pong".getBytes(StandardCharsets.UTF_8)); return null; } }) @@ -148,8 +148,9 @@ public class SparkSaslSuite { SaslTestCtx ctx = new SaslTestCtx(rpcHandler, encrypt, false); try { - byte[] response = ctx.client.sendRpcSync("Ping".getBytes(UTF_8), TimeUnit.SECONDS.toMillis(10)); - assertEquals("Pong", new String(response, UTF_8)); + byte[] response = ctx.client.sendRpcSync("Ping".getBytes(StandardCharsets.UTF_8), + TimeUnit.SECONDS.toMillis(10)); + assertEquals("Pong", new String(response, StandardCharsets.UTF_8)); } finally { ctx.close(); } @@ -235,7 +236,7 @@ public class SparkSaslSuite { final String blockSizeConf = "spark.network.sasl.maxEncryptedBlockSize"; System.setProperty(blockSizeConf, "1k"); - final AtomicReference<ManagedBuffer> response = new AtomicReference(); + final AtomicReference<ManagedBuffer> response = new AtomicReference<>(); final File file = File.createTempFile("sasltest", ".txt"); SaslTestCtx ctx = null; try { @@ -321,7 +322,8 @@ public class SparkSaslSuite { SaslTestCtx ctx = null; try { ctx = new SaslTestCtx(mock(RpcHandler.class), true, true); - ctx.client.sendRpcSync("Ping".getBytes(UTF_8), TimeUnit.SECONDS.toMillis(10)); + ctx.client.sendRpcSync("Ping".getBytes(StandardCharsets.UTF_8), + TimeUnit.SECONDS.toMillis(10)); fail("Should have failed to send RPC to server."); } catch (Exception e) { assertFalse(e.getCause() instanceof TimeoutException); diff --git a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandlerSuite.java b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandlerSuite.java index 73374cdc77a23b5a6e9fe0e36486839f697c53b4..1d197497b7c8fdaad01b96002f5c83c78d9eb54a 100644 --- a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandlerSuite.java +++ b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandlerSuite.java @@ -90,9 +90,11 @@ public class ExternalShuffleBlockHandlerSuite { (StreamHandle) BlockTransferMessage.Decoder.fromByteArray(response.getValue()); assertEquals(2, handle.numChunks); - ArgumentCaptor<Iterator> stream = ArgumentCaptor.forClass(Iterator.class); + @SuppressWarnings("unchecked") + ArgumentCaptor<Iterator<ManagedBuffer>> stream = (ArgumentCaptor<Iterator<ManagedBuffer>>) + (ArgumentCaptor<?>) ArgumentCaptor.forClass(Iterator.class); verify(streamManager, times(1)).registerStream(stream.capture()); - Iterator<ManagedBuffer> buffers = (Iterator<ManagedBuffer>) stream.getValue(); + Iterator<ManagedBuffer> buffers = stream.getValue(); assertEquals(block0Marker, buffers.next()); assertEquals(block1Marker, buffers.next()); assertFalse(buffers.hasNext()); diff --git a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/RetryingBlockFetcherSuite.java b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/RetryingBlockFetcherSuite.java index 1ad0d72ae5ec549ffaaaff825bac323f62389611..06e46f92410949cd68368476a2fc860497498093 100644 --- a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/RetryingBlockFetcherSuite.java +++ b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/RetryingBlockFetcherSuite.java @@ -20,7 +20,9 @@ package org.apache.spark.network.shuffle; import java.io.IOException; import java.nio.ByteBuffer; +import java.util.Arrays; import java.util.LinkedHashSet; +import java.util.List; import java.util.Map; import com.google.common.collect.ImmutableMap; @@ -67,13 +69,13 @@ public class RetryingBlockFetcherSuite { public void testNoFailures() throws IOException { BlockFetchingListener listener = mock(BlockFetchingListener.class); - Map[] interactions = new Map[] { + List<? extends Map<String, Object>> interactions = Arrays.asList( // Immediately return both blocks successfully. ImmutableMap.<String, Object>builder() .put("b0", block0) .put("b1", block1) - .build(), - }; + .build() + ); performInteractions(interactions, listener); @@ -86,13 +88,13 @@ public class RetryingBlockFetcherSuite { public void testUnrecoverableFailure() throws IOException { BlockFetchingListener listener = mock(BlockFetchingListener.class); - Map[] interactions = new Map[] { + List<? extends Map<String, Object>> interactions = Arrays.asList( // b0 throws a non-IOException error, so it will be failed without retry. ImmutableMap.<String, Object>builder() .put("b0", new RuntimeException("Ouch!")) .put("b1", block1) - .build(), - }; + .build() + ); performInteractions(interactions, listener); @@ -105,7 +107,7 @@ public class RetryingBlockFetcherSuite { public void testSingleIOExceptionOnFirst() throws IOException { BlockFetchingListener listener = mock(BlockFetchingListener.class); - Map[] interactions = new Map[] { + List<? extends Map<String, Object>> interactions = Arrays.asList( // IOException will cause a retry. Since b0 fails, we will retry both. ImmutableMap.<String, Object>builder() .put("b0", new IOException("Connection failed or something")) @@ -114,8 +116,8 @@ public class RetryingBlockFetcherSuite { ImmutableMap.<String, Object>builder() .put("b0", block0) .put("b1", block1) - .build(), - }; + .build() + ); performInteractions(interactions, listener); @@ -128,7 +130,7 @@ public class RetryingBlockFetcherSuite { public void testSingleIOExceptionOnSecond() throws IOException { BlockFetchingListener listener = mock(BlockFetchingListener.class); - Map[] interactions = new Map[] { + List<? extends Map<String, Object>> interactions = Arrays.asList( // IOException will cause a retry. Since b1 fails, we will not retry b0. ImmutableMap.<String, Object>builder() .put("b0", block0) @@ -136,8 +138,8 @@ public class RetryingBlockFetcherSuite { .build(), ImmutableMap.<String, Object>builder() .put("b1", block1) - .build(), - }; + .build() + ); performInteractions(interactions, listener); @@ -150,7 +152,7 @@ public class RetryingBlockFetcherSuite { public void testTwoIOExceptions() throws IOException { BlockFetchingListener listener = mock(BlockFetchingListener.class); - Map[] interactions = new Map[] { + List<? extends Map<String, Object>> interactions = Arrays.asList( // b0's IOException will trigger retry, b1's will be ignored. ImmutableMap.<String, Object>builder() .put("b0", new IOException()) @@ -164,8 +166,8 @@ public class RetryingBlockFetcherSuite { // b1 returns successfully within 2 retries. ImmutableMap.<String, Object>builder() .put("b1", block1) - .build(), - }; + .build() + ); performInteractions(interactions, listener); @@ -178,7 +180,7 @@ public class RetryingBlockFetcherSuite { public void testThreeIOExceptions() throws IOException { BlockFetchingListener listener = mock(BlockFetchingListener.class); - Map[] interactions = new Map[] { + List<? extends Map<String, Object>> interactions = Arrays.asList( // b0's IOException will trigger retry, b1's will be ignored. ImmutableMap.<String, Object>builder() .put("b0", new IOException()) @@ -196,8 +198,8 @@ public class RetryingBlockFetcherSuite { // This is not reached -- b1 has failed. ImmutableMap.<String, Object>builder() .put("b1", block1) - .build(), - }; + .build() + ); performInteractions(interactions, listener); @@ -210,7 +212,7 @@ public class RetryingBlockFetcherSuite { public void testRetryAndUnrecoverable() throws IOException { BlockFetchingListener listener = mock(BlockFetchingListener.class); - Map[] interactions = new Map[] { + List<? extends Map<String, Object>> interactions = Arrays.asList( // b0's IOException will trigger retry, subsequent messages will be ignored. ImmutableMap.<String, Object>builder() .put("b0", new IOException()) @@ -226,8 +228,8 @@ public class RetryingBlockFetcherSuite { // b2 succeeds in its last retry. ImmutableMap.<String, Object>builder() .put("b2", block2) - .build(), - }; + .build() + ); performInteractions(interactions, listener); @@ -248,7 +250,8 @@ public class RetryingBlockFetcherSuite { * subset of the original blocks in a second interaction. */ @SuppressWarnings("unchecked") - private void performInteractions(final Map[] interactions, BlockFetchingListener listener) + private static void performInteractions(List<? extends Map<String, Object>> interactions, + BlockFetchingListener listener) throws IOException { TransportConf conf = new TransportConf(new SystemPropertyConfigProvider()); diff --git a/pom.xml b/pom.xml index a958cec867eae3ce3391b8934642f71507225c3c..b4ee3ccb0bff592ca664df0436078951b8a5b5d0 100644 --- a/pom.xml +++ b/pom.xml @@ -1849,6 +1849,7 @@ <javacArg>${java.version}</javacArg> <javacArg>-target</javacArg> <javacArg>${java.version}</javacArg> + <javacArg>-Xlint:all,-serial,-path</javacArg> </javacArgs> </configuration> </plugin> @@ -1862,6 +1863,9 @@ <encoding>UTF-8</encoding> <maxmem>1024m</maxmem> <fork>true</fork> + <compilerArgs> + <arg>-Xlint:all,-serial,-path</arg> + </compilerArgs> </configuration> </plugin> <!-- Surefire runs all Java tests --> diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameSuite.java b/sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameSuite.java index 2c669bb59a0b5d49d20614a029b6a6455c9d850f..7302361ab9fdb1d33e6007a3aec8f05c7538c279 100644 --- a/sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameSuite.java +++ b/sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameSuite.java @@ -167,6 +167,7 @@ public class JavaDataFrameSuite { for (int i = 0; i < result.length(); i++) { Assert.assertEquals(bean.getB()[i], result.apply(i)); } + @SuppressWarnings("unchecked") Seq<Integer> outputBuffer = (Seq<Integer>) first.getJavaMap(2).get("hello"); Assert.assertArrayEquals( bean.getC().get("hello"), diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/TableScanSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/TableScanSuite.scala index cfb03ff485b7c9c055790d37766c3001bb175237..e34e0956d1fddf923f945dea476feab0fa6d8b78 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/TableScanSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/TableScanSuite.scala @@ -17,14 +17,12 @@ package org.apache.spark.sql.sources +import java.nio.charset.StandardCharsets import java.sql.{Date, Timestamp} import org.apache.spark.rdd.RDD import org.apache.spark.sql._ -import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.util.DateTimeUtils import org.apache.spark.sql.types._ -import org.apache.spark.unsafe.types.UTF8String class DefaultSource extends SimpleScanSource @@ -73,7 +71,7 @@ case class AllDataTypesScan( sqlContext.sparkContext.parallelize(from to to).map { i => Row( s"str_$i", - s"str_$i".getBytes(), + s"str_$i".getBytes(StandardCharsets.UTF_8), i % 2 == 0, i.toByte, i.toShort, @@ -83,7 +81,7 @@ case class AllDataTypesScan( i.toDouble, new java.math.BigDecimal(i), new java.math.BigDecimal(i), - new Date(1970, 1, 1), + Date.valueOf("1970-01-01"), new Timestamp(20000 + i), s"varchar_$i", Seq(i, i + 1), @@ -92,7 +90,7 @@ case class AllDataTypesScan( Map(Map(s"str_$i" -> i.toFloat) -> Row(i.toLong)), Row(i, i.toString), Row(Seq(s"str_$i", s"str_${i + 1}"), - Row(Seq(new Date(1970, 1, i + 1))))) + Row(Seq(Date.valueOf(s"1970-01-${i + 1}"))))) } } } @@ -113,7 +111,7 @@ class TableScanSuite extends DataSourceTest { i.toDouble, new java.math.BigDecimal(i), new java.math.BigDecimal(i), - new Date(1970, 1, 1), + Date.valueOf("1970-01-01"), new Timestamp(20000 + i), s"varchar_$i", Seq(i, i + 1), @@ -121,7 +119,7 @@ class TableScanSuite extends DataSourceTest { Map(i -> i.toString), Map(Map(s"str_$i" -> i.toFloat) -> Row(i.toLong)), Row(i, i.toString), - Row(Seq(s"str_$i", s"str_${i + 1}"), Row(Seq(new Date(1970, 1, i + 1))))) + Row(Seq(s"str_$i", s"str_${i + 1}"), Row(Seq(Date.valueOf(s"1970-01-${i + 1}"))))) }.toSeq before { @@ -280,7 +278,7 @@ class TableScanSuite extends DataSourceTest { sqlTest( "SELECT structFieldComplex.Value.`value_(2)` FROM tableWithSchema", - (1 to 10).map(i => Row(Seq(new Date(1970, 1, i + 1)))).toSeq) + (1 to 10).map(i => Row(Seq(Date.valueOf(s"1970-01-${i + 1}")))).toSeq) test("Caching") { // Cached Query Execution diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala index f58bc7d7a0af450bdc0a137aae0afddaf6d6afb8..a7d5a991948d925293c0e64b51af7bcaf825d457 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala @@ -77,7 +77,7 @@ private[hive] object IsolatedClientLoader { // TODO: Remove copy logic. val tempDir = Utils.createTempDir(namePrefix = s"hive-${version}") allFiles.foreach(f => FileUtils.copyFileToDirectory(f, tempDir)) - tempDir.listFiles().map(_.toURL) + tempDir.listFiles().map(_.toURI.toURL) } private def resolvedVersions = new scala.collection.mutable.HashMap[HiveVersion, Seq[URL]] diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala index a47f9a4feb21b4713dcba31856cf76a48a2f81dd..05a78930afe3d57e380a84d4d69a887e6245ec4e 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala @@ -88,7 +88,7 @@ case class AddJar(path: String) extends RunnableCommand { val currentClassLoader = Utils.getContextOrSparkClassLoader // Add jar to current context - val jarURL = new java.io.File(path).toURL + val jarURL = new java.io.File(path).toURI.toURL val newClassLoader = new java.net.URLClassLoader(Array(jarURL), currentClassLoader) Thread.currentThread.setContextClassLoader(newClassLoader) // We need to explicitly set the class loader associated with the conf in executionHive's diff --git a/sql/hive/src/test/java/test/org/apache/spark/sql/hive/JavaDataFrameSuite.java b/sql/hive/src/test/java/test/org/apache/spark/sql/hive/JavaDataFrameSuite.java index 741a3cd31c603f83b78f76c73b2e103ad2a944f0..613b2bcc80e37427846696a1b7e51053fcc301d7 100644 --- a/sql/hive/src/test/java/test/org/apache/spark/sql/hive/JavaDataFrameSuite.java +++ b/sql/hive/src/test/java/test/org/apache/spark/sql/hive/JavaDataFrameSuite.java @@ -54,7 +54,7 @@ public class JavaDataFrameSuite { for (int i = 0; i < 10; i++) { jsonObjects.add("{\"key\":" + i + ", \"value\":\"str" + i + "\"}"); } - df = hc.jsonRDD(sc.parallelize(jsonObjects)); + df = hc.read().json(sc.parallelize(jsonObjects)); df.registerTempTable("window_table"); } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala index 95c1da6e9796caaa8d0e7172829704c8e1cf70f0..fb414518036349b03df7bb388edc764ab03b6c81 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala @@ -660,7 +660,7 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils { test("resolve udtf in projection #2") { val rdd = sparkContext.makeRDD((1 to 2).map(i => s"""{"a":[$i, ${i + 1}]}""")) - jsonRDD(rdd).registerTempTable("data") + read.json(rdd).registerTempTable("data") checkAnswer(sql("SELECT explode(map(1, 1)) FROM data LIMIT 1"), Row(1, 1) :: Nil) checkAnswer(sql("SELECT explode(map(1, 1)) as (k1, k2) FROM data LIMIT 1"), Row(1, 1) :: Nil) intercept[AnalysisException] { @@ -675,7 +675,7 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils { // TGF with non-TGF in project is allowed in Spark SQL, but not in Hive test("TGF with non-TGF in projection") { val rdd = sparkContext.makeRDD( """{"a": "1", "b":"1"}""" :: Nil) - jsonRDD(rdd).registerTempTable("data") + read.json(rdd).registerTempTable("data") checkAnswer( sql("SELECT explode(map(a, b)) as (k1, k2), a, b FROM data"), Row("1", "1", "1", "1") :: Nil) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala index 58bdda7794bf223f833eab0cecec3555cb6a17b9..7e735562dca337c49f1f2f3489798c7e4abc9e91 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala @@ -40,7 +40,9 @@ private[scheduler] case class ErrorReported(msg: String, e: Throwable) extends J private[streaming] class JobScheduler(val ssc: StreamingContext) extends Logging { - private val jobSets = new ConcurrentHashMap[Time, JobSet] + // Use of ConcurrentHashMap.keySet later causes an odd runtime problem due to Java 7/8 diff + // https://gist.github.com/AlainODea/1375759b8720a3f9f094 + private val jobSets: java.util.Map[Time, JobSet] = new ConcurrentHashMap[Time, JobSet] private val numConcurrentJobs = ssc.conf.getInt("spark.streaming.concurrentJobs", 1) private val jobExecutor = Executors.newFixedThreadPool(numConcurrentJobs) private val jobGenerator = new JobGenerator(this) diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaWriteAheadLogSuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaWriteAheadLogSuite.java index 50e8f9fc159c8719af33a2c9331a900c2eaf742d..175b8a496b4e5116c8e6b02a8aaf95c046deb70a 100644 --- a/streaming/src/test/java/org/apache/spark/streaming/JavaWriteAheadLogSuite.java +++ b/streaming/src/test/java/org/apache/spark/streaming/JavaWriteAheadLogSuite.java @@ -17,13 +17,15 @@ package org.apache.spark.streaming; +import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.nio.ByteBuffer; import java.util.Arrays; -import java.util.Collection; +import java.util.Iterator; +import java.util.List; -import org.apache.commons.collections.CollectionUtils; -import org.apache.commons.collections.Transformer; +import com.google.common.base.Function; +import com.google.common.collect.Iterators; import org.apache.spark.SparkConf; import org.apache.spark.streaming.util.WriteAheadLog; import org.apache.spark.streaming.util.WriteAheadLogRecordHandle; @@ -32,40 +34,40 @@ import org.apache.spark.streaming.util.WriteAheadLogUtils; import org.junit.Test; import org.junit.Assert; -class JavaWriteAheadLogSuiteHandle extends WriteAheadLogRecordHandle { - int index = -1; - public JavaWriteAheadLogSuiteHandle(int idx) { - index = idx; - } -} - public class JavaWriteAheadLogSuite extends WriteAheadLog { - class Record { + static class JavaWriteAheadLogSuiteHandle extends WriteAheadLogRecordHandle { + int index = -1; + JavaWriteAheadLogSuiteHandle(int idx) { + index = idx; + } + } + + static class Record { long time; int index; ByteBuffer buffer; - public Record(long tym, int idx, ByteBuffer buf) { + Record(long tym, int idx, ByteBuffer buf) { index = idx; time = tym; buffer = buf; } } private int index = -1; - private ArrayList<Record> records = new ArrayList<Record>(); + private final List<Record> records = new ArrayList<>(); // Methods for WriteAheadLog @Override - public WriteAheadLogRecordHandle write(java.nio.ByteBuffer record, long time) { + public WriteAheadLogRecordHandle write(ByteBuffer record, long time) { index += 1; - records.add(new org.apache.spark.streaming.JavaWriteAheadLogSuite.Record(time, index, record)); + records.add(new Record(time, index, record)); return new JavaWriteAheadLogSuiteHandle(index); } @Override - public java.nio.ByteBuffer read(WriteAheadLogRecordHandle handle) { + public ByteBuffer read(WriteAheadLogRecordHandle handle) { if (handle instanceof JavaWriteAheadLogSuiteHandle) { int reqdIndex = ((JavaWriteAheadLogSuiteHandle) handle).index; for (Record record: records) { @@ -78,14 +80,13 @@ public class JavaWriteAheadLogSuite extends WriteAheadLog { } @Override - public java.util.Iterator<java.nio.ByteBuffer> readAll() { - Collection<ByteBuffer> buffers = CollectionUtils.collect(records, new Transformer() { + public Iterator<ByteBuffer> readAll() { + return Iterators.transform(records.iterator(), new Function<Record,ByteBuffer>() { @Override - public Object transform(Object input) { - return ((Record) input).buffer; + public ByteBuffer apply(Record input) { + return input.buffer; } }); - return buffers.iterator(); } @Override @@ -110,20 +111,21 @@ public class JavaWriteAheadLogSuite extends WriteAheadLog { WriteAheadLog wal = WriteAheadLogUtils.createLogForDriver(conf, null, null); String data1 = "data1"; - WriteAheadLogRecordHandle handle = wal.write(ByteBuffer.wrap(data1.getBytes()), 1234); + WriteAheadLogRecordHandle handle = + wal.write(ByteBuffer.wrap(data1.getBytes(StandardCharsets.UTF_8)), 1234); Assert.assertTrue(handle instanceof JavaWriteAheadLogSuiteHandle); - Assert.assertTrue(new String(wal.read(handle).array()).equals(data1)); + Assert.assertEquals(new String(wal.read(handle).array(), StandardCharsets.UTF_8), data1); - wal.write(ByteBuffer.wrap("data2".getBytes()), 1235); - wal.write(ByteBuffer.wrap("data3".getBytes()), 1236); - wal.write(ByteBuffer.wrap("data4".getBytes()), 1237); + wal.write(ByteBuffer.wrap("data2".getBytes(StandardCharsets.UTF_8)), 1235); + wal.write(ByteBuffer.wrap("data3".getBytes(StandardCharsets.UTF_8)), 1236); + wal.write(ByteBuffer.wrap("data4".getBytes(StandardCharsets.UTF_8)), 1237); wal.clean(1236, false); - java.util.Iterator<java.nio.ByteBuffer> dataIterator = wal.readAll(); - ArrayList<String> readData = new ArrayList<String>(); + Iterator<ByteBuffer> dataIterator = wal.readAll(); + List<String> readData = new ArrayList<>(); while (dataIterator.hasNext()) { - readData.add(new String(dataIterator.next().array())); + readData.add(new String(dataIterator.next().array(), StandardCharsets.UTF_8)); } - Assert.assertTrue(readData.equals(Arrays.asList("data3", "data4"))); + Assert.assertEquals(readData, Arrays.asList("data3", "data4")); } } diff --git a/streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala index a08578680cff9eb8251727bec40ef9f7517d2967..068a6cb0e8fa4375f447951d46d2764bb81879fd 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala @@ -100,8 +100,8 @@ class UISeleniumSuite // Check stat table val statTableHeaders = findAll(cssSelector("#stat-table th")).map(_.text).toSeq statTableHeaders.exists( - _.matches("Timelines \\(Last \\d+ batches, \\d+ active, \\d+ completed\\)")) should be - (true) + _.matches("Timelines \\(Last \\d+ batches, \\d+ active, \\d+ completed\\)") + ) should be (true) statTableHeaders should contain ("Histograms") val statTableCells = findAll(cssSelector("#stat-table td")).map(_.text).toSeq