diff --git a/common/network-common/src/test/java/org/apache/spark/network/TransportResponseHandlerSuite.java b/common/network-common/src/test/java/org/apache/spark/network/TransportResponseHandlerSuite.java index 4477c9a935f21fc741c7c86484746008eaa057af..09fc80d12d510e120e25195377cb940cf6c5d482 100644 --- a/common/network-common/src/test/java/org/apache/spark/network/TransportResponseHandlerSuite.java +++ b/common/network-common/src/test/java/org/apache/spark/network/TransportResponseHandlerSuite.java @@ -26,7 +26,6 @@ import org.junit.Test; import static org.junit.Assert.assertEquals; import static org.mockito.Mockito.*; -import org.apache.spark.network.buffer.ManagedBuffer; import org.apache.spark.network.buffer.NioManagedBuffer; import org.apache.spark.network.client.ChunkReceivedCallback; import org.apache.spark.network.client.RpcResponseCallback; diff --git a/common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java b/common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java index 87b9e8eb445aac16236eef99fd64829291e00839..10a7cb1d06659ea46c49c8e6c2e866156d3a3424 100644 --- a/common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java +++ b/common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java @@ -153,7 +153,8 @@ public final class UTF8String implements Comparable<UTF8String>, Externalizable, * * Unlike getBytes this will not create a copy the array if this is a slice. */ - public @Nonnull ByteBuffer getByteBuffer() { + @Nonnull + public ByteBuffer getByteBuffer() { if (base instanceof byte[] && offset >= BYTE_ARRAY_OFFSET) { final byte[] bytes = (byte[]) base; diff --git a/core/src/main/java/org/apache/spark/SparkFirehoseListener.java b/core/src/main/java/org/apache/spark/SparkFirehoseListener.java index 9fe97b4d9c20ce901aca7059974121658f5c795f..140c52fd12f9466b7678dc65157f691354e942a5 100644 --- a/core/src/main/java/org/apache/spark/SparkFirehoseListener.java +++ b/core/src/main/java/org/apache/spark/SparkFirehoseListener.java @@ -30,116 +30,117 @@ import org.apache.spark.scheduler.*; */ public class SparkFirehoseListener implements SparkListenerInterface { - public void onEvent(SparkListenerEvent event) { } - - @Override - public final void onStageCompleted(SparkListenerStageCompleted stageCompleted) { - onEvent(stageCompleted); - } - - @Override - public final void onStageSubmitted(SparkListenerStageSubmitted stageSubmitted) { - onEvent(stageSubmitted); - } - - @Override - public final void onTaskStart(SparkListenerTaskStart taskStart) { - onEvent(taskStart); - } - - @Override - public final void onTaskGettingResult(SparkListenerTaskGettingResult taskGettingResult) { - onEvent(taskGettingResult); - } - - @Override - public final void onTaskEnd(SparkListenerTaskEnd taskEnd) { - onEvent(taskEnd); - } - - @Override - public final void onJobStart(SparkListenerJobStart jobStart) { - onEvent(jobStart); - } - - @Override - public final void onJobEnd(SparkListenerJobEnd jobEnd) { - onEvent(jobEnd); - } - - @Override - public final void onEnvironmentUpdate(SparkListenerEnvironmentUpdate environmentUpdate) { - onEvent(environmentUpdate); - } - - @Override - public final void onBlockManagerAdded(SparkListenerBlockManagerAdded blockManagerAdded) { - onEvent(blockManagerAdded); - } - - @Override - public final void onBlockManagerRemoved(SparkListenerBlockManagerRemoved blockManagerRemoved) { - onEvent(blockManagerRemoved); - } - - @Override - public final void onUnpersistRDD(SparkListenerUnpersistRDD unpersistRDD) { - onEvent(unpersistRDD); - } - - @Override - public final void onApplicationStart(SparkListenerApplicationStart applicationStart) { - onEvent(applicationStart); - } - - @Override - public final void onApplicationEnd(SparkListenerApplicationEnd applicationEnd) { - onEvent(applicationEnd); - } - - @Override - public final void onExecutorMetricsUpdate( - SparkListenerExecutorMetricsUpdate executorMetricsUpdate) { - onEvent(executorMetricsUpdate); - } - - @Override - public final void onExecutorAdded(SparkListenerExecutorAdded executorAdded) { - onEvent(executorAdded); - } - - @Override - public final void onExecutorRemoved(SparkListenerExecutorRemoved executorRemoved) { - onEvent(executorRemoved); - } - - @Override - public final void onExecutorBlacklisted(SparkListenerExecutorBlacklisted executorBlacklisted) { - onEvent(executorBlacklisted); - } - - @Override - public final void onExecutorUnblacklisted(SparkListenerExecutorUnblacklisted executorUnblacklisted) { - onEvent(executorUnblacklisted); - } - - @Override - public final void onNodeBlacklisted(SparkListenerNodeBlacklisted nodeBlacklisted) { - onEvent(nodeBlacklisted); - } - - @Override - public final void onNodeUnblacklisted(SparkListenerNodeUnblacklisted nodeUnblacklisted) { - onEvent(nodeUnblacklisted); - } - - @Override - public void onBlockUpdated(SparkListenerBlockUpdated blockUpdated) { - onEvent(blockUpdated); - } - - @Override - public void onOtherEvent(SparkListenerEvent event) { - onEvent(event); - } + public void onEvent(SparkListenerEvent event) { } + + @Override + public final void onStageCompleted(SparkListenerStageCompleted stageCompleted) { + onEvent(stageCompleted); + } + + @Override + public final void onStageSubmitted(SparkListenerStageSubmitted stageSubmitted) { + onEvent(stageSubmitted); + } + + @Override + public final void onTaskStart(SparkListenerTaskStart taskStart) { + onEvent(taskStart); + } + + @Override + public final void onTaskGettingResult(SparkListenerTaskGettingResult taskGettingResult) { + onEvent(taskGettingResult); + } + + @Override + public final void onTaskEnd(SparkListenerTaskEnd taskEnd) { + onEvent(taskEnd); + } + + @Override + public final void onJobStart(SparkListenerJobStart jobStart) { + onEvent(jobStart); + } + + @Override + public final void onJobEnd(SparkListenerJobEnd jobEnd) { + onEvent(jobEnd); + } + + @Override + public final void onEnvironmentUpdate(SparkListenerEnvironmentUpdate environmentUpdate) { + onEvent(environmentUpdate); + } + + @Override + public final void onBlockManagerAdded(SparkListenerBlockManagerAdded blockManagerAdded) { + onEvent(blockManagerAdded); + } + + @Override + public final void onBlockManagerRemoved(SparkListenerBlockManagerRemoved blockManagerRemoved) { + onEvent(blockManagerRemoved); + } + + @Override + public final void onUnpersistRDD(SparkListenerUnpersistRDD unpersistRDD) { + onEvent(unpersistRDD); + } + + @Override + public final void onApplicationStart(SparkListenerApplicationStart applicationStart) { + onEvent(applicationStart); + } + + @Override + public final void onApplicationEnd(SparkListenerApplicationEnd applicationEnd) { + onEvent(applicationEnd); + } + + @Override + public final void onExecutorMetricsUpdate( + SparkListenerExecutorMetricsUpdate executorMetricsUpdate) { + onEvent(executorMetricsUpdate); + } + + @Override + public final void onExecutorAdded(SparkListenerExecutorAdded executorAdded) { + onEvent(executorAdded); + } + + @Override + public final void onExecutorRemoved(SparkListenerExecutorRemoved executorRemoved) { + onEvent(executorRemoved); + } + + @Override + public final void onExecutorBlacklisted(SparkListenerExecutorBlacklisted executorBlacklisted) { + onEvent(executorBlacklisted); + } + + @Override + public final void onExecutorUnblacklisted( + SparkListenerExecutorUnblacklisted executorUnblacklisted) { + onEvent(executorUnblacklisted); + } + + @Override + public final void onNodeBlacklisted(SparkListenerNodeBlacklisted nodeBlacklisted) { + onEvent(nodeBlacklisted); + } + + @Override + public final void onNodeUnblacklisted(SparkListenerNodeUnblacklisted nodeUnblacklisted) { + onEvent(nodeUnblacklisted); + } + + @Override + public void onBlockUpdated(SparkListenerBlockUpdated blockUpdated) { + onEvent(blockUpdated); + } + + @Override + public void onOtherEvent(SparkListenerEvent event) { + onEvent(event); + } } diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java index 29aca04a3d11b71756ec3dedf860ca04a75068ff..f312fa2b2ddd7751b22f8b826922ad36d7cefd6f 100644 --- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java +++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java @@ -161,7 +161,9 @@ public final class UnsafeExternalSorter extends MemoryConsumer { // Register a cleanup task with TaskContext to ensure that memory is guaranteed to be freed at // the end of the task. This is necessary to avoid memory leaks in when the downstream operator // does not fully consume the sorter's output (e.g. sort followed by limit). - taskContext.addTaskCompletionListener(context -> { cleanupResources(); }); + taskContext.addTaskCompletionListener(context -> { + cleanupResources(); + }); } /** diff --git a/core/src/test/java/test/org/apache/spark/JavaAPISuite.java b/core/src/test/java/test/org/apache/spark/JavaAPISuite.java index 512149127d72f4f8f7ed7438b9ba9015a07e254f..01b5fb7b466846beaee6a5018bcdb611688567c4 100644 --- a/core/src/test/java/test/org/apache/spark/JavaAPISuite.java +++ b/core/src/test/java/test/org/apache/spark/JavaAPISuite.java @@ -358,7 +358,7 @@ public class JavaAPISuite implements Serializable { // Regression test for SPARK-4459 JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 1, 2, 3, 5, 8, 13)); Function<Tuple2<Integer, Integer>, Boolean> areOdd = - x -> (x._1() % 2 == 0) && (x._2() % 2 == 0); + x -> (x._1() % 2 == 0) && (x._2() % 2 == 0); JavaPairRDD<Integer, Integer> pairRDD = rdd.zip(rdd); JavaPairRDD<Boolean, Iterable<Tuple2<Integer, Integer>>> oddsAndEvens = pairRDD.groupBy(areOdd); assertEquals(2, oddsAndEvens.count()); @@ -528,14 +528,14 @@ public class JavaAPISuite implements Serializable { new Tuple2<>(5, 3)), 2); Map<Integer, HashSet<Integer>> sets = pairs.aggregateByKey(new HashSet<Integer>(), - (a, b) -> { - a.add(b); - return a; - }, - (a, b) -> { - a.addAll(b); - return a; - }).collectAsMap(); + (a, b) -> { + a.add(b); + return a; + }, + (a, b) -> { + a.addAll(b); + return a; + }).collectAsMap(); assertEquals(3, sets.size()); assertEquals(new HashSet<>(Arrays.asList(1)), sets.get(1)); assertEquals(new HashSet<>(Arrays.asList(2)), sets.get(3)); @@ -666,8 +666,8 @@ public class JavaAPISuite implements Serializable { assertArrayEquals(expected_counts, histogram); // SPARK-5744 assertArrayEquals( - new long[] {0}, - sc.parallelizeDoubles(new ArrayList<>(0), 1).histogram(new double[]{0.0, 1.0})); + new long[] {0}, + sc.parallelizeDoubles(new ArrayList<>(0), 1).histogram(new double[]{0.0, 1.0})); } private static class DoubleComparator implements Comparator<Double>, Serializable { @@ -807,7 +807,7 @@ public class JavaAPISuite implements Serializable { // Regression test for SPARK-668: JavaPairRDD<String, Integer> swapped = pairRDD.flatMapToPair( - item -> Collections.singletonList(item.swap()).iterator()); + item -> Collections.singletonList(item.swap()).iterator()); swapped.collect(); // There was never a bug here, but it's worth testing: @@ -845,11 +845,13 @@ public class JavaAPISuite implements Serializable { public void getNumPartitions(){ JavaRDD<Integer> rdd1 = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8), 3); JavaDoubleRDD rdd2 = sc.parallelizeDoubles(Arrays.asList(1.0, 2.0, 3.0, 4.0), 2); - JavaPairRDD<String, Integer> rdd3 = sc.parallelizePairs(Arrays.asList( - new Tuple2<>("a", 1), - new Tuple2<>("aa", 2), - new Tuple2<>("aaa", 3) - ), 2); + JavaPairRDD<String, Integer> rdd3 = sc.parallelizePairs( + Arrays.asList( + new Tuple2<>("a", 1), + new Tuple2<>("aa", 2), + new Tuple2<>("aaa", 3) + ), + 2); assertEquals(3, rdd1.getNumPartitions()); assertEquals(2, rdd2.getNumPartitions()); assertEquals(2, rdd3.getNumPartitions()); @@ -977,7 +979,7 @@ public class JavaAPISuite implements Serializable { JavaPairRDD<Integer, String> rdd = sc.parallelizePairs(pairs); rdd.mapToPair(pair -> new Tuple2<>(new IntWritable(pair._1()), new Text(pair._2()))) - .saveAsHadoopFile(outputDir, IntWritable.class, Text.class, SequenceFileOutputFormat.class); + .saveAsHadoopFile(outputDir, IntWritable.class, Text.class, SequenceFileOutputFormat.class); // Try reading the output back as an object file JavaPairRDD<Integer, String> readRDD = sc.sequenceFile(outputDir, IntWritable.class, @@ -1068,11 +1070,11 @@ public class JavaAPISuite implements Serializable { JavaPairRDD<Integer, String> rdd = sc.parallelizePairs(pairs); rdd.mapToPair(pair -> new Tuple2<>(new IntWritable(pair._1()), new Text(pair._2()))) - .saveAsNewAPIHadoopFile(outputDir, IntWritable.class, Text.class, + .saveAsNewAPIHadoopFile(outputDir, IntWritable.class, Text.class, org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat.class); JavaPairRDD<IntWritable, Text> output = - sc.sequenceFile(outputDir, IntWritable.class, Text.class); + sc.sequenceFile(outputDir, IntWritable.class, Text.class); assertEquals(pairs.toString(), output.map(Tuple2::toString).collect().toString()); } @@ -1088,11 +1090,11 @@ public class JavaAPISuite implements Serializable { JavaPairRDD<Integer, String> rdd = sc.parallelizePairs(pairs); rdd.mapToPair(pair -> new Tuple2<>(new IntWritable(pair._1()), new Text(pair._2()))) - .saveAsHadoopFile(outputDir, IntWritable.class, Text.class, SequenceFileOutputFormat.class); + .saveAsHadoopFile(outputDir, IntWritable.class, Text.class, SequenceFileOutputFormat.class); JavaPairRDD<IntWritable, Text> output = sc.newAPIHadoopFile(outputDir, - org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat.class, - IntWritable.class, Text.class, Job.getInstance().getConfiguration()); + org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat.class, + IntWritable.class, Text.class, Job.getInstance().getConfiguration()); assertEquals(pairs.toString(), output.map(Tuple2::toString).collect().toString()); } @@ -1135,10 +1137,10 @@ public class JavaAPISuite implements Serializable { JavaPairRDD<Integer, String> rdd = sc.parallelizePairs(pairs); rdd.mapToPair(pair -> new Tuple2<>(new IntWritable(pair._1()), new Text(pair._2()))) - .saveAsHadoopFile(outputDir, IntWritable.class, Text.class, SequenceFileOutputFormat.class); + .saveAsHadoopFile(outputDir, IntWritable.class, Text.class, SequenceFileOutputFormat.class); JavaPairRDD<IntWritable, Text> output = sc.hadoopFile(outputDir, - SequenceFileInputFormat.class, IntWritable.class, Text.class); + SequenceFileInputFormat.class, IntWritable.class, Text.class); assertEquals(pairs.toString(), output.map(Tuple2::toString).collect().toString()); } @@ -1154,10 +1156,11 @@ public class JavaAPISuite implements Serializable { JavaPairRDD<Integer, String> rdd = sc.parallelizePairs(pairs); rdd.mapToPair(pair -> new Tuple2<>(new IntWritable(pair._1()), new Text(pair._2()))) - .saveAsHadoopFile(outputDir, IntWritable.class, Text.class, SequenceFileOutputFormat.class, DefaultCodec.class); + .saveAsHadoopFile(outputDir, IntWritable.class, Text.class, + SequenceFileOutputFormat.class, DefaultCodec.class); JavaPairRDD<IntWritable, Text> output = sc.hadoopFile(outputDir, - SequenceFileInputFormat.class, IntWritable.class, Text.class); + SequenceFileInputFormat.class, IntWritable.class, Text.class); assertEquals(pairs.toString(), output.map(Tuple2::toString).collect().toString()); } @@ -1263,23 +1266,23 @@ public class JavaAPISuite implements Serializable { Function2<Integer, Integer, Integer> mergeValueFunction = (v1, v2) -> v1 + v2; JavaPairRDD<Integer, Integer> combinedRDD = originalRDD.keyBy(keyFunction) - .combineByKey(createCombinerFunction, mergeValueFunction, mergeValueFunction); + .combineByKey(createCombinerFunction, mergeValueFunction, mergeValueFunction); Map<Integer, Integer> results = combinedRDD.collectAsMap(); ImmutableMap<Integer, Integer> expected = ImmutableMap.of(0, 9, 1, 5, 2, 7); assertEquals(expected, results); Partitioner defaultPartitioner = Partitioner.defaultPartitioner( - combinedRDD.rdd(), - JavaConverters.collectionAsScalaIterableConverter( - Collections.<RDD<?>>emptyList()).asScala().toSeq()); + combinedRDD.rdd(), + JavaConverters.collectionAsScalaIterableConverter( + Collections.<RDD<?>>emptyList()).asScala().toSeq()); combinedRDD = originalRDD.keyBy(keyFunction) - .combineByKey( - createCombinerFunction, - mergeValueFunction, - mergeValueFunction, - defaultPartitioner, - false, - new KryoSerializer(new SparkConf())); + .combineByKey( + createCombinerFunction, + mergeValueFunction, + mergeValueFunction, + defaultPartitioner, + false, + new KryoSerializer(new SparkConf())); results = combinedRDD.collectAsMap(); assertEquals(expected, results); } @@ -1291,11 +1294,10 @@ public class JavaAPISuite implements Serializable { JavaPairRDD<Integer, Integer> rdd2 = rdd1.mapToPair(i -> new Tuple2<>(i, i % 2)); JavaPairRDD<Integer, Integer> rdd3 = rdd2.mapToPair(in -> new Tuple2<>(in._2(), in._1())); assertEquals(Arrays.asList( - new Tuple2<>(1, 1), - new Tuple2<>(0, 2), - new Tuple2<>(1, 3), - new Tuple2<>(0, 4)), rdd3.collect()); - + new Tuple2<>(1, 1), + new Tuple2<>(0, 2), + new Tuple2<>(1, 3), + new Tuple2<>(0, 4)), rdd3.collect()); } @SuppressWarnings("unchecked") @@ -1312,16 +1314,18 @@ public class JavaAPISuite implements Serializable { assertEquals(Arrays.asList(3, 4), parts[0]); assertEquals(Arrays.asList(5, 6, 7), parts[1]); - assertEquals(Arrays.asList(new Tuple2<>(1, 1), - new Tuple2<>(2, 0)), - rdd2.collectPartitions(new int[] {0})[0]); + assertEquals( + Arrays.asList(new Tuple2<>(1, 1), new Tuple2<>(2, 0)), + rdd2.collectPartitions(new int[] {0})[0]); List<Tuple2<Integer,Integer>>[] parts2 = rdd2.collectPartitions(new int[] {1, 2}); assertEquals(Arrays.asList(new Tuple2<>(3, 1), new Tuple2<>(4, 0)), parts2[0]); - assertEquals(Arrays.asList(new Tuple2<>(5, 1), - new Tuple2<>(6, 0), - new Tuple2<>(7, 1)), - parts2[1]); + assertEquals( + Arrays.asList( + new Tuple2<>(5, 1), + new Tuple2<>(6, 0), + new Tuple2<>(7, 1)), + parts2[1]); } @Test @@ -1352,7 +1356,6 @@ public class JavaAPISuite implements Serializable { double error = Math.abs((resCount - count) / count); assertTrue(error < 0.1); } - } @Test @@ -1531,8 +1534,8 @@ public class JavaAPISuite implements Serializable { SparkConf conf = new SparkConf(); conf.registerKryoClasses(new Class<?>[]{ Class1.class, Class2.class }); assertEquals( - Class1.class.getName() + "," + Class2.class.getName(), - conf.get("spark.kryo.classesToRegister")); + Class1.class.getName() + "," + Class2.class.getName(), + conf.get("spark.kryo.classesToRegister")); } @Test diff --git a/examples/src/main/java/org/apache/spark/examples/ml/JavaTokenizerExample.java b/examples/src/main/java/org/apache/spark/examples/ml/JavaTokenizerExample.java index 3f809eba7fffbc9cdf090291be7346635fe3de64..a0979aa2d24e4f44e1a7750db060c2a27c946aec 100644 --- a/examples/src/main/java/org/apache/spark/examples/ml/JavaTokenizerExample.java +++ b/examples/src/main/java/org/apache/spark/examples/ml/JavaTokenizerExample.java @@ -27,7 +27,6 @@ import scala.collection.mutable.WrappedArray; import org.apache.spark.ml.feature.RegexTokenizer; import org.apache.spark.ml.feature.Tokenizer; -import org.apache.spark.sql.api.java.UDF1; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.RowFactory; @@ -69,7 +68,8 @@ public class JavaTokenizerExample { .setOutputCol("words") .setPattern("\\W"); // alternatively .setPattern("\\w+").setGaps(false); - spark.udf().register("countTokens", (WrappedArray<?> words) -> words.size(), DataTypes.IntegerType); + spark.udf().register( + "countTokens", (WrappedArray<?> words) -> words.size(), DataTypes.IntegerType); Dataset<Row> tokenized = tokenizer.transform(sentenceDataFrame); tokenized.select("sentence", "words") diff --git a/examples/src/main/java/org/apache/spark/examples/mllib/JavaRankingMetricsExample.java b/examples/src/main/java/org/apache/spark/examples/mllib/JavaRankingMetricsExample.java index bd49f059b29fdc0dd96c137785493cac679544b6..dc9970d8852740cc94069ab8b6fc5dd495f0aa88 100644 --- a/examples/src/main/java/org/apache/spark/examples/mllib/JavaRankingMetricsExample.java +++ b/examples/src/main/java/org/apache/spark/examples/mllib/JavaRankingMetricsExample.java @@ -118,7 +118,9 @@ public class JavaRankingMetricsExample { new Tuple2<>(new Tuple2<>(r.user(), r.product()), r.rating()))); JavaRDD<Tuple2<Object, Object>> ratesAndPreds = JavaPairRDD.fromJavaRDD(ratings.map(r -> - new Tuple2<Tuple2<Integer, Integer>, Object>(new Tuple2<>(r.user(), r.product()), r.rating()) + new Tuple2<Tuple2<Integer, Integer>, Object>( + new Tuple2<>(r.user(), r.product()), + r.rating()) )).join(predictions).values(); // Create regression metrics object diff --git a/external/kinesis-asl/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCountASL.java b/external/kinesis-asl/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCountASL.java index d1274a687fc701eb63e4e4e30b00e76a5a239cc8..626bde48e1a8694fdaa12253fc17ee849ff53deb 100644 --- a/external/kinesis-asl/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCountASL.java +++ b/external/kinesis-asl/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCountASL.java @@ -23,7 +23,6 @@ import java.util.Iterator; import java.util.List; import java.util.regex.Pattern; -import com.amazonaws.regions.RegionUtils; import org.apache.spark.SparkConf; import org.apache.spark.api.java.function.FlatMapFunction; import org.apache.spark.api.java.function.Function2; diff --git a/external/kinesis-asl/src/test/java/org/apache/spark/streaming/kinesis/JavaKinesisStreamSuite.java b/external/kinesis-asl/src/test/java/org/apache/spark/streaming/kinesis/JavaKinesisStreamSuite.java index 26b1fda2ff5114cd91cc4978175252d308557ed0..b37b087467926a9af70c075e051f2c3c9c0d8a35 100644 --- a/external/kinesis-asl/src/test/java/org/apache/spark/streaming/kinesis/JavaKinesisStreamSuite.java +++ b/external/kinesis-asl/src/test/java/org/apache/spark/streaming/kinesis/JavaKinesisStreamSuite.java @@ -17,7 +17,6 @@ package org.apache.spark.streaming.kinesis; -import com.amazonaws.regions.RegionUtils; import com.amazonaws.services.kinesis.model.Record; import org.junit.Test; @@ -91,7 +90,8 @@ public class JavaKinesisStreamSuite extends LocalJavaStreamingContext { JavaDStream<String> kinesisStream = KinesisUtils.createStream(ssc, "testApp", "mySparkStream", "https://kinesis.us-west-2.amazonaws.com", "us-west-2", InitialPositionInStream.LATEST, new Duration(2000), StorageLevel.MEMORY_AND_DISK_2(), handler, String.class, - "fakeAccessKey", "fakeSecretKey", "fakeSTSRoleArn", "fakeSTSSessionName", "fakeSTSExternalId"); + "fakeAccessKey", "fakeSecretKey", "fakeSTSRoleArn", "fakeSTSSessionName", + "fakeSTSExternalId"); ssc.stop(); } diff --git a/mllib/src/test/java/org/apache/spark/mllib/tree/JavaDecisionTreeSuite.java b/mllib/src/test/java/org/apache/spark/mllib/tree/JavaDecisionTreeSuite.java index 0f71deb9ea5287f826f55994c749d5e3147fc461..d2fe6bb2ca7184f6e864c36d7dd54e76941a754a 100644 --- a/mllib/src/test/java/org/apache/spark/mllib/tree/JavaDecisionTreeSuite.java +++ b/mllib/src/test/java/org/apache/spark/mllib/tree/JavaDecisionTreeSuite.java @@ -33,7 +33,8 @@ import org.apache.spark.mllib.tree.model.DecisionTreeModel; public class JavaDecisionTreeSuite extends SharedSparkSession { - private static int validatePrediction(List<LabeledPoint> validationData, DecisionTreeModel model) { + private static int validatePrediction( + List<LabeledPoint> validationData, DecisionTreeModel model) { int numCorrect = 0; for (LabeledPoint point : validationData) { Double prediction = model.predict(point.features()); diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UnsafeArrayWriter.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UnsafeArrayWriter.java index afea4676893edb66db016091f8b31d41425137af..791e8d80e6cba872859b7fc2b079baf95f4212f0 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UnsafeArrayWriter.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UnsafeArrayWriter.java @@ -117,7 +117,7 @@ public class UnsafeArrayWriter { public void setNullInt(int ordinal) { setNullBit(ordinal); // put zero into the corresponding field when set null - Platform.putInt(holder.buffer, getElementOffset(ordinal, 4), (int)0); + Platform.putInt(holder.buffer, getElementOffset(ordinal, 4), 0); } public void setNullLong(int ordinal) { diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetAggregatorSuite.java b/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetAggregatorSuite.java index d3769a74b97892f0203b02f2c5434e87df496a22..539976d5af4698d2d485971ffd0db4ec45d7b08f 100644 --- a/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetAggregatorSuite.java +++ b/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetAggregatorSuite.java @@ -88,7 +88,7 @@ public class JavaDatasetAggregatorSuite extends JavaDatasetAggregatorSuiteBase { @Test public void testTypedAggregationAverage() { KeyValueGroupedDataset<String, Tuple2<String, Integer>> grouped = generateGroupedDataset(); - Dataset<Tuple2<String, Double>> agged = grouped.agg(typed.avg(value -> (double)(value._2() * 2))); + Dataset<Tuple2<String, Double>> agged = grouped.agg(typed.avg(value -> value._2() * 2.0)); Assert.assertEquals( Arrays.asList(new Tuple2<>("a", 3.0), new Tuple2<>("b", 6.0)), agged.collectAsList()); diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java b/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java index 4581c6ebe9ef8789fde5c6b640a435b5fe7e98ee..e3b0e37ccab0511000f26ec7445fe8c1e4fe4996 100644 --- a/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java +++ b/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java @@ -110,7 +110,8 @@ public class JavaDatasetSuite implements Serializable { Assert.assertEquals(Arrays.asList("hello"), filtered.collectAsList()); - Dataset<Integer> mapped = ds.map((MapFunction<String, Integer>) v -> v.length(), Encoders.INT()); + Dataset<Integer> mapped = + ds.map((MapFunction<String, Integer>) String::length, Encoders.INT()); Assert.assertEquals(Arrays.asList(5, 5), mapped.collectAsList()); Dataset<String> parMapped = ds.mapPartitions((MapPartitionsFunction<String, String>) it -> { @@ -157,17 +158,17 @@ public class JavaDatasetSuite implements Serializable { public void testGroupBy() { List<String> data = Arrays.asList("a", "foo", "bar"); Dataset<String> ds = spark.createDataset(data, Encoders.STRING()); - KeyValueGroupedDataset<Integer, String> grouped = ds.groupByKey( - (MapFunction<String, Integer>) v -> v.length(), - Encoders.INT()); + KeyValueGroupedDataset<Integer, String> grouped = + ds.groupByKey((MapFunction<String, Integer>) String::length, Encoders.INT()); - Dataset<String> mapped = grouped.mapGroups((MapGroupsFunction<Integer, String, String>) (key, values) -> { - StringBuilder sb = new StringBuilder(key.toString()); - while (values.hasNext()) { - sb.append(values.next()); - } - return sb.toString(); - }, Encoders.STRING()); + Dataset<String> mapped = grouped.mapGroups( + (MapGroupsFunction<Integer, String, String>) (key, values) -> { + StringBuilder sb = new StringBuilder(key.toString()); + while (values.hasNext()) { + sb.append(values.next()); + } + return sb.toString(); + }, Encoders.STRING()); Assert.assertEquals(asSet("1a", "3foobar"), toSet(mapped.collectAsList())); @@ -209,7 +210,8 @@ public class JavaDatasetSuite implements Serializable { Assert.assertEquals(asSet("1a", "3foobar"), toSet(flatMapped2.collectAsList())); - Dataset<Tuple2<Integer, String>> reduced = grouped.reduceGroups((ReduceFunction<String>) (v1, v2) -> v1 + v2); + Dataset<Tuple2<Integer, String>> reduced = + grouped.reduceGroups((ReduceFunction<String>) (v1, v2) -> v1 + v2); Assert.assertEquals( asSet(tuple2(1, "a"), tuple2(3, "foobar")), diff --git a/sql/hive/src/test/java/org/apache/spark/sql/hive/execution/UDFRawList.java b/sql/hive/src/test/java/org/apache/spark/sql/hive/execution/UDFRawList.java index 6adb1657bf25b794304b06b225e1de2cc46e0548..8211cbf16f7bf4731a062776fad3fddf892725b8 100644 --- a/sql/hive/src/test/java/org/apache/spark/sql/hive/execution/UDFRawList.java +++ b/sql/hive/src/test/java/org/apache/spark/sql/hive/execution/UDFRawList.java @@ -25,6 +25,7 @@ import java.util.List; * UDF that returns a raw (non-parameterized) java List. */ public class UDFRawList extends UDF { + @SuppressWarnings("rawtypes") public List evaluate(Object o) { return Collections.singletonList("data1"); } diff --git a/sql/hive/src/test/java/org/apache/spark/sql/hive/execution/UDFRawMap.java b/sql/hive/src/test/java/org/apache/spark/sql/hive/execution/UDFRawMap.java index 4731b6eee85cd155a224928747d8986360b3a4f4..58c81f9945d7e4deb2eb185c402db5f3e8000663 100644 --- a/sql/hive/src/test/java/org/apache/spark/sql/hive/execution/UDFRawMap.java +++ b/sql/hive/src/test/java/org/apache/spark/sql/hive/execution/UDFRawMap.java @@ -25,6 +25,7 @@ import java.util.Map; * UDF that returns a raw (non-parameterized) java Map. */ public class UDFRawMap extends UDF { + @SuppressWarnings("rawtypes") public Map evaluate(Object o) { return Collections.singletonMap("a", "1"); } diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaMapWithStateSuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaMapWithStateSuite.java index cb8ed83e5a49dc40e833017a60ee3fa95993f8fb..b1367b8f2aed20a26871c99e9a878bd2fdfca69a 100644 --- a/streaming/src/test/java/org/apache/spark/streaming/JavaMapWithStateSuite.java +++ b/streaming/src/test/java/org/apache/spark/streaming/JavaMapWithStateSuite.java @@ -145,8 +145,8 @@ public class JavaMapWithStateSuite extends LocalJavaStreamingContext implements List<Set<Tuple2<K, S>>> expectedStateSnapshots) { int numBatches = expectedOutputs.size(); JavaDStream<K> inputStream = JavaTestUtils.attachTestInputStream(ssc, input, 2); - JavaMapWithStateDStream<K, Integer, S, T> mapWithStateDStream = - JavaPairDStream.fromJavaDStream(inputStream.map(x -> new Tuple2<>(x, 1))).mapWithState(mapWithStateSpec); + JavaMapWithStateDStream<K, Integer, S, T> mapWithStateDStream = JavaPairDStream.fromJavaDStream( + inputStream.map(x -> new Tuple2<>(x, 1))).mapWithState(mapWithStateSpec); List<Set<T>> collectedOutputs = Collections.synchronizedList(new ArrayList<Set<T>>()); diff --git a/streaming/src/test/java/test/org/apache/spark/streaming/Java8APISuite.java b/streaming/src/test/java/test/org/apache/spark/streaming/Java8APISuite.java index 9948a4074cdc700ab7bd129f274926ef63637843..80513de4ee11785095aa1e991d9dfb578fa99618 100644 --- a/streaming/src/test/java/test/org/apache/spark/streaming/Java8APISuite.java +++ b/streaming/src/test/java/test/org/apache/spark/streaming/Java8APISuite.java @@ -20,10 +20,13 @@ package test.org.apache.spark.streaming; import java.io.Serializable; import java.util.*; +import org.apache.spark.api.java.function.Function3; +import org.apache.spark.api.java.function.Function4; import org.apache.spark.streaming.Duration; import org.apache.spark.streaming.Durations; import org.apache.spark.streaming.JavaTestUtils; import org.apache.spark.streaming.LocalJavaStreamingContext; +import org.apache.spark.streaming.State; import org.apache.spark.streaming.StateSpec; import org.apache.spark.streaming.Time; import scala.Tuple2; @@ -142,8 +145,8 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ Arrays.asList(24)); JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); - JavaDStream<Integer> reducedWindowed = stream.reduceByWindow((x, y) -> x + y, - (x, y) -> x - y, new Duration(2000), new Duration(1000)); + JavaDStream<Integer> reducedWindowed = stream.reduceByWindow( + (x, y) -> x + y, (x, y) -> x - y, new Duration(2000), new Duration(1000)); JavaTestUtils.attachTestOutputStream(reducedWindowed); List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 4, 4); @@ -850,36 +853,44 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ JavaPairRDD<String, Boolean> initialRDD = null; JavaPairDStream<String, Integer> wordsDstream = null; + Function4<Time, String, Optional<Integer>, State<Boolean>, Optional<Double>> mapFn = + (time, key, value, state) -> { + // Use all State's methods here + state.exists(); + state.get(); + state.isTimingOut(); + state.remove(); + state.update(true); + return Optional.of(2.0); + }; + JavaMapWithStateDStream<String, Integer, Boolean, Double> stateDstream = - wordsDstream.mapWithState( - StateSpec.<String, Integer, Boolean, Double>function((time, key, value, state) -> { - // Use all State's methods here - state.exists(); - state.get(); - state.isTimingOut(); - state.remove(); - state.update(true); - return Optional.of(2.0); - }).initialState(initialRDD) - .numPartitions(10) - .partitioner(new HashPartitioner(10)) - .timeout(Durations.seconds(10))); + wordsDstream.mapWithState( + StateSpec.function(mapFn) + .initialState(initialRDD) + .numPartitions(10) + .partitioner(new HashPartitioner(10)) + .timeout(Durations.seconds(10))); JavaPairDStream<String, Boolean> emittedRecords = stateDstream.stateSnapshots(); + Function3<String, Optional<Integer>, State<Boolean>, Double> mapFn2 = + (key, value, state) -> { + state.exists(); + state.get(); + state.isTimingOut(); + state.remove(); + state.update(true); + return 2.0; + }; + JavaMapWithStateDStream<String, Integer, Boolean, Double> stateDstream2 = - wordsDstream.mapWithState( - StateSpec.<String, Integer, Boolean, Double>function((key, value, state) -> { - state.exists(); - state.get(); - state.isTimingOut(); - state.remove(); - state.update(true); - return 2.0; - }).initialState(initialRDD) - .numPartitions(10) - .partitioner(new HashPartitioner(10)) - .timeout(Durations.seconds(10))); + wordsDstream.mapWithState( + StateSpec.function(mapFn2) + .initialState(initialRDD) + .numPartitions(10) + .partitioner(new HashPartitioner(10)) + .timeout(Durations.seconds(10))); JavaPairDStream<String, Boolean> mappedDStream = stateDstream2.stateSnapshots(); } diff --git a/streaming/src/test/java/test/org/apache/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/test/org/apache/spark/streaming/JavaAPISuite.java index b966cbdca076d7ade49beefe4e328d93dd52e6c4..96f8d9593d6309672f02d9bab64615802b7ca227 100644 --- a/streaming/src/test/java/test/org/apache/spark/streaming/JavaAPISuite.java +++ b/streaming/src/test/java/test/org/apache/spark/streaming/JavaAPISuite.java @@ -29,7 +29,6 @@ import org.apache.spark.streaming.LocalJavaStreamingContext; import org.apache.spark.streaming.Seconds; import org.apache.spark.streaming.StreamingContextState; import org.apache.spark.streaming.StreamingContextSuite; -import org.apache.spark.streaming.Time; import scala.Tuple2; import org.apache.hadoop.conf.Configuration; @@ -608,7 +607,8 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa Arrays.asList("a","t","h","l","e","t","i","c","s")); JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); - JavaDStream<String> flatMapped = stream.flatMap(x -> Arrays.asList(x.split("(?!^)")).iterator()); + JavaDStream<String> flatMapped = + stream.flatMap(x -> Arrays.asList(x.split("(?!^)")).iterator()); JavaTestUtils.attachTestOutputStream(flatMapped); List<List<String>> result = JavaTestUtils.runStreams(ssc, 3, 3); @@ -1314,7 +1314,8 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa ssc, inputData, 1); JavaPairDStream<String, String> pairStream = JavaPairDStream.fromJavaDStream(stream); - JavaPairDStream<String, String> mapped = pairStream.mapValues(s -> s.toUpperCase(Locale.ENGLISH)); + JavaPairDStream<String, String> mapped = + pairStream.mapValues(s -> s.toUpperCase(Locale.ENGLISH)); JavaTestUtils.attachTestOutputStream(mapped); List<List<Tuple2<String, String>>> result = JavaTestUtils.runStreams(ssc, 2, 2);