diff --git a/core/pom.xml b/core/pom.xml
index 1feb00b3a7fb86da2d670edfe345ab1724296ee7..c5c41b2b5de425d342d4fe27880d0bca3f8bcfd1 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -352,9 +352,9 @@
         </execution>
       </executions>
       <configuration>
-        <tasks>
+        <target>
           <unzip src="../python/lib/py4j-0.8.2.1-src.zip" dest="../python/build" />
-        </tasks>
+        </target>
       </configuration>
     </plugin>
     <plugin>
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala
index 819b51e12ad8c954663e11f68efc4bdfd759bdbc..4896ec845bbc9b6e848acab41d2b4fd3174f428d 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala
@@ -19,6 +19,7 @@ package org.apache.spark.scheduler
 
 import java.nio.ByteBuffer
 
+import scala.language.existentials
 import scala.util.control.NonFatal
 
 import org.apache.spark._
diff --git a/core/src/test/java/org/apache/spark/JavaAPISuite.java b/core/src/test/java/org/apache/spark/JavaAPISuite.java
index e5bdad6bda2faa8bf2d86f091809dcaec7065b9b..5ce299d05824b7187b80c8b8ade42f8a533370c1 100644
--- a/core/src/test/java/org/apache/spark/JavaAPISuite.java
+++ b/core/src/test/java/org/apache/spark/JavaAPISuite.java
@@ -184,6 +184,7 @@ public class JavaAPISuite implements Serializable {
     Assert.assertEquals(new Tuple2<Integer, Integer>(3, 2), sortedPairs.get(2));
   }
 
+  @SuppressWarnings("unchecked")
   @Test
   public void repartitionAndSortWithinPartitions() {
     List<Tuple2<Integer, Integer>> pairs = new ArrayList<Tuple2<Integer, Integer>>();
@@ -491,6 +492,7 @@ public class JavaAPISuite implements Serializable {
     Assert.assertEquals(33, sum);
   }
 
+  @SuppressWarnings("unchecked")
   @Test
   public void aggregateByKey() {
     JavaPairRDD<Integer, Integer> pairs = sc.parallelizePairs(
@@ -1556,7 +1558,7 @@ public class JavaAPISuite implements Serializable {
   @Test
   public void testRegisterKryoClasses() {
     SparkConf conf = new SparkConf();
-    conf.registerKryoClasses(new Class[]{ Class1.class, Class2.class });
+    conf.registerKryoClasses(new Class<?>[]{ Class1.class, Class2.class });
     Assert.assertEquals(
         Class1.class.getName() + "," + Class2.class.getName(),
         conf.get("spark.kryo.classesToRegister"));
diff --git a/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala b/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala
index ca226fd4e694f3889062bd579facb2f081b2afa9..f8bcde12a371a73340fea8582f2f0feff0ebf045 100644
--- a/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala
@@ -24,14 +24,14 @@
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.scheduler.{SparkListenerTaskEnd, SparkListener}
 import org.scalatest.FunSuite
-import org.scalatest.matchers.ShouldMatchers
+import org.scalatest.Matchers
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{Path, FileSystem}
 
 import scala.collection.mutable.ArrayBuffer
 
-class InputOutputMetricsSuite extends FunSuite with SharedSparkContext with ShouldMatchers {
+class InputOutputMetricsSuite extends FunSuite with SharedSparkContext with Matchers {
   test("input metrics when reading text file with single split") {
     val file = new File(getClass.getSimpleName + ".txt")
     val pw = new PrintWriter(new FileWriter(file))
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 436eea4f1fdcf5650fed0370f5271973e8e36063..d6ec9e129ccebcaad70147fdd31db4f7dcb5197a 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -739,7 +739,7 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F
 
   test("accumulator not calculated for resubmitted result stage") {
     //just for register
-    val accum = new Accumulator[Int](0, SparkContext.IntAccumulatorParam)
+    val accum = new Accumulator[Int](0, AccumulatorParam.IntAccumulatorParam)
     val finalRdd = new MyRDD(sc, 1, Nil)
     submit(finalRdd, Array(0))
     completeWithAccumulator(accum.id, taskSets(0), Seq((Success, 42)))
diff --git a/mllib/src/test/java/org/apache/spark/mllib/feature/JavaTfIdfSuite.java b/mllib/src/test/java/org/apache/spark/mllib/feature/JavaTfIdfSuite.java
index 064263e02cd11cf2b418a494585af7428c426a63..fbc26167ce66f15d261d6fde5f62f4bd08eb0e60 100644
--- a/mllib/src/test/java/org/apache/spark/mllib/feature/JavaTfIdfSuite.java
+++ b/mllib/src/test/java/org/apache/spark/mllib/feature/JavaTfIdfSuite.java
@@ -49,6 +49,7 @@ public class JavaTfIdfSuite implements Serializable {
   public void tfIdf() {
     // The tests are to check Java compatibility.
     HashingTF tf = new HashingTF();
+    @SuppressWarnings("unchecked")
     JavaRDD<ArrayList<String>> documents = sc.parallelize(Lists.newArrayList(
       Lists.newArrayList("this is a sentence".split(" ")),
       Lists.newArrayList("this is another sentence".split(" ")),
@@ -68,6 +69,7 @@ public class JavaTfIdfSuite implements Serializable {
   public void tfIdfMinimumDocumentFrequency() {
     // The tests are to check Java compatibility.
     HashingTF tf = new HashingTF();
+    @SuppressWarnings("unchecked")
     JavaRDD<ArrayList<String>> documents = sc.parallelize(Lists.newArrayList(
       Lists.newArrayList("this is a sentence".split(" ")),
       Lists.newArrayList("this is another sentence".split(" ")),
diff --git a/sql/core/src/main/java/org/apache/spark/sql/api/java/UserDefinedType.java b/sql/core/src/main/java/org/apache/spark/sql/api/java/UserDefinedType.java
index b751847b464fd693f73461822f3120bff49a13a1..f0d079d25b5d4a0f7317875f89d4e2e371c8db65 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/api/java/UserDefinedType.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/api/java/UserDefinedType.java
@@ -35,6 +35,7 @@ public abstract class UserDefinedType<UserType> extends DataType implements Seri
   public boolean equals(Object o) {
     if (this == o) return true;
     if (o == null || getClass() != o.getClass()) return false;
+    @SuppressWarnings("unchecked")
     UserDefinedType<UserType> that = (UserDefinedType<UserType>) o;
     return this.sqlType().equals(that.sqlType());
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala
index 0e6fb57d57bcaef2ba05e7bc05ae848e733f4153..97447871a11ee55b7f6b7184bb6927b2cef6a2ef 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala
@@ -24,8 +24,8 @@ import scala.util.Try
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.hadoop.mapreduce.Job
-import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
 
+import parquet.format.converter.ParquetMetadataConverter
 import parquet.hadoop.{ParquetFileReader, Footer, ParquetFileWriter}
 import parquet.hadoop.metadata.{ParquetMetadata, FileMetaData}
 import parquet.hadoop.util.ContextUtil
@@ -458,7 +458,7 @@ private[parquet] object ParquetTypesConverter extends Logging {
     // ... and fallback to "_metadata" if no such file exists (which implies the Parquet file is
     // empty, thus normally the "_metadata" file is expected to be fairly small).
       .orElse(children.find(_.getPath.getName == ParquetFileWriter.PARQUET_METADATA_FILE))
-      .map(ParquetFileReader.readFooter(conf, _))
+      .map(ParquetFileReader.readFooter(conf, _, ParquetMetadataConverter.NO_FILTER))
       .getOrElse(
         throw new IllegalArgumentException(s"Could not find Parquet metadata at path $path"))
   }
diff --git a/sql/core/src/test/java/org/apache/spark/sql/api/java/JavaRowSuite.java b/sql/core/src/test/java/org/apache/spark/sql/api/java/JavaRowSuite.java
index bc5cd66482add6cb6f38950a3834880fc127eea8..2b5812159d07dd39bd170a658081d1999c068677 100644
--- a/sql/core/src/test/java/org/apache/spark/sql/api/java/JavaRowSuite.java
+++ b/sql/core/src/test/java/org/apache/spark/sql/api/java/JavaRowSuite.java
@@ -141,6 +141,7 @@ public class JavaRowSuite {
       doubleValue, stringValue, timestampValue, null);
 
     // Complex array
+    @SuppressWarnings("unchecked")
     List<Map<String, Long>> arrayOfMaps = Arrays.asList(simpleMap);
     List<Row> arrayOfRows = Arrays.asList(simpleStruct);
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DslQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DslQuerySuite.scala
index e40d034ce4dc002a97bd1ec577e2eae08f833970..691c4b38287bf3db6f98c0f374d1adf23abde54b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DslQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DslQuerySuite.scala
@@ -24,6 +24,8 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.dsl._
 import org.apache.spark.sql.test.TestSQLContext._
 
+import scala.language.postfixOps
+
 class DslQuerySuite extends QueryTest {
   import org.apache.spark.sql.TestData._
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
index 074855389d74665348378272405d5664ec7039f7..a5fe2e8da28400b7865b2fb0dd5c62e3ae6580d9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.parquet
 
+import scala.reflect.ClassTag
+
 import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.hadoop.mapreduce.Job
 import org.scalatest.{BeforeAndAfterAll, FunSuiteLike}
@@ -459,11 +461,17 @@ class ParquetQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterA
   }
 
   test("make RecordFilter for simple predicates") {
-    def checkFilter[T <: FilterPredicate](predicate: Expression, defined: Boolean = true): Unit = {
+    def checkFilter[T <: FilterPredicate : ClassTag](
+        predicate: Expression,
+        defined: Boolean = true): Unit = {
       val filter = ParquetFilters.createFilter(predicate)
       if (defined) {
         assert(filter.isDefined)
-        assert(filter.get.isInstanceOf[T])
+        val tClass = implicitly[ClassTag[T]].runtimeClass
+        val filterGet = filter.get
+        assert(
+          tClass.isInstance(filterGet),
+          s"$filterGet of type ${filterGet.getClass} is not an instance of $tClass")
       } else {
         assert(filter.isEmpty)
       }
@@ -484,7 +492,7 @@ class ParquetQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterA
     checkFilter[Operators.And]('a.int === 1 && 'a.int < 4)
     checkFilter[Operators.Or]('a.int === 1 || 'a.int < 4)
-    checkFilter[Operators.Not](!('a.int === 1))
+    checkFilter[Operators.NotEq[Integer]](!('a.int === 1))
 
     checkFilter('a.int > 'b.int, defined = false)
     checkFilter(('a.int > 'b.int) && ('a.int > 'b.int), defined = false)
 
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/parquet/FakeParquetSerDe.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/parquet/FakeParquetSerDe.scala
index abed299cd957fa8f84ea775aef2c6eb2ba556c60..2a16c9d1a27c94c70258cfb632f2334565fb883e 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/parquet/FakeParquetSerDe.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/parquet/FakeParquetSerDe.scala
@@ -32,7 +32,7 @@ import org.apache.hadoop.io.Writable
  * when "spark.sql.hive.convertMetastoreParquet" is set to true.
  */
 @deprecated("No code should depend on FakeParquetHiveSerDe as it is only intended as a " +
-  "placeholder in the Hive MetaStore")
+  "placeholder in the Hive MetaStore", "1.2.0")
 class FakeParquetSerDe extends SerDe {
   override def getObjectInspector: ObjectInspector = new ObjectInspector {
     override def getCategory: Category = Category.PRIMITIVE
diff --git a/sql/hive/src/test/java/org/apache/spark/sql/hive/execution/UDFListListInt.java b/sql/hive/src/test/java/org/apache/spark/sql/hive/execution/UDFListListInt.java
index d2d39a8c4dc28d641d21a3a70ac04547b2280ec4..808e2986d3b77c4f2ead26cb492fb36344ea7b6d 100644
--- a/sql/hive/src/test/java/org/apache/spark/sql/hive/execution/UDFListListInt.java
+++ b/sql/hive/src/test/java/org/apache/spark/sql/hive/execution/UDFListListInt.java
@@ -23,25 +23,21 @@ import java.util.List;
 
 public class UDFListListInt extends UDF {
   /**
-   *
    * @param obj
-   * SQL schema: array<struct<x: int, y: int, z: int>>
-   * Java Type: List<List<Integer>>
-   * @return
+   *          SQL schema: array<struct<x: int, y: int, z: int>>
+   *          Java Type: List<List<Integer>>
    */
+  @SuppressWarnings("unchecked")
   public long evaluate(Object obj) {
     if (obj == null) {
-      return 0l;
+      return 0L;
     }
-    List<List> listList = (List<List>) obj;
+    List<List<?>> listList = (List<List<?>>) obj;
     long retVal = 0;
-    for (List aList : listList) {
-      @SuppressWarnings("unchecked")
-      List<Object> list = (List<Object>) aList;
-      @SuppressWarnings("unchecked")
-      Integer someInt = (Integer) list.get(1);
+    for (List<?> aList : listList) {
+      Number someInt = (Number) aList.get(1);
       try {
-        retVal += (long) (someInt.intValue());
+        retVal += someInt.longValue();
       } catch (NullPointerException e) {
         System.out.println(e);
       }
diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
index ce645fccba1d0b1346f10c0e98a0803478096cb6..12cc0de7509d6c3dc7059209a9ecf86348cc21a0 100644
--- a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
+++ b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
@@ -57,7 +57,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
 
   @Test
   public void testInitialization() {
-    Assert.assertNotNull(ssc.sc());
+    Assert.assertNotNull(ssc.sparkContext());
   }
 
   @SuppressWarnings("unchecked")
@@ -662,7 +662,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
         listOfDStreams1,
         new Function2<List<JavaRDD<?>>, Time, JavaRDD<Long>>() {
           public JavaRDD<Long> call(List<JavaRDD<?>> listOfRDDs, Time time) {
-            assert(listOfRDDs.size() == 2);
+            Assert.assertEquals(2, listOfRDDs.size());
             return null;
           }
         }
@@ -675,7 +675,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
         listOfDStreams2,
         new Function2<List<JavaRDD<?>>, Time, JavaPairRDD<Integer, Tuple2<Integer, String>>>() {
           public JavaPairRDD<Integer, Tuple2<Integer, String>> call(List<JavaRDD<?>> listOfRDDs, Time time) {
-            assert(listOfRDDs.size() == 3);
+            Assert.assertEquals(3, listOfRDDs.size());
             JavaRDD<Integer> rdd1 = (JavaRDD<Integer>)listOfRDDs.get(0);
             JavaRDD<Integer> rdd2 = (JavaRDD<Integer>)listOfRDDs.get(1);
             JavaRDD<Tuple2<Integer, String>> rdd3 = (JavaRDD<Tuple2<Integer, String>>)listOfRDDs.get(2);
@@ -969,7 +969,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
     });
     JavaTestUtils.attachTestOutputStream(reversed);
-    List<List<Tuple2<Integer, String>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
+    List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 2, 2);
 
     Assert.assertEquals(expected, result);
   }
@@ -1012,7 +1012,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
       }
     });
     JavaTestUtils.attachTestOutputStream(flatMapped);
-    List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
+    List<List<Tuple2<Integer, String>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
 
     Assert.assertEquals(expected, result);
   }
@@ -1163,9 +1163,9 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
     JavaTestUtils.attachTestOutputStream(groupWindowed);
     List<List<Tuple2<String, List<Integer>>>> result = JavaTestUtils.runStreams(ssc, 3, 3);
 
-    assert(result.size() == expected.size());
+    Assert.assertEquals(expected.size(), result.size());
     for (int i = 0; i < result.size(); i++) {
-      assert(convert(result.get(i)).equals(convert(expected.get(i))));
+      Assert.assertEquals(convert(expected.get(i)), convert(result.get(i)));
     }
   }
@@ -1383,7 +1383,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
     });
     JavaTestUtils.attachTestOutputStream(sorted);
-    List<List<Tuple2<String, String>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
+    List<List<Tuple2<Integer, Integer>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
 
     Assert.assertEquals(expected, result);
   }
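
A note on the checkFilter rewrite in ParquetQuerySuite.scala: the old assertion filter.get.isInstanceOf[T] tests an erased type parameter, so it compiles only with an "abstract type T is unchecked" warning and passes vacuously; in particular, the old Operators.Not expectation could never have failed. Threading a ClassTag through checkFilter keeps the runtime class of T available for a real check. A minimal standalone sketch of the same idiom (object and method names here are illustrative, not part of the patch):

    import scala.reflect.ClassTag

    object ClassTagCheckSketch {
      // Erased check: T does not exist at runtime, so this compiles with an
      // "abstract type T is unchecked" warning and is true for any non-null value.
      def uncheckedIsInstance[T](value: AnyRef): Boolean = value.isInstanceOf[T]

      // ClassTag check: the compiler captures T's runtime class at the call site,
      // so Class#isInstance performs a real test, as checkFilter now does.
      def taggedIsInstance[T: ClassTag](value: AnyRef): Boolean =
        implicitly[ClassTag[T]].runtimeClass.isInstance(value)

      def main(args: Array[String]): Unit = {
        println(uncheckedIsInstance[Integer]("not an Integer")) // true (vacuous)
        println(taggedIsInstance[Integer]("not an Integer"))    // false
        println(taggedIsInstance[String]("a string"))           // true
      }
    }

Making the assertion real is what surfaced that ParquetFilters builds an Operators.NotEq[Integer] rather than an Operators.Not for !('a.int === 1), hence the corrected expectation in the same hunk.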