diff --git a/.travis.yml b/.travis.yml index 8739849a20798dc76432acd62f8d12a86d284b29..d94872db6437a76f3ba8699f4e9f9299453a636a 100644 --- a/.travis.yml +++ b/.travis.yml @@ -44,7 +44,7 @@ notifications: # 5. Run maven install before running lint-java. install: - export MAVEN_SKIP_RC=1 - - build/mvn -T 4 -q -DskipTests -Pmesos -Pyarn -Phadoop-2.3 -Pkinesis-asl -Phive -Phive-thriftserver install + - build/mvn -T 4 -q -DskipTests -Pmesos -Pyarn -Pkinesis-asl -Phive -Phive-thriftserver install # 6. Run lint-java. script: diff --git a/appveyor.yml b/appveyor.yml index 5e756835bcb9b1e6634fbad24d7fba2a5012da2a..6bc66c0ea54dca4eb9f2198888cc75e0a7ded10e 100644 --- a/appveyor.yml +++ b/appveyor.yml @@ -43,7 +43,7 @@ install: - cmd: R -e "packageVersion('survival')" build_script: - - cmd: mvn -DskipTests -Phadoop-2.6 -Psparkr -Phive -Phive-thriftserver package + - cmd: mvn -DskipTests -Psparkr -Phive -Phive-thriftserver package test_script: - cmd: .\bin\spark-submit2.cmd --conf spark.hadoop.fs.default.name="file:///" R\pkg\tests\run-all.R diff --git a/assembly/README b/assembly/README index 14a5ff8dfc78fc1ea19b86c294320fa657e8c482..d5dafab477410a30ecc6611ac291dc63d4507b59 100644 --- a/assembly/README +++ b/assembly/README @@ -9,4 +9,4 @@ This module is off by default. To activate it specify the profile in the command If you need to build an assembly for a different version of Hadoop the hadoop-version system property needs to be set as in this example: - -Dhadoop.version=2.0.6-alpha + -Dhadoop.version=2.7.3 diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala index 23156072c3ebe7a6fdc4f73e03a1da61eee03b2b..941e2d13fb28eda9a6919996c0d961fd3fddb87a 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala @@ -18,7 +18,6 @@ package org.apache.spark.deploy import java.io.IOException -import java.lang.reflect.Method import java.security.PrivilegedExceptionAction import java.text.DateFormat import java.util.{Arrays, Comparator, Date, Locale} @@ -29,7 +28,6 @@ import scala.util.control.NonFatal import com.google.common.primitives.Longs import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileStatus, FileSystem, Path, PathFilter} -import org.apache.hadoop.fs.FileSystem.Statistics import org.apache.hadoop.mapred.JobConf import org.apache.hadoop.security.{Credentials, UserGroupInformation} import org.apache.hadoop.security.token.{Token, TokenIdentifier} @@ -140,54 +138,29 @@ class SparkHadoopUtil extends Logging { /** * Returns a function that can be called to find Hadoop FileSystem bytes read. If * getFSBytesReadOnThreadCallback is called from thread r at time t, the returned callback will - * return the bytes read on r since t. Reflection is required because thread-level FileSystem - * statistics are only available as of Hadoop 2.5 (see HADOOP-10688). - * Returns None if the required method can't be found. + * return the bytes read on r since t. + * + * @return a callback that returns the bytes read on this thread since it was created.
*/ - private[spark] def getFSBytesReadOnThreadCallback(): Option[() => Long] = { - try { - val threadStats = getFileSystemThreadStatistics() - val getBytesReadMethod = getFileSystemThreadStatisticsMethod("getBytesRead") - val f = () => threadStats.map(getBytesReadMethod.invoke(_).asInstanceOf[Long]).sum - val baselineBytesRead = f() - Some(() => f() - baselineBytesRead) - } catch { - case e @ (_: NoSuchMethodException | _: ClassNotFoundException) => - logDebug("Couldn't find method for retrieving thread-level FileSystem input data", e) - None - } + private[spark] def getFSBytesReadOnThreadCallback(): () => Long = { + val threadStats = FileSystem.getAllStatistics.asScala.map(_.getThreadStatistics) + val f = () => threadStats.map(_.getBytesRead).sum + val baselineBytesRead = f() + () => f() - baselineBytesRead } /** * Returns a function that can be called to find Hadoop FileSystem bytes written. If * getFSBytesWrittenOnThreadCallback is called from thread r at time t, the returned callback will - * return the bytes written on r since t. Reflection is required because thread-level FileSystem - * statistics are only available as of Hadoop 2.5 (see HADOOP-10688). - * Returns None if the required method can't be found. + * return the bytes written on r since t. + * + * @return a callback that returns the bytes written on this thread since it was created. */ - private[spark] def getFSBytesWrittenOnThreadCallback(): Option[() => Long] = { - try { - val threadStats = getFileSystemThreadStatistics() - val getBytesWrittenMethod = getFileSystemThreadStatisticsMethod("getBytesWritten") - val f = () => threadStats.map(getBytesWrittenMethod.invoke(_).asInstanceOf[Long]).sum - val baselineBytesWritten = f() - Some(() => f() - baselineBytesWritten) - } catch { - case e @ (_: NoSuchMethodException | _: ClassNotFoundException) => - logDebug("Couldn't find method for retrieving thread-level FileSystem output data", e) - None - } - } - - private def getFileSystemThreadStatistics(): Seq[AnyRef] = { - FileSystem.getAllStatistics.asScala.map( - Utils.invoke(classOf[Statistics], _, "getThreadStatistics")) - } - - private def getFileSystemThreadStatisticsMethod(methodName: String): Method = { - val statisticsDataClass = - Utils.classForName("org.apache.hadoop.fs.FileSystem$Statistics$StatisticsData") - statisticsDataClass.getDeclaredMethod(methodName) + private[spark] def getFSBytesWrittenOnThreadCallback(): () => Long = { + val threadStats = FileSystem.getAllStatistics.asScala.map(_.getThreadStatistics) + val f = () => threadStats.map(_.getBytesWritten).sum + val baselineBytesWritten = f() + () => f() - baselineBytesWritten } /** diff --git a/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopMapReduceWriter.scala b/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopMapReduceWriter.scala index 63918ef12a988e505a662ed6361601d80491a42f..1e0a1e605cfbb088b624d6595089829eecfa1d68 100644 --- a/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopMapReduceWriter.scala +++ b/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopMapReduceWriter.scala @@ -125,8 +125,7 @@ object SparkHadoopMapReduceWriter extends Logging { val taskContext = new TaskAttemptContextImpl(hadoopConf, attemptId) committer.setupTask(taskContext) - val outputMetricsAndBytesWrittenCallback: Option[(OutputMetrics, () => Long)] = - SparkHadoopWriterUtils.initHadoopOutputMetrics(context) + val (outputMetrics, callback) = SparkHadoopWriterUtils.initHadoopOutputMetrics(context) // Initiate the writer.
val taskFormat = outputFormat.newInstance() @@ -149,8 +148,7 @@ object SparkHadoopMapReduceWriter extends Logging { writer.write(pair._1, pair._2) // Update bytes written metric every few records - SparkHadoopWriterUtils.maybeUpdateOutputMetrics( - outputMetricsAndBytesWrittenCallback, recordsWritten) + SparkHadoopWriterUtils.maybeUpdateOutputMetrics(outputMetrics, callback, recordsWritten) recordsWritten += 1 } if (writer != null) { @@ -171,11 +169,8 @@ object SparkHadoopMapReduceWriter extends Logging { } }) - outputMetricsAndBytesWrittenCallback.foreach { - case (om, callback) => - om.setBytesWritten(callback()) - om.setRecordsWritten(recordsWritten) - } + outputMetrics.setBytesWritten(callback()) + outputMetrics.setRecordsWritten(recordsWritten) ret } catch { @@ -222,24 +217,18 @@ object SparkHadoopWriterUtils { // TODO: these don't seem like the right abstractions. // We should abstract the duplicate code in a less awkward way. - // return type: (output metrics, bytes written callback), defined only if the latter is defined - def initHadoopOutputMetrics( - context: TaskContext): Option[(OutputMetrics, () => Long)] = { + def initHadoopOutputMetrics(context: TaskContext): (OutputMetrics, () => Long) = { val bytesWrittenCallback = SparkHadoopUtil.get.getFSBytesWrittenOnThreadCallback() - bytesWrittenCallback.map { b => - (context.taskMetrics().outputMetrics, b) - } + (context.taskMetrics().outputMetrics, bytesWrittenCallback) } def maybeUpdateOutputMetrics( - outputMetricsAndBytesWrittenCallback: Option[(OutputMetrics, () => Long)], + outputMetrics: OutputMetrics, + callback: () => Long, recordsWritten: Long): Unit = { if (recordsWritten % RECORDS_BETWEEN_BYTES_WRITTEN_METRIC_UPDATES == 0) { - outputMetricsAndBytesWrittenCallback.foreach { - case (om, callback) => - om.setBytesWritten(callback()) - om.setRecordsWritten(recordsWritten) - } + outputMetrics.setBytesWritten(callback()) + outputMetrics.setRecordsWritten(recordsWritten) } } diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala index a83e139c13eef273d78846e0118089581433340c..5fa6a7ed315f48b58357b02fe6458fa6794c4091 100644 --- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala @@ -25,15 +25,7 @@ import scala.collection.immutable.Map import scala.reflect.ClassTag import org.apache.hadoop.conf.{Configurable, Configuration} -import org.apache.hadoop.mapred.FileSplit -import org.apache.hadoop.mapred.InputFormat -import org.apache.hadoop.mapred.InputSplit -import org.apache.hadoop.mapred.JobConf -import org.apache.hadoop.mapred.JobID -import org.apache.hadoop.mapred.RecordReader -import org.apache.hadoop.mapred.Reporter -import org.apache.hadoop.mapred.TaskAttemptID -import org.apache.hadoop.mapred.TaskID +import org.apache.hadoop.mapred._ import org.apache.hadoop.mapred.lib.CombineFileSplit import org.apache.hadoop.mapreduce.TaskType import org.apache.hadoop.util.ReflectionUtils @@ -47,7 +39,7 @@ import org.apache.spark.internal.config.IGNORE_CORRUPT_FILES import org.apache.spark.rdd.HadoopRDD.HadoopMapPartitionsWithSplitRDD import org.apache.spark.scheduler.{HDFSCacheTaskLocation, HostTaskLocation} import org.apache.spark.storage.StorageLevel -import org.apache.spark.util.{NextIterator, SerializableConfiguration, ShutdownHookManager, Utils} +import org.apache.spark.util.{NextIterator, SerializableConfiguration, ShutdownHookManager} /** * A Spark split class that wraps around a Hadoop 
InputSplit. @@ -229,11 +221,11 @@ class HadoopRDD[K, V]( // creating RecordReader, because RecordReader's constructor might read some bytes private val getBytesReadCallback: Option[() => Long] = split.inputSplit.value match { case _: FileSplit | _: CombineFileSplit => - SparkHadoopUtil.get.getFSBytesReadOnThreadCallback() + Some(SparkHadoopUtil.get.getFSBytesReadOnThreadCallback()) case _ => None } - // For Hadoop 2.5+, we get our input bytes from thread-local Hadoop FileSystem statistics. + // We get our input bytes from thread-local Hadoop FileSystem statistics. // If we do a coalesce, however, we are likely to compute multiple partitions in the same // task and in the same thread, in which case we need to avoid override values written by // previous partitions (SPARK-13071). @@ -280,13 +272,9 @@ class HadoopRDD[K, V]( (key, value) } - override def close() { + override def close(): Unit = { if (reader != null) { InputFileBlockHolder.unset() - // Close the reader and release it. Note: it's very important that we don't close the - // reader more than once, since that exposes us to MAPREDUCE-5918 when running against - // Hadoop 1.x and older Hadoop 2.x releases. That bug can lead to non-deterministic - // corruption issues when reading compressed input. try { reader.close() } catch { @@ -326,18 +314,10 @@ class HadoopRDD[K, V]( override def getPreferredLocations(split: Partition): Seq[String] = { val hsplit = split.asInstanceOf[HadoopPartition].inputSplit.value - val locs: Option[Seq[String]] = HadoopRDD.SPLIT_INFO_REFLECTIONS match { - case Some(c) => - try { - val lsplit = c.inputSplitWithLocationInfo.cast(hsplit) - val infos = c.getLocationInfo.invoke(lsplit).asInstanceOf[Array[AnyRef]] - HadoopRDD.convertSplitLocationInfo(infos) - } catch { - case e: Exception => - logDebug("Failed to use InputSplitWithLocations.", e) - None - } - case None => None + val locs = hsplit match { + case lsplit: InputSplitWithLocationInfo => + HadoopRDD.convertSplitLocationInfo(lsplit.getLocationInfo) + case _ => None } locs.getOrElse(hsplit.getLocations.filter(_ != "localhost")) } @@ -413,32 +393,12 @@ private[spark] object HadoopRDD extends Logging { } } - private[spark] class SplitInfoReflections { - val inputSplitWithLocationInfo = - Utils.classForName("org.apache.hadoop.mapred.InputSplitWithLocationInfo") - val getLocationInfo = inputSplitWithLocationInfo.getMethod("getLocationInfo") - val newInputSplit = Utils.classForName("org.apache.hadoop.mapreduce.InputSplit") - val newGetLocationInfo = newInputSplit.getMethod("getLocationInfo") - val splitLocationInfo = Utils.classForName("org.apache.hadoop.mapred.SplitLocationInfo") - val isInMemory = splitLocationInfo.getMethod("isInMemory") - val getLocation = splitLocationInfo.getMethod("getLocation") - } - - private[spark] val SPLIT_INFO_REFLECTIONS: Option[SplitInfoReflections] = try { - Some(new SplitInfoReflections) - } catch { - case e: Exception => - logDebug("SplitLocationInfo and other new Hadoop classes are " + - "unavailable. 
Using the older Hadoop location info code.", e) - None - } - - private[spark] def convertSplitLocationInfo(infos: Array[AnyRef]): Option[Seq[String]] = { + private[spark] def convertSplitLocationInfo( + infos: Array[SplitLocationInfo]): Option[Seq[String]] = { Option(infos).map(_.flatMap { loc => - val reflections = HadoopRDD.SPLIT_INFO_REFLECTIONS.get - val locationStr = reflections.getLocation.invoke(loc).asInstanceOf[String] + val locationStr = loc.getLocation if (locationStr != "localhost") { - if (reflections.isInMemory.invoke(loc).asInstanceOf[Boolean]) { + if (loc.isInMemory) { logDebug(s"Partition $locationStr is cached by Hadoop.") Some(HDFSCacheTaskLocation(locationStr).toString) } else { diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala index 733e85f305f058b1479b47ab6223e8869e86df1c..ce3a9a2a1e2a801cd6653d4fe6cdc4d7adf4300f 100644 --- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala @@ -152,11 +152,11 @@ class NewHadoopRDD[K, V]( private val getBytesReadCallback: Option[() => Long] = split.serializableHadoopSplit.value match { case _: FileSplit | _: CombineFileSplit => - SparkHadoopUtil.get.getFSBytesReadOnThreadCallback() + Some(SparkHadoopUtil.get.getFSBytesReadOnThreadCallback()) case _ => None } - // For Hadoop 2.5+, we get our input bytes from thread-local Hadoop FileSystem statistics. + // We get our input bytes from thread-local Hadoop FileSystem statistics. // If we do a coalesce, however, we are likely to compute multiple partitions in the same // task and in the same thread, in which case we need to avoid override values written by // previous partitions (SPARK-13071). @@ -231,13 +231,9 @@ class NewHadoopRDD[K, V]( (reader.getCurrentKey, reader.getCurrentValue) } - private def close() { + private def close(): Unit = { if (reader != null) { InputFileBlockHolder.unset() - // Close the reader and release it. Note: it's very important that we don't close the - // reader more than once, since that exposes us to MAPREDUCE-5918 when running against - // Hadoop 1.x and older Hadoop 2.x releases. That bug can lead to non-deterministic - // corruption issues when reading compressed input. 
try { reader.close() } catch { @@ -277,18 +273,7 @@ class NewHadoopRDD[K, V]( override def getPreferredLocations(hsplit: Partition): Seq[String] = { val split = hsplit.asInstanceOf[NewHadoopPartition].serializableHadoopSplit.value - val locs = HadoopRDD.SPLIT_INFO_REFLECTIONS match { - case Some(c) => - try { - val infos = c.newGetLocationInfo.invoke(split).asInstanceOf[Array[AnyRef]] - HadoopRDD.convertSplitLocationInfo(infos) - } catch { - case e : Exception => - logDebug("Failed to use InputSplit#getLocationInfo.", e) - None - } - case None => None - } + val locs = HadoopRDD.convertSplitLocationInfo(split.getLocationInfo) locs.getOrElse(split.getLocations.filter(_ != "localhost")) } diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala index 41093bdb858c0e98b6b3f8d1124d50e76ba782e0..567a3183e224ccb083505e05a755ba3982a86da1 100644 --- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala +++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala @@ -37,8 +37,7 @@ import org.apache.spark._ import org.apache.spark.Partitioner.defaultPartitioner import org.apache.spark.annotation.Experimental import org.apache.spark.deploy.SparkHadoopUtil -import org.apache.spark.executor.OutputMetrics -import org.apache.spark.internal.io.{FileCommitProtocol, HadoopMapReduceCommitProtocol, SparkHadoopMapReduceWriter, SparkHadoopWriterUtils} +import org.apache.spark.internal.io.{SparkHadoopMapReduceWriter, SparkHadoopWriterUtils} import org.apache.spark.internal.Logging import org.apache.spark.partial.{BoundedDouble, PartialResult} import org.apache.spark.serializer.Serializer @@ -1126,8 +1125,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) // around by taking a mod. We expect that no task will be attempted 2 billion times. 
val taskAttemptId = (context.taskAttemptId % Int.MaxValue).toInt - val outputMetricsAndBytesWrittenCallback: Option[(OutputMetrics, () => Long)] = - SparkHadoopWriterUtils.initHadoopOutputMetrics(context) + val (outputMetrics, callback) = SparkHadoopWriterUtils.initHadoopOutputMetrics(context) writer.setup(context.stageId, context.partitionId, taskAttemptId) writer.open() @@ -1139,16 +1137,13 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) writer.write(record._1.asInstanceOf[AnyRef], record._2.asInstanceOf[AnyRef]) // Update bytes written metric every few records - SparkHadoopWriterUtils.maybeUpdateOutputMetrics( - outputMetricsAndBytesWrittenCallback, recordsWritten) + SparkHadoopWriterUtils.maybeUpdateOutputMetrics(outputMetrics, callback, recordsWritten) recordsWritten += 1 } }(finallyBlock = writer.close()) writer.commit() - outputMetricsAndBytesWrittenCallback.foreach { case (om, callback) => - om.setBytesWritten(callback()) - om.setRecordsWritten(recordsWritten) - } + outputMetrics.setBytesWritten(callback()) + outputMetrics.setRecordsWritten(recordsWritten) } self.context.runJob(self, writeToFile) diff --git a/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala b/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala index a73b300ec2c21200d4ff54feba76ca2f248f435a..becf3829e7248f198407249f224ee17c4f51766e 100644 --- a/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala +++ b/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala @@ -33,7 +33,6 @@ import org.apache.hadoop.mapreduce.lib.output.{TextOutputFormat => NewTextOutput import org.scalatest.BeforeAndAfter import org.apache.spark.{SharedSparkContext, SparkFunSuite} -import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd} import org.apache.spark.util.Utils @@ -186,10 +185,7 @@ class InputOutputMetricsSuite extends SparkFunSuite with SharedSparkContext sc.listenerBus.waitUntilEmpty(500) assert(inputRead == numRecords) - // Only supported on newer Hadoop - if (SparkHadoopUtil.get.getFSBytesWrittenOnThreadCallback().isDefined) { - assert(outputWritten == numBuckets) - } + assert(outputWritten == numBuckets) assert(shuffleRead == shuffleWritten) } @@ -262,57 +258,49 @@ class InputOutputMetricsSuite extends SparkFunSuite with SharedSparkContext } test("output metrics on records written") { - // Only supported on newer Hadoop - if (SparkHadoopUtil.get.getFSBytesWrittenOnThreadCallback().isDefined) { - val file = new File(tmpDir, getClass.getSimpleName) - val filePath = "file://" + file.getAbsolutePath + val file = new File(tmpDir, getClass.getSimpleName) + val filePath = "file://" + file.getAbsolutePath - val records = runAndReturnRecordsWritten { - sc.parallelize(1 to numRecords).saveAsTextFile(filePath) - } - assert(records == numRecords) + val records = runAndReturnRecordsWritten { + sc.parallelize(1 to numRecords).saveAsTextFile(filePath) } + assert(records == numRecords) } test("output metrics on records written - new Hadoop API") { - // Only supported on newer Hadoop - if (SparkHadoopUtil.get.getFSBytesWrittenOnThreadCallback().isDefined) { - val file = new File(tmpDir, getClass.getSimpleName) - val filePath = "file://" + file.getAbsolutePath - - val records = runAndReturnRecordsWritten { - sc.parallelize(1 to numRecords).map(key => (key.toString, key.toString)) - .saveAsNewAPIHadoopFile[NewTextOutputFormat[String, String]](filePath) - } - assert(records == numRecords) + val file 
= new File(tmpDir, getClass.getSimpleName) + val filePath = "file://" + file.getAbsolutePath + + val records = runAndReturnRecordsWritten { + sc.parallelize(1 to numRecords).map(key => (key.toString, key.toString)) + .saveAsNewAPIHadoopFile[NewTextOutputFormat[String, String]](filePath) } + assert(records == numRecords) } test("output metrics when writing text file") { val fs = FileSystem.getLocal(new Configuration()) val outPath = new Path(fs.getWorkingDirectory, "outdir") - if (SparkHadoopUtil.get.getFSBytesWrittenOnThreadCallback().isDefined) { - val taskBytesWritten = new ArrayBuffer[Long]() - sc.addSparkListener(new SparkListener() { - override def onTaskEnd(taskEnd: SparkListenerTaskEnd) { - taskBytesWritten += taskEnd.taskMetrics.outputMetrics.bytesWritten - } - }) - - val rdd = sc.parallelize(Array("a", "b", "c", "d"), 2) - - try { - rdd.saveAsTextFile(outPath.toString) - sc.listenerBus.waitUntilEmpty(500) - assert(taskBytesWritten.length == 2) - val outFiles = fs.listStatus(outPath).filter(_.getPath.getName != "_SUCCESS") - taskBytesWritten.zip(outFiles).foreach { case (bytes, fileStatus) => - assert(bytes >= fileStatus.getLen) - } - } finally { - fs.delete(outPath, true) + val taskBytesWritten = new ArrayBuffer[Long]() + sc.addSparkListener(new SparkListener() { + override def onTaskEnd(taskEnd: SparkListenerTaskEnd) { + taskBytesWritten += taskEnd.taskMetrics.outputMetrics.bytesWritten + } + }) + + val rdd = sc.parallelize(Array("a", "b", "c", "d"), 2) + + try { + rdd.saveAsTextFile(outPath.toString) + sc.listenerBus.waitUntilEmpty(500) + assert(taskBytesWritten.length == 2) + val outFiles = fs.listStatus(outPath).filter(_.getPath.getName != "_SUCCESS") + taskBytesWritten.zip(outFiles).foreach { case (bytes, fileStatus) => + assert(bytes >= fileStatus.getLen) } + } finally { + fs.delete(outPath, true) } } diff --git a/dev/appveyor-install-dependencies.ps1 b/dev/appveyor-install-dependencies.ps1 index b72d6b53bc08adeae61d81bd8bcf9a4173367521..5a729674c3cb36c9f8ae11cfa88960b27ed0f62f 100644 --- a/dev/appveyor-install-dependencies.ps1 +++ b/dev/appveyor-install-dependencies.ps1 @@ -95,7 +95,7 @@ $env:MAVEN_OPTS = "-Xmx2g -XX:MaxPermSize=512M -XX:ReservedCodeCacheSize=512m" Pop-Location # ========================== Hadoop bin package -$hadoopVer = "2.6.0" +$hadoopVer = "2.6.5" $hadoopPath = "$tools\hadoop" if (!(Test-Path $hadoopPath)) { New-Item -ItemType Directory -Force -Path $hadoopPath | Out-Null diff --git a/dev/create-release/release-build.sh b/dev/create-release/release-build.sh index b08577c47c6732635a781ad5645b0e1112a7edb3..d616f80c541a966a63b44f267924969ad47632b1 100755 --- a/dev/create-release/release-build.sh +++ b/dev/create-release/release-build.sh @@ -80,7 +80,7 @@ NEXUS_PROFILE=d63f592e7eac0 # Profile for Spark staging uploads BASE_DIR=$(pwd) MVN="build/mvn --force" -PUBLISH_PROFILES="-Pmesos -Pyarn -Phive -Phive-thriftserver -Phadoop-2.2" +PUBLISH_PROFILES="-Pmesos -Pyarn -Phive -Phive-thriftserver" PUBLISH_PROFILES="$PUBLISH_PROFILES -Pspark-ganglia-lgpl -Pkinesis-asl" rm -rf spark @@ -236,11 +236,8 @@ if [[ "$1" == "package" ]]; then # We increment the Zinc port each time to avoid OOM's and other craziness if multiple builds # share the same Zinc server. 
FLAGS="-Psparkr -Phive -Phive-thriftserver -Pyarn -Pmesos" - make_binary_release "hadoop2.3" "-Phadoop-2.3 $FLAGS" "3033" & - make_binary_release "hadoop2.4" "-Phadoop-2.4 $FLAGS" "3034" & make_binary_release "hadoop2.6" "-Phadoop-2.6 $FLAGS" "3035" "withr" & make_binary_release "hadoop2.7" "-Phadoop-2.7 $FLAGS" "3036" "withpip" & - make_binary_release "hadoop2.4-without-hive" "-Psparkr -Phadoop-2.4 -Pyarn -Pmesos" "3037" & make_binary_release "without-hadoop" "-Psparkr -Phadoop-provided -Pyarn -Pmesos" "3038" & wait rm -rf spark-$SPARK_VERSION-bin-*/ diff --git a/dev/deps/spark-deps-hadoop-2.2 b/dev/deps/spark-deps-hadoop-2.2 deleted file mode 100644 index 1254188f71680d435e082816bafd707f0f4cf1c1..0000000000000000000000000000000000000000 --- a/dev/deps/spark-deps-hadoop-2.2 +++ /dev/null @@ -1,166 +0,0 @@ -JavaEWAH-0.3.2.jar -RoaringBitmap-0.5.11.jar -ST4-4.0.4.jar -antlr-2.7.7.jar -antlr-runtime-3.4.jar -antlr4-runtime-4.5.3.jar -aopalliance-1.0.jar -aopalliance-repackaged-2.4.0-b34.jar -apache-log4j-extras-1.2.17.jar -arpack_combined_all-0.1.jar -avro-1.7.7.jar -avro-ipc-1.7.7.jar -avro-mapred-1.7.7-hadoop2.jar -bonecp-0.8.0.RELEASE.jar -breeze-macros_2.11-0.12.jar -breeze_2.11-0.12.jar -calcite-avatica-1.2.0-incubating.jar -calcite-core-1.2.0-incubating.jar -calcite-linq4j-1.2.0-incubating.jar -chill-java-0.8.0.jar -chill_2.11-0.8.0.jar -commons-beanutils-1.7.0.jar -commons-beanutils-core-1.8.0.jar -commons-cli-1.2.jar -commons-codec-1.10.jar -commons-collections-3.2.2.jar -commons-compiler-3.0.0.jar -commons-compress-1.4.1.jar -commons-configuration-1.6.jar -commons-crypto-1.0.0.jar -commons-dbcp-1.4.jar -commons-digester-1.8.jar -commons-httpclient-3.1.jar -commons-io-2.4.jar -commons-lang-2.6.jar -commons-lang3-3.5.jar -commons-logging-1.1.3.jar -commons-math-2.1.jar -commons-math3-3.4.1.jar -commons-net-2.2.jar -commons-pool-1.5.4.jar -compress-lzf-1.0.3.jar -core-1.1.2.jar -curator-client-2.4.0.jar -curator-framework-2.4.0.jar -curator-recipes-2.4.0.jar -datanucleus-api-jdo-3.2.6.jar -datanucleus-core-3.2.10.jar -datanucleus-rdbms-3.2.9.jar -derby-10.12.1.1.jar -eigenbase-properties-1.1.5.jar -guava-14.0.1.jar -guice-3.0.jar -guice-servlet-3.0.jar -hadoop-annotations-2.2.0.jar -hadoop-auth-2.2.0.jar -hadoop-client-2.2.0.jar -hadoop-common-2.2.0.jar -hadoop-hdfs-2.2.0.jar -hadoop-mapreduce-client-app-2.2.0.jar -hadoop-mapreduce-client-common-2.2.0.jar -hadoop-mapreduce-client-core-2.2.0.jar -hadoop-mapreduce-client-jobclient-2.2.0.jar -hadoop-mapreduce-client-shuffle-2.2.0.jar -hadoop-yarn-api-2.2.0.jar -hadoop-yarn-client-2.2.0.jar -hadoop-yarn-common-2.2.0.jar -hadoop-yarn-server-common-2.2.0.jar -hadoop-yarn-server-web-proxy-2.2.0.jar -hk2-api-2.4.0-b34.jar -hk2-locator-2.4.0-b34.jar -hk2-utils-2.4.0-b34.jar -httpclient-4.5.2.jar -httpcore-4.4.4.jar -ivy-2.4.0.jar -jackson-annotations-2.6.5.jar -jackson-core-2.6.5.jar -jackson-core-asl-1.9.13.jar -jackson-databind-2.6.5.jar -jackson-mapper-asl-1.9.13.jar -jackson-module-paranamer-2.6.5.jar -jackson-module-scala_2.11-2.6.5.jar -janino-3.0.0.jar -javassist-3.18.1-GA.jar -javax.annotation-api-1.2.jar -javax.inject-1.jar -javax.inject-2.4.0-b34.jar -javax.servlet-api-3.1.0.jar -javax.ws.rs-api-2.0.1.jar -javolution-5.5.1.jar -jcl-over-slf4j-1.7.16.jar -jdo-api-3.0.1.jar -jersey-client-2.22.2.jar -jersey-common-2.22.2.jar -jersey-container-servlet-2.22.2.jar -jersey-container-servlet-core-2.22.2.jar -jersey-guava-2.22.2.jar -jersey-media-jaxb-2.22.2.jar -jersey-server-2.22.2.jar -jets3t-0.7.1.jar -jetty-util-6.1.26.jar 
-jline-2.12.1.jar -joda-time-2.9.3.jar -jodd-core-3.5.2.jar -jpam-1.1.jar -json4s-ast_2.11-3.2.11.jar -json4s-core_2.11-3.2.11.jar -json4s-jackson_2.11-3.2.11.jar -jsr305-1.3.9.jar -jta-1.1.jar -jtransforms-2.4.0.jar -jul-to-slf4j-1.7.16.jar -kryo-shaded-3.0.3.jar -leveldbjni-all-1.8.jar -libfb303-0.9.3.jar -libthrift-0.9.3.jar -log4j-1.2.17.jar -lz4-1.3.0.jar -mesos-1.0.0-shaded-protobuf.jar -metrics-core-3.1.2.jar -metrics-graphite-3.1.2.jar -metrics-json-3.1.2.jar -metrics-jvm-3.1.2.jar -minlog-1.3.0.jar -netty-3.9.9.Final.jar -netty-all-4.0.43.Final.jar -objenesis-2.1.jar -opencsv-2.3.jar -oro-2.0.8.jar -osgi-resource-locator-1.0.1.jar -paranamer-2.6.jar -parquet-column-1.8.2.jar -parquet-common-1.8.2.jar -parquet-encoding-1.8.2.jar -parquet-format-2.3.1.jar -parquet-hadoop-1.8.2.jar -parquet-hadoop-bundle-1.6.0.jar -parquet-jackson-1.8.2.jar -pmml-model-1.2.15.jar -pmml-schema-1.2.15.jar -protobuf-java-2.5.0.jar -py4j-0.10.4.jar -pyrolite-4.13.jar -scala-compiler-2.11.8.jar -scala-library-2.11.8.jar -scala-parser-combinators_2.11-1.0.4.jar -scala-reflect-2.11.8.jar -scala-xml_2.11-1.0.2.jar -scalap-2.11.8.jar -shapeless_2.11-2.0.0.jar -slf4j-api-1.7.16.jar -slf4j-log4j12-1.7.16.jar -snappy-0.2.jar -snappy-java-1.1.2.6.jar -spire-macros_2.11-0.7.4.jar -spire_2.11-0.7.4.jar -stax-api-1.0.1.jar -stream-2.7.0.jar -stringtemplate-3.2.1.jar -super-csv-2.2.0.jar -univocity-parsers-2.2.1.jar -validation-api-1.1.0.Final.jar -xbean-asm5-shaded-4.4.jar -xmlenc-0.52.jar -xz-1.0.jar -zookeeper-3.4.5.jar diff --git a/dev/deps/spark-deps-hadoop-2.3 b/dev/deps/spark-deps-hadoop-2.3 deleted file mode 100644 index 39ba2ae849e681e0a17b59728ca7412eaa8e79ca..0000000000000000000000000000000000000000 --- a/dev/deps/spark-deps-hadoop-2.3 +++ /dev/null @@ -1,174 +0,0 @@ -JavaEWAH-0.3.2.jar -RoaringBitmap-0.5.11.jar -ST4-4.0.4.jar -activation-1.1.1.jar -antlr-2.7.7.jar -antlr-runtime-3.4.jar -antlr4-runtime-4.5.3.jar -aopalliance-1.0.jar -aopalliance-repackaged-2.4.0-b34.jar -apache-log4j-extras-1.2.17.jar -arpack_combined_all-0.1.jar -avro-1.7.7.jar -avro-ipc-1.7.7.jar -avro-mapred-1.7.7-hadoop2.jar -base64-2.3.8.jar -bcprov-jdk15on-1.51.jar -bonecp-0.8.0.RELEASE.jar -breeze-macros_2.11-0.12.jar -breeze_2.11-0.12.jar -calcite-avatica-1.2.0-incubating.jar -calcite-core-1.2.0-incubating.jar -calcite-linq4j-1.2.0-incubating.jar -chill-java-0.8.0.jar -chill_2.11-0.8.0.jar -commons-beanutils-1.7.0.jar -commons-beanutils-core-1.8.0.jar -commons-cli-1.2.jar -commons-codec-1.10.jar -commons-collections-3.2.2.jar -commons-compiler-3.0.0.jar -commons-compress-1.4.1.jar -commons-configuration-1.6.jar -commons-crypto-1.0.0.jar -commons-dbcp-1.4.jar -commons-digester-1.8.jar -commons-httpclient-3.1.jar -commons-io-2.4.jar -commons-lang-2.6.jar -commons-lang3-3.5.jar -commons-logging-1.1.3.jar -commons-math3-3.4.1.jar -commons-net-2.2.jar -commons-pool-1.5.4.jar -compress-lzf-1.0.3.jar -core-1.1.2.jar -curator-client-2.4.0.jar -curator-framework-2.4.0.jar -curator-recipes-2.4.0.jar -datanucleus-api-jdo-3.2.6.jar -datanucleus-core-3.2.10.jar -datanucleus-rdbms-3.2.9.jar -derby-10.12.1.1.jar -eigenbase-properties-1.1.5.jar -guava-14.0.1.jar -guice-3.0.jar -guice-servlet-3.0.jar -hadoop-annotations-2.3.0.jar -hadoop-auth-2.3.0.jar -hadoop-client-2.3.0.jar -hadoop-common-2.3.0.jar -hadoop-hdfs-2.3.0.jar -hadoop-mapreduce-client-app-2.3.0.jar -hadoop-mapreduce-client-common-2.3.0.jar -hadoop-mapreduce-client-core-2.3.0.jar -hadoop-mapreduce-client-jobclient-2.3.0.jar -hadoop-mapreduce-client-shuffle-2.3.0.jar 
-hadoop-yarn-api-2.3.0.jar -hadoop-yarn-client-2.3.0.jar -hadoop-yarn-common-2.3.0.jar -hadoop-yarn-server-common-2.3.0.jar -hadoop-yarn-server-web-proxy-2.3.0.jar -hk2-api-2.4.0-b34.jar -hk2-locator-2.4.0-b34.jar -hk2-utils-2.4.0-b34.jar -httpclient-4.5.2.jar -httpcore-4.4.4.jar -ivy-2.4.0.jar -jackson-annotations-2.6.5.jar -jackson-core-2.6.5.jar -jackson-core-asl-1.9.13.jar -jackson-databind-2.6.5.jar -jackson-mapper-asl-1.9.13.jar -jackson-module-paranamer-2.6.5.jar -jackson-module-scala_2.11-2.6.5.jar -janino-3.0.0.jar -java-xmlbuilder-1.0.jar -javassist-3.18.1-GA.jar -javax.annotation-api-1.2.jar -javax.inject-1.jar -javax.inject-2.4.0-b34.jar -javax.servlet-api-3.1.0.jar -javax.ws.rs-api-2.0.1.jar -javolution-5.5.1.jar -jaxb-api-2.2.2.jar -jcl-over-slf4j-1.7.16.jar -jdo-api-3.0.1.jar -jersey-client-2.22.2.jar -jersey-common-2.22.2.jar -jersey-container-servlet-2.22.2.jar -jersey-container-servlet-core-2.22.2.jar -jersey-guava-2.22.2.jar -jersey-media-jaxb-2.22.2.jar -jersey-server-2.22.2.jar -jets3t-0.9.3.jar -jetty-6.1.26.jar -jetty-util-6.1.26.jar -jline-2.12.1.jar -joda-time-2.9.3.jar -jodd-core-3.5.2.jar -jpam-1.1.jar -json4s-ast_2.11-3.2.11.jar -json4s-core_2.11-3.2.11.jar -json4s-jackson_2.11-3.2.11.jar -jsr305-1.3.9.jar -jta-1.1.jar -jtransforms-2.4.0.jar -jul-to-slf4j-1.7.16.jar -kryo-shaded-3.0.3.jar -leveldbjni-all-1.8.jar -libfb303-0.9.3.jar -libthrift-0.9.3.jar -log4j-1.2.17.jar -lz4-1.3.0.jar -mail-1.4.7.jar -mesos-1.0.0-shaded-protobuf.jar -metrics-core-3.1.2.jar -metrics-graphite-3.1.2.jar -metrics-json-3.1.2.jar -metrics-jvm-3.1.2.jar -minlog-1.3.0.jar -mx4j-3.0.2.jar -netty-3.9.9.Final.jar -netty-all-4.0.43.Final.jar -objenesis-2.1.jar -opencsv-2.3.jar -oro-2.0.8.jar -osgi-resource-locator-1.0.1.jar -paranamer-2.6.jar -parquet-column-1.8.2.jar -parquet-common-1.8.2.jar -parquet-encoding-1.8.2.jar -parquet-format-2.3.1.jar -parquet-hadoop-1.8.2.jar -parquet-hadoop-bundle-1.6.0.jar -parquet-jackson-1.8.2.jar -pmml-model-1.2.15.jar -pmml-schema-1.2.15.jar -protobuf-java-2.5.0.jar -py4j-0.10.4.jar -pyrolite-4.13.jar -scala-compiler-2.11.8.jar -scala-library-2.11.8.jar -scala-parser-combinators_2.11-1.0.4.jar -scala-reflect-2.11.8.jar -scala-xml_2.11-1.0.2.jar -scalap-2.11.8.jar -shapeless_2.11-2.0.0.jar -slf4j-api-1.7.16.jar -slf4j-log4j12-1.7.16.jar -snappy-0.2.jar -snappy-java-1.1.2.6.jar -spire-macros_2.11-0.7.4.jar -spire_2.11-0.7.4.jar -stax-api-1.0-2.jar -stax-api-1.0.1.jar -stream-2.7.0.jar -stringtemplate-3.2.1.jar -super-csv-2.2.0.jar -univocity-parsers-2.2.1.jar -validation-api-1.1.0.Final.jar -xbean-asm5-shaded-4.4.jar -xmlenc-0.52.jar -xz-1.0.jar -zookeeper-3.4.5.jar diff --git a/dev/deps/spark-deps-hadoop-2.4 b/dev/deps/spark-deps-hadoop-2.4 deleted file mode 100644 index d151d1279618243c9b1b69bb59b439ccdd2a07e0..0000000000000000000000000000000000000000 --- a/dev/deps/spark-deps-hadoop-2.4 +++ /dev/null @@ -1,174 +0,0 @@ -JavaEWAH-0.3.2.jar -RoaringBitmap-0.5.11.jar -ST4-4.0.4.jar -activation-1.1.1.jar -antlr-2.7.7.jar -antlr-runtime-3.4.jar -antlr4-runtime-4.5.3.jar -aopalliance-1.0.jar -aopalliance-repackaged-2.4.0-b34.jar -apache-log4j-extras-1.2.17.jar -arpack_combined_all-0.1.jar -avro-1.7.7.jar -avro-ipc-1.7.7.jar -avro-mapred-1.7.7-hadoop2.jar -base64-2.3.8.jar -bcprov-jdk15on-1.51.jar -bonecp-0.8.0.RELEASE.jar -breeze-macros_2.11-0.12.jar -breeze_2.11-0.12.jar -calcite-avatica-1.2.0-incubating.jar -calcite-core-1.2.0-incubating.jar -calcite-linq4j-1.2.0-incubating.jar -chill-java-0.8.0.jar -chill_2.11-0.8.0.jar -commons-beanutils-1.7.0.jar 
-commons-beanutils-core-1.8.0.jar -commons-cli-1.2.jar -commons-codec-1.10.jar -commons-collections-3.2.2.jar -commons-compiler-3.0.0.jar -commons-compress-1.4.1.jar -commons-configuration-1.6.jar -commons-crypto-1.0.0.jar -commons-dbcp-1.4.jar -commons-digester-1.8.jar -commons-httpclient-3.1.jar -commons-io-2.4.jar -commons-lang-2.6.jar -commons-lang3-3.5.jar -commons-logging-1.1.3.jar -commons-math3-3.4.1.jar -commons-net-2.2.jar -commons-pool-1.5.4.jar -compress-lzf-1.0.3.jar -core-1.1.2.jar -curator-client-2.4.0.jar -curator-framework-2.4.0.jar -curator-recipes-2.4.0.jar -datanucleus-api-jdo-3.2.6.jar -datanucleus-core-3.2.10.jar -datanucleus-rdbms-3.2.9.jar -derby-10.12.1.1.jar -eigenbase-properties-1.1.5.jar -guava-14.0.1.jar -guice-3.0.jar -guice-servlet-3.0.jar -hadoop-annotations-2.4.1.jar -hadoop-auth-2.4.1.jar -hadoop-client-2.4.1.jar -hadoop-common-2.4.1.jar -hadoop-hdfs-2.4.1.jar -hadoop-mapreduce-client-app-2.4.1.jar -hadoop-mapreduce-client-common-2.4.1.jar -hadoop-mapreduce-client-core-2.4.1.jar -hadoop-mapreduce-client-jobclient-2.4.1.jar -hadoop-mapreduce-client-shuffle-2.4.1.jar -hadoop-yarn-api-2.4.1.jar -hadoop-yarn-client-2.4.1.jar -hadoop-yarn-common-2.4.1.jar -hadoop-yarn-server-common-2.4.1.jar -hadoop-yarn-server-web-proxy-2.4.1.jar -hk2-api-2.4.0-b34.jar -hk2-locator-2.4.0-b34.jar -hk2-utils-2.4.0-b34.jar -httpclient-4.5.2.jar -httpcore-4.4.4.jar -ivy-2.4.0.jar -jackson-annotations-2.6.5.jar -jackson-core-2.6.5.jar -jackson-core-asl-1.9.13.jar -jackson-databind-2.6.5.jar -jackson-mapper-asl-1.9.13.jar -jackson-module-paranamer-2.6.5.jar -jackson-module-scala_2.11-2.6.5.jar -janino-3.0.0.jar -java-xmlbuilder-1.0.jar -javassist-3.18.1-GA.jar -javax.annotation-api-1.2.jar -javax.inject-1.jar -javax.inject-2.4.0-b34.jar -javax.servlet-api-3.1.0.jar -javax.ws.rs-api-2.0.1.jar -javolution-5.5.1.jar -jaxb-api-2.2.2.jar -jcl-over-slf4j-1.7.16.jar -jdo-api-3.0.1.jar -jersey-client-2.22.2.jar -jersey-common-2.22.2.jar -jersey-container-servlet-2.22.2.jar -jersey-container-servlet-core-2.22.2.jar -jersey-guava-2.22.2.jar -jersey-media-jaxb-2.22.2.jar -jersey-server-2.22.2.jar -jets3t-0.9.3.jar -jetty-6.1.26.jar -jetty-util-6.1.26.jar -jline-2.12.1.jar -joda-time-2.9.3.jar -jodd-core-3.5.2.jar -jpam-1.1.jar -json4s-ast_2.11-3.2.11.jar -json4s-core_2.11-3.2.11.jar -json4s-jackson_2.11-3.2.11.jar -jsr305-1.3.9.jar -jta-1.1.jar -jtransforms-2.4.0.jar -jul-to-slf4j-1.7.16.jar -kryo-shaded-3.0.3.jar -leveldbjni-all-1.8.jar -libfb303-0.9.3.jar -libthrift-0.9.3.jar -log4j-1.2.17.jar -lz4-1.3.0.jar -mail-1.4.7.jar -mesos-1.0.0-shaded-protobuf.jar -metrics-core-3.1.2.jar -metrics-graphite-3.1.2.jar -metrics-json-3.1.2.jar -metrics-jvm-3.1.2.jar -minlog-1.3.0.jar -mx4j-3.0.2.jar -netty-3.9.9.Final.jar -netty-all-4.0.43.Final.jar -objenesis-2.1.jar -opencsv-2.3.jar -oro-2.0.8.jar -osgi-resource-locator-1.0.1.jar -paranamer-2.6.jar -parquet-column-1.8.2.jar -parquet-common-1.8.2.jar -parquet-encoding-1.8.2.jar -parquet-format-2.3.1.jar -parquet-hadoop-1.8.2.jar -parquet-hadoop-bundle-1.6.0.jar -parquet-jackson-1.8.2.jar -pmml-model-1.2.15.jar -pmml-schema-1.2.15.jar -protobuf-java-2.5.0.jar -py4j-0.10.4.jar -pyrolite-4.13.jar -scala-compiler-2.11.8.jar -scala-library-2.11.8.jar -scala-parser-combinators_2.11-1.0.4.jar -scala-reflect-2.11.8.jar -scala-xml_2.11-1.0.2.jar -scalap-2.11.8.jar -shapeless_2.11-2.0.0.jar -slf4j-api-1.7.16.jar -slf4j-log4j12-1.7.16.jar -snappy-0.2.jar -snappy-java-1.1.2.6.jar -spire-macros_2.11-0.7.4.jar -spire_2.11-0.7.4.jar -stax-api-1.0-2.jar 
-stax-api-1.0.1.jar -stream-2.7.0.jar -stringtemplate-3.2.1.jar -super-csv-2.2.0.jar -univocity-parsers-2.2.1.jar -validation-api-1.1.0.Final.jar -xbean-asm5-shaded-4.4.jar -xmlenc-0.52.jar -xz-1.0.jar -zookeeper-3.4.5.jar diff --git a/dev/make-distribution.sh b/dev/make-distribution.sh index 6fb25f3b98722025c695c42afd175910efcc76f6..dc8dfb934419d242e95ad287634123f9e161df34 100755 --- a/dev/make-distribution.sh +++ b/dev/make-distribution.sh @@ -52,20 +52,6 @@ function exit_with_usage { # Parse arguments while (( "$#" )); do case $1 in - --hadoop) - echo "Error: '--hadoop' is no longer supported:" - echo "Error: use Maven profiles and options -Dhadoop.version and -Dyarn.version instead." - echo "Error: Related profiles include hadoop-2.2, hadoop-2.3, hadoop-2.4, hadoop-2.6 and hadoop-2.7." - exit_with_usage - ;; - --with-yarn) - echo "Error: '--with-yarn' is no longer supported, use Maven option -Pyarn" - exit_with_usage - ;; - --with-hive) - echo "Error: '--with-hive' is no longer supported, use Maven options -Phive and -Phive-thriftserver" - exit_with_usage - ;; --tgz) MAKE_TGZ=true ;; diff --git a/dev/run-tests.py b/dev/run-tests.py index ab285ac96af7ef6e96bad4905e2252d65e21dbd0..ef9ab1aa1a0254dc4cb3c3359a99200f6dc30582 100755 --- a/dev/run-tests.py +++ b/dev/run-tests.py @@ -305,9 +305,6 @@ def get_hadoop_profiles(hadoop_version): """ sbt_maven_hadoop_profiles = { - "hadoop2.2": ["-Phadoop-2.2"], - "hadoop2.3": ["-Phadoop-2.3"], - "hadoop2.4": ["-Phadoop-2.4"], "hadoop2.6": ["-Phadoop-2.6"], "hadoop2.7": ["-Phadoop-2.7"], } diff --git a/dev/test-dependencies.sh b/dev/test-dependencies.sh index 4014f42e1983ccf1a4457999b271d51389b7df6f..eb43f229c2d4c739fb7b1946fb60401efce5231a 100755 --- a/dev/test-dependencies.sh +++ b/dev/test-dependencies.sh @@ -32,9 +32,6 @@ export LC_ALL=C HADOOP2_MODULE_PROFILES="-Phive-thriftserver -Pmesos -Pyarn -Phive" MVN="build/mvn" HADOOP_PROFILES=( - hadoop-2.2 - hadoop-2.3 - hadoop-2.4 hadoop-2.6 hadoop-2.7 ) diff --git a/docs/building-spark.md b/docs/building-spark.md index ffe356f91802a6957ffc546817d50fbb44d294b8..690c656bad9fbb337bd1725794aaf67ab8b23534 100644 --- a/docs/building-spark.md +++ b/docs/building-spark.md @@ -48,7 +48,7 @@ You can fix these problems by setting the `MAVEN_OPTS` variable as discussed bef Spark now comes packaged with a self-contained Maven installation to ease building and deployment of Spark from source located under the `build/` directory. This script will automatically download and setup all necessary build requirements ([Maven](https://maven.apache.org/), [Scala](http://www.scala-lang.org/), and [Zinc](https://github.com/typesafehub/zinc)) locally within the `build/` directory itself. It honors any `mvn` binary if present already, however, will pull down its own copy of Scala and Zinc regardless to ensure proper version requirements are met. `build/mvn` execution acts as a pass through to the `mvn` call allowing easy transition from previous build methods. As an example, one can build a version of Spark as follows: - ./build/mvn -Pyarn -Phadoop-2.7 -Dhadoop.version=2.7.0 -DskipTests clean package + ./build/mvn -DskipTests clean package Other build examples can be found below. @@ -63,48 +63,21 @@ with Maven profile settings and so on like the direct Maven build. Example: This will build Spark distribution along with Python pip and R packages. 
For more information on usage, run `./dev/make-distribution.sh --help` -## Specifying the Hadoop Version +## Specifying the Hadoop Version and Enabling YARN -Because HDFS is not protocol-compatible across versions, if you want to read from HDFS, you'll need to build Spark against the specific HDFS version in your environment. You can do this through the `hadoop.version` property. If unset, Spark will build against Hadoop 2.2.0 by default. Note that certain build profiles are required for particular Hadoop versions: +You can specify the exact version of Hadoop to compile against through the `hadoop.version` property. +If unset, Spark will build against Hadoop 2.6.X by default. -<table class="table"> - <thead> - <tr><th>Hadoop version</th><th>Profile required</th></tr> - </thead> - <tbody> - <tr><td>2.2.x</td><td>hadoop-2.2</td></tr> - <tr><td>2.3.x</td><td>hadoop-2.3</td></tr> - <tr><td>2.4.x</td><td>hadoop-2.4</td></tr> - <tr><td>2.6.x</td><td>hadoop-2.6</td></tr> - <tr><td>2.7.x and later 2.x</td><td>hadoop-2.7</td></tr> - </tbody> -</table> - -Note that support for versions of Hadoop before 2.6 are deprecated as of Spark 2.1.0 and may be -removed in Spark 2.2.0. - - -You can enable the `yarn` profile and optionally set the `yarn.version` property if it is different from `hadoop.version`. Spark only supports YARN versions 2.2.0 and later. +You can enable the `yarn` profile and optionally set the `yarn.version` property if it is different +from `hadoop.version`. Examples: - # Apache Hadoop 2.2.X - ./build/mvn -Pyarn -Phadoop-2.2 -DskipTests clean package - - # Apache Hadoop 2.3.X - ./build/mvn -Pyarn -Phadoop-2.3 -Dhadoop.version=2.3.0 -DskipTests clean package - - # Apache Hadoop 2.4.X or 2.5.X - ./build/mvn -Pyarn -Phadoop-2.4 -Dhadoop.version=2.4.0 -DskipTests clean package - # Apache Hadoop 2.6.X - ./build/mvn -Pyarn -Phadoop-2.6 -Dhadoop.version=2.6.0 -DskipTests clean package + ./build/mvn -Pyarn -DskipTests clean package # Apache Hadoop 2.7.X and later - ./build/mvn -Pyarn -Phadoop-2.7 -Dhadoop.version=2.7.0 -DskipTests clean package - - # Different versions of HDFS and YARN. - ./build/mvn -Pyarn -Phadoop-2.3 -Dhadoop.version=2.3.0 -Dyarn.version=2.2.0 -DskipTests clean package + ./build/mvn -Pyarn -Phadoop-2.7 -Dhadoop.version=2.7.3 -DskipTests clean package ## Building With Hive and JDBC Support @@ -112,8 +85,8 @@ To enable Hive integration for Spark SQL along with its JDBC server and CLI, add the `-Phive` and `Phive-thriftserver` profiles to your existing build options. By default Spark will build with Hive 1.2.1 bindings. - # Apache Hadoop 2.4.X with Hive 1.2.1 support - ./build/mvn -Pyarn -Phadoop-2.4 -Dhadoop.version=2.4.0 -Phive -Phive-thriftserver -DskipTests clean package + # With Hive 1.2.1 support + ./build/mvn -Pyarn -Phive -Phive-thriftserver -DskipTests clean package ## Packaging without Hadoop Dependencies for YARN @@ -132,7 +105,7 @@ like ZooKeeper and Hadoop itself. To produce a Spark package compiled with Scala 2.10, use the `-Dscala-2.10` property: ./dev/change-scala-version.sh 2.10 - ./build/mvn -Pyarn -Phadoop-2.4 -Dscala-2.10 -DskipTests clean package + ./build/mvn -Pyarn -Dscala-2.10 -DskipTests clean package Note that support for Scala 2.10 is deprecated as of Spark 2.1.0 and may be removed in Spark 2.2.0. @@ -192,7 +165,7 @@ compilation. More advanced developers may wish to use SBT. The SBT build is derived from the Maven POM files, and so the same Maven profiles and variables can be set to control the SBT build. 
For example: - ./build/sbt -Pyarn -Phadoop-2.3 package + ./build/sbt package To avoid the overhead of launching sbt each time you need to re-compile, you can launch sbt in interactive mode by running `build/sbt`, and then run all build commands at the command @@ -225,7 +198,7 @@ Note that tests should not be run as root or an admin user. The following is an example of a command to run the tests: - ./build/mvn -Pyarn -Phadoop-2.3 -Phive -Phive-thriftserver test + ./build/mvn test The ScalaTest plugin also supports running only a specific Scala test suite as follows: @@ -240,16 +213,16 @@ or a Java test: The following is an example of a command to run the tests: - ./build/sbt -Pyarn -Phadoop-2.3 -Phive -Phive-thriftserver test + ./build/sbt test To run only a specific test suite as follows: - ./build/sbt -Pyarn -Phadoop-2.3 -Phive -Phive-thriftserver "test-only org.apache.spark.repl.ReplSuite" - ./build/sbt -Pyarn -Phadoop-2.3 -Phive -Phive-thriftserver "test-only org.apache.spark.repl.*" + ./build/sbt "test-only org.apache.spark.repl.ReplSuite" + ./build/sbt "test-only org.apache.spark.repl.*" To run test suites of a specific sub project as follows: - ./build/sbt -Pyarn -Phadoop-2.3 -Phive -Phive-thriftserver core/test + ./build/sbt core/test ## Running Java 8 Test Suites diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md index 051f64e1beabfa096b03e3b916450caae392c917..c95f627c12f2c4e51445bec6a85452fed9683b89 100644 --- a/docs/running-on-yarn.md +++ b/docs/running-on-yarn.md @@ -337,7 +337,7 @@ To use a custom metrics.properties for the application master and executors, upd <td> Defines the validity interval for AM failure tracking. If the AM has been running for at least the defined interval, the AM failure count will be reset. - This feature is not enabled if not configured, and only supported in Hadoop 2.6+. + This feature is not enabled if not configured. </td> </tr> <tr> diff --git a/pom.xml b/pom.xml index 3679b4e263619c25425e65c20e71804cd0294e69..ac61a57a613c298b2987d06ce49be5ca15cef559 100644 --- a/pom.xml +++ b/pom.xml @@ -122,12 +122,12 @@ <sbt.project.name>spark</sbt.project.name> <slf4j.version>1.7.16</slf4j.version> <log4j.version>1.2.17</log4j.version> - <hadoop.version>2.2.0</hadoop.version> + <hadoop.version>2.6.5</hadoop.version> <protobuf.version>2.5.0</protobuf.version> <yarn.version>${hadoop.version}</yarn.version> <flume.version>1.6.0</flume.version> - <zookeeper.version>3.4.5</zookeeper.version> - <curator.version>2.4.0</curator.version> + <zookeeper.version>3.4.6</zookeeper.version> + <curator.version>2.6.0</curator.version> <hive.group>org.spark-project.hive</hive.group> <!-- Version used in Maven Hive dependency --> <hive.version>1.2.1.spark2</hive.version> @@ -144,7 +144,7 @@ <codahale.metrics.version>3.1.2</codahale.metrics.version> <avro.version>1.7.7</avro.version> <avro.mapred.classifier>hadoop2</avro.mapred.classifier> - <jets3t.version>0.7.1</jets3t.version> + <jets3t.version>0.9.3</jets3t.version> <aws.kinesis.client.version>1.6.2</aws.kinesis.client.version> <!-- the producer is used in tests --> <aws.kinesis.producer.version>0.10.2</aws.kinesis.producer.version> @@ -2547,44 +2547,15 @@ http://hadoop.apache.org/docs/ra.b.c/hadoop-project-dist/hadoop-common/dependency-analysis.html --> - <profile> - <id>hadoop-2.2</id> - <!-- SPARK-7249: Default hadoop profile. Uses global properties. 
--> - </profile> - - <profile> - <id>hadoop-2.3</id> - <properties> - <hadoop.version>2.3.0</hadoop.version> - <jets3t.version>0.9.3</jets3t.version> - </properties> - </profile> - - <profile> - <id>hadoop-2.4</id> - <properties> - <hadoop.version>2.4.1</hadoop.version> - <jets3t.version>0.9.3</jets3t.version> - </properties> - </profile> - <profile> <id>hadoop-2.6</id> - <properties> - <hadoop.version>2.6.5</hadoop.version> - <jets3t.version>0.9.3</jets3t.version> - <zookeeper.version>3.4.6</zookeeper.version> - <curator.version>2.6.0</curator.version> - </properties> + <!-- Default hadoop profile. Uses global properties. --> </profile> <profile> <id>hadoop-2.7</id> <properties> <hadoop.version>2.7.3</hadoop.version> - <jets3t.version>0.9.3</jets3t.version> - <zookeeper.version>3.4.6</zookeeper.version> - <curator.version>2.6.0</curator.version> </properties> </profile> diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index 74edd537f528fc29a7afe7d119dc84a89e097d5c..bcc00fa3e953372ab849fb576140b017dfac9961 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -86,43 +86,11 @@ object SparkBuild extends PomBuild { val projectsMap: Map[String, Seq[Setting[_]]] = Map.empty - // Provides compatibility for older versions of the Spark build - def backwardCompatibility = { - import scala.collection.mutable - var profiles: mutable.Seq[String] = mutable.Seq("sbt") - // scalastyle:off println - if (Properties.envOrNone("SPARK_GANGLIA_LGPL").isDefined) { - println("NOTE: SPARK_GANGLIA_LGPL is deprecated, please use -Pspark-ganglia-lgpl flag.") - profiles ++= Seq("spark-ganglia-lgpl") - } - if (Properties.envOrNone("SPARK_HIVE").isDefined) { - println("NOTE: SPARK_HIVE is deprecated, please use -Phive and -Phive-thriftserver flags.") - profiles ++= Seq("hive", "hive-thriftserver") - } - Properties.envOrNone("SPARK_HADOOP_VERSION") match { - case Some(v) => - println("NOTE: SPARK_HADOOP_VERSION is deprecated, please use -Dhadoop.version=" + v) - System.setProperty("hadoop.version", v) - case None => - } - if (Properties.envOrNone("SPARK_YARN").isDefined) { - println("NOTE: SPARK_YARN is deprecated, please use -Pyarn flag.") - profiles ++= Seq("yarn") - } - // scalastyle:on println - profiles - } - override val profiles = { val profiles = Properties.envOrNone("SBT_MAVEN_PROFILES") match { - case None => backwardCompatibility - case Some(v) => - if (backwardCompatibility.nonEmpty) - // scalastyle:off println - println("Note: We ignore environment variables, when use of profile is detected in " + - "conjunction with environment variable.") - // scalastyle:on println - v.split("(\\s+|,)").filterNot(_.isEmpty).map(_.trim.replaceAll("-P", "")).toSeq + case None => Seq("sbt") + case Some(v) => + v.split("(\\s+|,)").filterNot(_.isEmpty).map(_.trim.replaceAll("-P", "")).toSeq } if (System.getProperty("scala-2.10") == "") { diff --git a/resource-managers/yarn/pom.xml b/resource-managers/yarn/pom.xml index f090d2427dc278628ee1fa357a0c48c109b760e4..92a33de0aeab055f1d5a9ae4a45a0060b9bfcba1 100644 --- a/resource-managers/yarn/pom.xml +++ b/resource-managers/yarn/pom.xml @@ -125,34 +125,12 @@ <scope>test</scope> </dependency> - <!-- - See SPARK-3710. hadoop-yarn-server-tests in Hadoop 2.2 fails to pull some needed - dependencies, so they need to be added manually for the tests to work. 
- --> - <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-yarn-server-tests</artifactId> <classifier>tests</classifier> <scope>test</scope> </dependency> - <dependency> - <groupId>org.mockito</groupId> - <artifactId>mockito-core</artifactId> - <scope>test</scope> - </dependency> - <dependency> - <groupId>org.mortbay.jetty</groupId> - <artifactId>jetty</artifactId> - <version>6.1.26</version> - <exclusions> - <exclusion> - <groupId>org.mortbay.jetty</groupId> - <artifactId>servlet-api</artifactId> - </exclusion> - </exclusions> - <scope>test</scope> - </dependency> <!-- Jersey 1 dependencies only required for YARN integration testing. Creating a YARN cluster diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala index f79c66b9ff8a93b156adf5283f418411cb68504f..9df43aea3f3d5e31806753ce7ebd70ed79b25473 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala @@ -31,6 +31,7 @@ import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.hadoop.yarn.api._ import org.apache.hadoop.yarn.api.records._ import org.apache.hadoop.yarn.conf.YarnConfiguration +import org.apache.hadoop.yarn.exceptions.ApplicationAttemptNotFoundException import org.apache.hadoop.yarn.util.{ConverterUtils, Records} import org.apache.spark._ @@ -460,17 +461,15 @@ private[spark] class ApplicationMaster( } failureCount = 0 } catch { - case i: InterruptedException => + case i: InterruptedException => // do nothing + case e: ApplicationAttemptNotFoundException => + failureCount += 1 + logError("Exception from Reporter thread.", e) + finish(FinalApplicationStatus.FAILED, ApplicationMaster.EXIT_REPORTER_FAILURE, + e.getMessage) case e: Throwable => failureCount += 1 - // this exception was introduced in hadoop 2.4 and this code would not compile - // with earlier versions if we refer it directly. 
- if ("org.apache.hadoop.yarn.exceptions.ApplicationAttemptNotFoundException" == - e.getClass().getName()) { - logError("Exception from Reporter thread.", e) - finish(FinalApplicationStatus.FAILED, ApplicationMaster.EXIT_REPORTER_FAILURE, - e.getMessage) - } else if (!NonFatal(e) || failureCount >= reporterMaxFailures) { + if (!NonFatal(e) || failureCount >= reporterMaxFailures) { finish(FinalApplicationStatus.FAILED, ApplicationMaster.EXIT_REPORTER_FAILURE, "Exception was thrown " + s"$failureCount time(s) from Reporter thread.") diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index b212b0eaafcdf2ce93028f66e76fd0569646d7da..635c1ac5e3ec1841cfe92e22eadb58d091fe3cac 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -26,7 +26,6 @@ import java.util.zip.{ZipEntry, ZipOutputStream} import scala.collection.JavaConverters._ import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, ListBuffer, Map} -import scala.util.{Failure, Success, Try} import scala.util.control.NonFatal import com.google.common.base.Objects @@ -47,7 +46,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException import org.apache.hadoop.yarn.util.Records -import org.apache.spark.{SecurityManager, SparkConf, SparkContext, SparkException} +import org.apache.spark.{SecurityManager, SparkConf, SparkException} import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.deploy.yarn.config._ import org.apache.spark.deploy.yarn.security.ConfigurableCredentialManager @@ -216,18 +215,7 @@ private[spark] class Client( appContext.setApplicationType("SPARK") sparkConf.get(APPLICATION_TAGS).foreach { tags => - try { - // The setApplicationTags method was only introduced in Hadoop 2.4+, so we need to use - // reflection to set it, printing a warning if a tag was specified but the YARN version - // doesn't support it. 
- val method = appContext.getClass().getMethod( - "setApplicationTags", classOf[java.util.Set[String]]) - method.invoke(appContext, new java.util.HashSet[String](tags.asJava)) - } catch { - case e: NoSuchMethodException => - logWarning(s"Ignoring ${APPLICATION_TAGS.key} because this version of " + - "YARN does not support it") - } + appContext.setApplicationTags(new java.util.HashSet[String](tags.asJava)) } sparkConf.get(MAX_APP_ATTEMPTS) match { case Some(v) => appContext.setMaxAppAttempts(v) @@ -236,15 +224,7 @@ private[spark] class Client( } sparkConf.get(AM_ATTEMPT_FAILURE_VALIDITY_INTERVAL_MS).foreach { interval => - try { - val method = appContext.getClass().getMethod( - "setAttemptFailuresValidityInterval", classOf[Long]) - method.invoke(appContext, interval: java.lang.Long) - } catch { - case e: NoSuchMethodException => - logWarning(s"Ignoring ${AM_ATTEMPT_FAILURE_VALIDITY_INTERVAL_MS.key} because " + - "the version of YARN does not support it") - } + appContext.setAttemptFailuresValidityInterval(interval) } val capability = Records.newRecord(classOf[Resource]) @@ -253,53 +233,24 @@ private[spark] class Client( sparkConf.get(AM_NODE_LABEL_EXPRESSION) match { case Some(expr) => - try { - val amRequest = Records.newRecord(classOf[ResourceRequest]) - amRequest.setResourceName(ResourceRequest.ANY) - amRequest.setPriority(Priority.newInstance(0)) - amRequest.setCapability(capability) - amRequest.setNumContainers(1) - val method = amRequest.getClass.getMethod("setNodeLabelExpression", classOf[String]) - method.invoke(amRequest, expr) - - val setResourceRequestMethod = - appContext.getClass.getMethod("setAMContainerResourceRequest", classOf[ResourceRequest]) - setResourceRequestMethod.invoke(appContext, amRequest) - } catch { - case e: NoSuchMethodException => - logWarning(s"Ignoring ${AM_NODE_LABEL_EXPRESSION.key} because the version " + - "of YARN does not support it") - appContext.setResource(capability) - } + val amRequest = Records.newRecord(classOf[ResourceRequest]) + amRequest.setResourceName(ResourceRequest.ANY) + amRequest.setPriority(Priority.newInstance(0)) + amRequest.setCapability(capability) + amRequest.setNumContainers(1) + amRequest.setNodeLabelExpression(expr) + appContext.setAMContainerResourceRequest(amRequest) case None => appContext.setResource(capability) } sparkConf.get(ROLLED_LOG_INCLUDE_PATTERN).foreach { includePattern => - try { - val logAggregationContext = Records.newRecord( - Utils.classForName("org.apache.hadoop.yarn.api.records.LogAggregationContext")) - .asInstanceOf[Object] - - val setRolledLogsIncludePatternMethod = - logAggregationContext.getClass.getMethod("setRolledLogsIncludePattern", classOf[String]) - setRolledLogsIncludePatternMethod.invoke(logAggregationContext, includePattern) - - sparkConf.get(ROLLED_LOG_EXCLUDE_PATTERN).foreach { excludePattern => - val setRolledLogsExcludePatternMethod = - logAggregationContext.getClass.getMethod("setRolledLogsExcludePattern", classOf[String]) - setRolledLogsExcludePatternMethod.invoke(logAggregationContext, excludePattern) - } - - val setLogAggregationContextMethod = - appContext.getClass.getMethod("setLogAggregationContext", - Utils.classForName("org.apache.hadoop.yarn.api.records.LogAggregationContext")) - setLogAggregationContextMethod.invoke(appContext, logAggregationContext) - } catch { - case NonFatal(e) => - logWarning(s"Ignoring ${ROLLED_LOG_INCLUDE_PATTERN.key} because the version of YARN " + - s"does not support it", e) + val logAggregationContext = Records.newRecord(classOf[LogAggregationContext]) 
+ logAggregationContext.setRolledLogsIncludePattern(includePattern) + sparkConf.get(ROLLED_LOG_EXCLUDE_PATTERN).foreach { excludePattern => + logAggregationContext.setRolledLogsExcludePattern(excludePattern) } + appContext.setLogAggregationContext(logAggregationContext) } appContext @@ -786,14 +737,12 @@ private[spark] class Client( val pythonPath = new ListBuffer[String]() val (pyFiles, pyArchives) = sparkConf.get(PY_FILES).partition(_.endsWith(".py")) if (pyFiles.nonEmpty) { - pythonPath += buildPath(YarnSparkHadoopUtil.expandEnvironment(Environment.PWD), - LOCALIZED_PYTHON_DIR) + pythonPath += buildPath(Environment.PWD.$$(), LOCALIZED_PYTHON_DIR) } (pySparkArchives ++ pyArchives).foreach { path => val uri = Utils.resolveURI(path) if (uri.getScheme != LOCAL_SCHEME) { - pythonPath += buildPath(YarnSparkHadoopUtil.expandEnvironment(Environment.PWD), - new Path(uri).getName()) + pythonPath += buildPath(Environment.PWD.$$(), new Path(uri).getName()) } else { pythonPath += uri.getPath() } @@ -802,7 +751,7 @@ private[spark] class Client( // Finally, update the Spark config to propagate PYTHONPATH to the AM and executors. if (pythonPath.nonEmpty) { val pythonPathStr = (sys.env.get("PYTHONPATH") ++ pythonPath) - .mkString(YarnSparkHadoopUtil.getClassPathSeparator) + .mkString(ApplicationConstants.CLASS_PATH_SEPARATOR) env("PYTHONPATH") = pythonPathStr sparkConf.setExecutorEnv("PYTHONPATH", pythonPathStr) } @@ -882,10 +831,7 @@ private[spark] class Client( // Add Xmx for AM memory javaOpts += "-Xmx" + amMemory + "m" - val tmpDir = new Path( - YarnSparkHadoopUtil.expandEnvironment(Environment.PWD), - YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR - ) + val tmpDir = new Path(Environment.PWD.$$(), YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR) javaOpts += "-Djava.io.tmpdir=" + tmpDir // TODO: Remove once cpuset version is pushed out. 
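The hunks above swap the reflective YarnSparkHadoopUtil helpers for the Environment.$$() and ApplicationConstants.CLASS_PATH_SEPARATOR APIs that are always available on Hadoop 2.6+. A minimal sketch of what those calls yield at container launch; the "__spark_conf__" classpath entry below is illustrative only:

    import org.apache.hadoop.yarn.api.ApplicationConstants
    import org.apache.hadoop.yarn.api.ApplicationConstants.Environment

    // Environment.$$() emits the cross-platform expansion token that the NodeManager
    // resolves at container launch, e.g. "{{PWD}}" rather than "$PWD" or "%PWD%".
    val pwd = Environment.PWD.$$()

    // CLASS_PATH_SEPARATOR is the analogous token ("<CPS>") for the platform's separator;
    // "__spark_conf__" is just an illustrative path component.
    val classpath = Seq(pwd, pwd + "/__spark_conf__")
      .mkString(ApplicationConstants.CLASS_PATH_SEPARATOR)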
@@ -982,15 +928,12 @@ private[spark] class Client( Seq("--arg", YarnSparkHadoopUtil.escapeForShell(arg)) } val amArgs = - Seq(amClass) ++ userClass ++ userJar ++ primaryPyFile ++ primaryRFile ++ - userArgs ++ Seq( - "--properties-file", buildPath(YarnSparkHadoopUtil.expandEnvironment(Environment.PWD), - LOCALIZED_CONF_DIR, SPARK_CONF_FILE)) + Seq(amClass) ++ userClass ++ userJar ++ primaryPyFile ++ primaryRFile ++ userArgs ++ + Seq("--properties-file", buildPath(Environment.PWD.$$(), LOCALIZED_CONF_DIR, SPARK_CONF_FILE)) // Command for the ApplicationMaster - val commands = prefixEnv ++ Seq( - YarnSparkHadoopUtil.expandEnvironment(Environment.JAVA_HOME) + "/bin/java", "-server" - ) ++ + val commands = prefixEnv ++ + Seq(Environment.JAVA_HOME.$$() + "/bin/java", "-server") ++ javaOpts ++ amArgs ++ Seq( "1>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout", @@ -1265,59 +1208,28 @@ private object Client extends Logging { private[yarn] def populateHadoopClasspath(conf: Configuration, env: HashMap[String, String]) : Unit = { val classPathElementsToAdd = getYarnAppClasspath(conf) ++ getMRAppClasspath(conf) - for (c <- classPathElementsToAdd.flatten) { + classPathElementsToAdd.foreach { c => YarnSparkHadoopUtil.addPathToEnvironment(env, Environment.CLASSPATH.name, c.trim) } } - private def getYarnAppClasspath(conf: Configuration): Option[Seq[String]] = + private def getYarnAppClasspath(conf: Configuration): Seq[String] = Option(conf.getStrings(YarnConfiguration.YARN_APPLICATION_CLASSPATH)) match { - case Some(s) => Some(s.toSeq) + case Some(s) => s.toSeq case None => getDefaultYarnApplicationClasspath } - private def getMRAppClasspath(conf: Configuration): Option[Seq[String]] = + private def getMRAppClasspath(conf: Configuration): Seq[String] = Option(conf.getStrings("mapreduce.application.classpath")) match { - case Some(s) => Some(s.toSeq) + case Some(s) => s.toSeq case None => getDefaultMRApplicationClasspath } - private[yarn] def getDefaultYarnApplicationClasspath: Option[Seq[String]] = { - val triedDefault = Try[Seq[String]] { - val field = classOf[YarnConfiguration].getField("DEFAULT_YARN_APPLICATION_CLASSPATH") - val value = field.get(null).asInstanceOf[Array[String]] - value.toSeq - } recoverWith { - case e: NoSuchFieldException => Success(Seq.empty[String]) - } - - triedDefault match { - case f: Failure[_] => - logError("Unable to obtain the default YARN Application classpath.", f.exception) - case s: Success[Seq[String]] => - logDebug(s"Using the default YARN application classpath: ${s.get.mkString(",")}") - } - - triedDefault.toOption - } - - private[yarn] def getDefaultMRApplicationClasspath: Option[Seq[String]] = { - val triedDefault = Try[Seq[String]] { - val field = classOf[MRJobConfig].getField("DEFAULT_MAPREDUCE_APPLICATION_CLASSPATH") - StringUtils.getStrings(field.get(null).asInstanceOf[String]).toSeq - } recoverWith { - case e: NoSuchFieldException => Success(Seq.empty[String]) - } + private[yarn] def getDefaultYarnApplicationClasspath: Seq[String] = + YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH.toSeq - triedDefault match { - case f: Failure[_] => - logError("Unable to obtain the default MR Application classpath.", f.exception) - case s: Success[Seq[String]] => - logDebug(s"Using the default MR application classpath: ${s.get.mkString(",")}") - } - - triedDefault.toOption - } + private[yarn] def getDefaultMRApplicationClasspath: Seq[String] = + StringUtils.getStrings(MRJobConfig.DEFAULT_MAPREDUCE_APPLICATION_CLASSPATH).toSeq /** * Populate the classpath entry in 
the given environment map. @@ -1339,11 +1251,9 @@ private object Client extends Logging { addClasspathEntry(getClusterPath(sparkConf, cp), env) } - addClasspathEntry(YarnSparkHadoopUtil.expandEnvironment(Environment.PWD), env) + addClasspathEntry(Environment.PWD.$$(), env) - addClasspathEntry( - YarnSparkHadoopUtil.expandEnvironment(Environment.PWD) + Path.SEPARATOR + - LOCALIZED_CONF_DIR, env) + addClasspathEntry(Environment.PWD.$$() + Path.SEPARATOR + LOCALIZED_CONF_DIR, env) if (sparkConf.get(USER_CLASS_PATH_FIRST)) { // in order to properly add the app jar when user classpath is first @@ -1369,9 +1279,8 @@ private object Client extends Logging { } // Add the Spark jars to the classpath, depending on how they were distributed. - addClasspathEntry(buildPath(YarnSparkHadoopUtil.expandEnvironment(Environment.PWD), - LOCALIZED_LIB_DIR, "*"), env) - if (!sparkConf.get(SPARK_ARCHIVE).isDefined) { + addClasspathEntry(buildPath(Environment.PWD.$$(), LOCALIZED_LIB_DIR, "*"), env) + if (sparkConf.get(SPARK_ARCHIVE).isEmpty) { sparkConf.get(SPARK_JARS).foreach { jars => jars.filter(isLocalUri).foreach { jar => addClasspathEntry(getClusterPath(sparkConf, jar), env) @@ -1430,13 +1339,11 @@ private object Client extends Logging { if (uri != null && uri.getScheme == LOCAL_SCHEME) { addClasspathEntry(getClusterPath(conf, uri.getPath), env) } else if (fileName != null) { - addClasspathEntry(buildPath( - YarnSparkHadoopUtil.expandEnvironment(Environment.PWD), fileName), env) + addClasspathEntry(buildPath(Environment.PWD.$$(), fileName), env) } else if (uri != null) { val localPath = getQualifiedLocalPath(uri, hadoopConf) val linkName = Option(uri.getFragment()).getOrElse(localPath.getName()) - addClasspathEntry(buildPath( - YarnSparkHadoopUtil.expandEnvironment(Environment.PWD), linkName), env) + addClasspathEntry(buildPath(Environment.PWD.$$(), linkName), env) } } diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala index 868c2edc5a4632b253435c228fc8165f80872233..b55b4b147bb7e50f6ab7bf7b26c07d31221ad1ed 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala @@ -152,10 +152,7 @@ private[yarn] class ExecutorRunnable( } javaOpts += "-Djava.io.tmpdir=" + - new Path( - YarnSparkHadoopUtil.expandEnvironment(Environment.PWD), - YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR - ) + new Path(Environment.PWD.$$(), YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR) // Certain configs need to be passed here because they are needed before the Executor // registers with the Scheduler and transfers the spark configs. 
Since the Executor backend @@ -206,9 +203,8 @@ private[yarn] class ExecutorRunnable( }.toSeq YarnSparkHadoopUtil.addOutOfMemoryErrorArgument(javaOpts) - val commands = prefixEnv ++ Seq( - YarnSparkHadoopUtil.expandEnvironment(Environment.JAVA_HOME) + "/bin/java", - "-server") ++ + val commands = prefixEnv ++ + Seq(Environment.JAVA_HOME.$$() + "/bin/java", "-server") ++ javaOpts ++ Seq("org.apache.spark.executor.CoarseGrainedExecutorBackend", "--driver-url", masterAddress, diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala index e498932e51ed5b8ea80a48e019ce8e754db08d40..8a76dbd1bf0e3f909066948a36c87369869fa9a4 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala @@ -150,20 +150,6 @@ private[yarn] class YarnAllocator( private val labelExpression = sparkConf.get(EXECUTOR_NODE_LABEL_EXPRESSION) - // ContainerRequest constructor that can take a node label expression. We grab it through - // reflection because it's only available in later versions of YARN. - private val nodeLabelConstructor = labelExpression.flatMap { expr => - try { - Some(classOf[ContainerRequest].getConstructor(classOf[Resource], - classOf[Array[String]], classOf[Array[String]], classOf[Priority], classOf[Boolean], - classOf[String])) - } catch { - case e: NoSuchMethodException => - logWarning(s"Node label expression $expr will be ignored because YARN version on" + - " classpath does not support it.") - None - } - } // A map to store preferred hostname and possible task numbers running on it. private var hostToLocalTaskCounts: Map[String, Int] = Map.empty @@ -414,10 +400,7 @@ private[yarn] class YarnAllocator( resource: Resource, nodes: Array[String], racks: Array[String]): ContainerRequest = { - nodeLabelConstructor.map { constructor => - constructor.newInstance(resource, nodes, racks, RM_REQUEST_PRIORITY, true: java.lang.Boolean, - labelExpression.orNull) - }.getOrElse(new ContainerRequest(resource, nodes, racks, RM_REQUEST_PRIORITY)) + new ContainerRequest(resource, nodes, racks, RM_REQUEST_PRIORITY, true, labelExpression.orNull) } /** diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala index 53df11eb6602134a3684f2974a5808fea870a149..163dfb5a605c8a1d1b1fc3c960e4696e57c2335a 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala @@ -17,12 +17,8 @@ package org.apache.spark.deploy.yarn -import java.util.{List => JList} - import scala.collection.JavaConverters._ -import scala.util.Try -import org.apache.hadoop.conf.Configuration import org.apache.hadoop.yarn.api.records._ import org.apache.hadoop.yarn.client.api.AMRMClient import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest @@ -99,24 +95,11 @@ private[spark] class YarnRMClient extends Logging { def getAmIpFilterParams(conf: YarnConfiguration, proxyBase: String): Map[String, String] = { // Figure out which scheme Yarn is using. Note the method seems to have been added after 2.2, // so not all stable releases have it. 
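As a sketch of what the direct WebAppUtils calls used in the replacement code below return, assuming an HA-enabled ResourceManager; the host names and proxy base are hypothetical:

    import scala.collection.JavaConverters._
    import org.apache.hadoop.yarn.conf.YarnConfiguration
    import org.apache.hadoop.yarn.webapp.util.WebAppUtils

    val conf = new YarnConfiguration()
    val prefix = WebAppUtils.getHttpSchemePrefix(conf)               // "http://" or "https://"
    val proxies = WebAppUtils.getProxyHostsAndPortsForAmFilter(conf) // e.g. ["rm1:8088", "rm2:8088"]
    val hosts = proxies.asScala.map(_.split(":").head)
    val proxyBase = "/proxy/application_1486500000000_0001"          // hypothetical proxy path
    val uriBases = proxies.asScala.map(p => prefix + p + proxyBase)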
- val prefix = Try(classOf[WebAppUtils].getMethod("getHttpSchemePrefix", classOf[Configuration]) - .invoke(null, conf).asInstanceOf[String]).getOrElse("http://") - - // If running a new enough Yarn, use the HA-aware API for retrieving the RM addresses. - try { - val method = classOf[WebAppUtils].getMethod("getProxyHostsAndPortsForAmFilter", - classOf[Configuration]) - val proxies = method.invoke(null, conf).asInstanceOf[JList[String]] - val hosts = proxies.asScala.map { proxy => proxy.split(":")(0) } - val uriBases = proxies.asScala.map { proxy => prefix + proxy + proxyBase } - Map("PROXY_HOSTS" -> hosts.mkString(","), "PROXY_URI_BASES" -> uriBases.mkString(",")) - } catch { - case e: NoSuchMethodException => - val proxy = WebAppUtils.getProxyHostAndPort(conf) - val parts = proxy.split(":") - val uriBase = prefix + proxy + proxyBase - Map("PROXY_HOST" -> parts(0), "PROXY_URI_BASE" -> uriBase) - } + val prefix = WebAppUtils.getHttpSchemePrefix(conf) + val proxies = WebAppUtils.getProxyHostsAndPortsForAmFilter(conf) + val hosts = proxies.asScala.map(_.split(":").head) + val uriBases = proxies.asScala.map { proxy => prefix + proxy + proxyBase } + Map("PROXY_HOSTS" -> hosts.mkString(","), "PROXY_URI_BASES" -> uriBases.mkString(",")) } /** Returns the maximum number of attempts to register the AM. */ @@ -124,12 +107,10 @@ private[spark] class YarnRMClient extends Logging { val sparkMaxAttempts = sparkConf.get(MAX_APP_ATTEMPTS).map(_.toInt) val yarnMaxAttempts = yarnConf.getInt( YarnConfiguration.RM_AM_MAX_ATTEMPTS, YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS) - val retval: Int = sparkMaxAttempts match { + sparkMaxAttempts match { case Some(x) => if (x <= yarnMaxAttempts) x else yarnMaxAttempts case None => yarnMaxAttempts } - - retval } } diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala index cc53b1b06e94afe6701a17607908e984cd003b85..93578855122cd61dde7f7f6a015e4a7bb65f3524 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala @@ -17,13 +17,11 @@ package org.apache.spark.deploy.yarn -import java.io.File import java.nio.charset.StandardCharsets.UTF_8 import java.util.regex.Matcher import java.util.regex.Pattern import scala.collection.mutable.{HashMap, ListBuffer} -import scala.util.Try import org.apache.hadoop.conf.Configuration import org.apache.hadoop.io.Text @@ -31,7 +29,6 @@ import org.apache.hadoop.mapred.JobConf import org.apache.hadoop.security.Credentials import org.apache.hadoop.security.UserGroupInformation import org.apache.hadoop.yarn.api.ApplicationConstants -import org.apache.hadoop.yarn.api.ApplicationConstants.Environment import org.apache.hadoop.yarn.api.records.{ApplicationAccessType, ContainerId, Priority} import org.apache.hadoop.yarn.conf.YarnConfiguration import org.apache.hadoop.yarn.util.ConverterUtils @@ -137,7 +134,12 @@ object YarnSparkHadoopUtil { * If the map already contains this key, append the value to the existing value instead. 
*/ def addPathToEnvironment(env: HashMap[String, String], key: String, value: String): Unit = { - val newValue = if (env.contains(key)) { env(key) + getClassPathSeparator + value } else value + val newValue = + if (env.contains(key)) { + env(key) + ApplicationConstants.CLASS_PATH_SEPARATOR + value + } else { + value + } env.put(key, newValue) } @@ -156,8 +158,8 @@ object YarnSparkHadoopUtil { while (m.find()) { val variable = m.group(1) var replace = "" - if (env.get(variable) != None) { - replace = env.get(variable).get + if (env.contains(variable)) { + replace = env(variable) } else { // if this key is not configured for the child .. get it from the env replace = System.getenv(variable) @@ -235,13 +237,11 @@ object YarnSparkHadoopUtil { YarnCommandBuilderUtils.quoteForBatchScript(arg) } else { val escaped = new StringBuilder("'") - for (i <- 0 to arg.length() - 1) { - arg.charAt(i) match { - case '$' => escaped.append("\\$") - case '"' => escaped.append("\\\"") - case '\'' => escaped.append("'\\''") - case c => escaped.append(c) - } + arg.foreach { + case '$' => escaped.append("\\$") + case '"' => escaped.append("\\\"") + case '\'' => escaped.append("'\\''") + case c => escaped.append(c) } escaped.append("'").toString() } @@ -262,33 +262,6 @@ object YarnSparkHadoopUtil { ) } - /** - * Expand environment variable using Yarn API. - * If environment.$$() is implemented, return the result of it. - * Otherwise, return the result of environment.$() - * Note: $$() is added in Hadoop 2.4. - */ - private lazy val expandMethod = - Try(classOf[Environment].getMethod("$$")) - .getOrElse(classOf[Environment].getMethod("$")) - - def expandEnvironment(environment: Environment): String = - expandMethod.invoke(environment).asInstanceOf[String] - - /** - * Get class path separator using Yarn API. - * If ApplicationConstants.CLASS_PATH_SEPARATOR is implemented, return it. - * Otherwise, return File.pathSeparator - * Note: CLASS_PATH_SEPARATOR is added in Hadoop 2.4. - */ - private lazy val classPathSeparatorField = - Try(classOf[ApplicationConstants].getField("CLASS_PATH_SEPARATOR")) - .getOrElse(classOf[File].getField("pathSeparator")) - - def getClassPathSeparator(): String = { - classPathSeparatorField.get(null).asInstanceOf[String] - } - /** * Getting the initial target number of executors depends on whether dynamic allocation is * enabled. 
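The quoting loop kept in escapeForShell above can be traced with a small self-contained sketch; quoteForPosixShell and the sample argument are illustrative names, not part of this patch:

    // Same escaping rules as the Unix branch above: wrap the value in single quotes,
    // turn an embedded ' into '\'' and escape $ and " with a backslash.
    def quoteForPosixShell(arg: String): String = {
      val escaped = new StringBuilder("'")
      arg.foreach {
        case '$' => escaped.append("\\$")
        case '"' => escaped.append("\\\"")
        case '\'' => escaped.append("'\\''")
        case c => escaped.append(c)
      }
      escaped.append("'").toString()
    }

    quoteForPosixShell("""--name 'my app' costs $5""")
    // => '--name '\''my app'\'' costs \$5'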
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala index 7deaf0af94849070bd4f34c6edc32c2831f96733..dd2180a0f5e1e58cdd29e1a383a3b4e73973e6f3 100644 --- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala +++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala @@ -23,8 +23,6 @@ import java.util.Properties import scala.collection.JavaConverters._ import scala.collection.mutable.{HashMap => MutableHashMap} -import scala.reflect.ClassTag -import scala.util.Try import org.apache.commons.lang3.SerializationUtils import org.apache.hadoop.conf.Configuration @@ -67,19 +65,18 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll } test("default Yarn application classpath") { - getDefaultYarnApplicationClasspath should be(Some(Fixtures.knownDefYarnAppCP)) + getDefaultYarnApplicationClasspath should be(Fixtures.knownDefYarnAppCP) } test("default MR application classpath") { - getDefaultMRApplicationClasspath should be(Some(Fixtures.knownDefMRAppCP)) + getDefaultMRApplicationClasspath should be(Fixtures.knownDefMRAppCP) } test("resultant classpath for an application that defines a classpath for YARN") { withAppConf(Fixtures.mapYARNAppConf) { conf => val env = newEnv populateHadoopClasspath(conf, env) - classpath(env) should be( - flatten(Fixtures.knownYARNAppCP, getDefaultMRApplicationClasspath)) + classpath(env) should be(Fixtures.knownYARNAppCP +: getDefaultMRApplicationClasspath) } } @@ -87,8 +84,7 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll withAppConf(Fixtures.mapMRAppConf) { conf => val env = newEnv populateHadoopClasspath(conf, env) - classpath(env) should be( - flatten(getDefaultYarnApplicationClasspath, Fixtures.knownMRAppCP)) + classpath(env) should be(getDefaultYarnApplicationClasspath :+ Fixtures.knownMRAppCP) } } @@ -96,7 +92,7 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll withAppConf(Fixtures.mapAppConf) { conf => val env = newEnv populateHadoopClasspath(conf, env) - classpath(env) should be(flatten(Fixtures.knownYARNAppCP, Fixtures.knownMRAppCP)) + classpath(env) should be(Array(Fixtures.knownYARNAppCP, Fixtures.knownMRAppCP)) } } @@ -104,14 +100,7 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll private val USER = "local:/userJar" private val ADDED = "local:/addJar1,local:/addJar2,/addJar3" - private val PWD = - if (classOf[Environment].getMethods().exists(_.getName == "$$")) { - "{{PWD}}" - } else if (Utils.isWindows) { - "%PWD%" - } else { - Environment.PWD.$() - } + private val PWD = "{{PWD}}" test("Local jar URIs") { val conf = new Configuration() @@ -388,26 +377,18 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll object Fixtures { val knownDefYarnAppCP: Seq[String] = - getFieldValue[Array[String], Seq[String]](classOf[YarnConfiguration], - "DEFAULT_YARN_APPLICATION_CLASSPATH", - Seq[String]())(a => a.toSeq) - + YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH.toSeq val knownDefMRAppCP: Seq[String] = - getFieldValue2[String, Array[String], Seq[String]]( - classOf[MRJobConfig], - "DEFAULT_MAPREDUCE_APPLICATION_CLASSPATH", - Seq[String]())(a => a.split(","))(a => a.toSeq) + MRJobConfig.DEFAULT_MAPREDUCE_APPLICATION_CLASSPATH.split(",").toSeq - val knownYARNAppCP = Some(Seq("/known/yarn/path")) + val knownYARNAppCP = 
"/known/yarn/path" - val knownMRAppCP = Some(Seq("/known/mr/path")) + val knownMRAppCP = "/known/mr/path" - val mapMRAppConf = - Map("mapreduce.application.classpath" -> knownMRAppCP.map(_.mkString(":")).get) + val mapMRAppConf = Map("mapreduce.application.classpath" -> knownMRAppCP) - val mapYARNAppConf = - Map(YarnConfiguration.YARN_APPLICATION_CLASSPATH -> knownYARNAppCP.map(_.mkString(":")).get) + val mapYARNAppConf = Map(YarnConfiguration.YARN_APPLICATION_CLASSPATH -> knownYARNAppCP) val mapAppConf = mapYARNAppConf ++ mapMRAppConf } @@ -423,28 +404,6 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll def classpath(env: MutableHashMap[String, String]): Array[String] = env(Environment.CLASSPATH.name).split(":|;|<CPS>") - def flatten(a: Option[Seq[String]], b: Option[Seq[String]]): Array[String] = - (a ++ b).flatten.toArray - - def getFieldValue[A, B](clazz: Class[_], field: String, defaults: => B)(mapTo: A => B): B = { - Try(clazz.getField(field)) - .map(_.get(null).asInstanceOf[A]) - .toOption - .map(mapTo) - .getOrElse(defaults) - } - - def getFieldValue2[A: ClassTag, A1: ClassTag, B]( - clazz: Class[_], - field: String, - defaults: => B)(mapTo: A => B)(mapTo1: A1 => B): B = { - Try(clazz.getField(field)).map(_.get(null)).map { - case v: A => mapTo(v) - case v1: A1 => mapTo1(v1) - case _ => defaults - }.toOption.getOrElse(defaults) - } - private def createClient( sparkConf: SparkConf, conf: Configuration = new Configuration(), diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala index 7fbbe12609fd5158324696a592330aeb127e5325..a057618b39950bb45d60a9af75a6731bfc58253f 100644 --- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala +++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala @@ -22,8 +22,6 @@ import java.nio.charset.StandardCharsets import com.google.common.io.{ByteStreams, Files} import org.apache.hadoop.io.Text -import org.apache.hadoop.yarn.api.ApplicationConstants -import org.apache.hadoop.yarn.api.ApplicationConstants.Environment import org.apache.hadoop.yarn.api.records.ApplicationAccessType import org.apache.hadoop.yarn.conf.YarnConfiguration import org.scalatest.Matchers @@ -147,28 +145,6 @@ class YarnSparkHadoopUtilSuite extends SparkFunSuite with Matchers with Logging } - test("test expandEnvironment result") { - val target = Environment.PWD - if (classOf[Environment].getMethods().exists(_.getName == "$$")) { - YarnSparkHadoopUtil.expandEnvironment(target) should be ("{{" + target + "}}") - } else if (Utils.isWindows) { - YarnSparkHadoopUtil.expandEnvironment(target) should be ("%" + target + "%") - } else { - YarnSparkHadoopUtil.expandEnvironment(target) should be ("$" + target) - } - - } - - test("test getClassPathSeparator result") { - if (classOf[ApplicationConstants].getFields().exists(_.getName == "CLASS_PATH_SEPARATOR")) { - YarnSparkHadoopUtil.getClassPathSeparator() should be ("<CPS>") - } else if (Utils.isWindows) { - YarnSparkHadoopUtil.getClassPathSeparator() should be (";") - } else { - YarnSparkHadoopUtil.getClassPathSeparator() should be (":") - } - } - test("check different hadoop utils based on env variable") { try { System.setProperty("SPARK_YARN_MODE", "true") diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala index d197daf7e093f8d1c7e45ac3c683ab6f3beffe7e..14f721d6a790a616c9d7f31ca534366bceae64b4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala @@ -74,23 +74,21 @@ class FileScanRDD( // Find a function that will return the FileSystem bytes read by this thread. Do this before // apply readFunction, because it might read some bytes. - private val getBytesReadCallback: Option[() => Long] = + private val getBytesReadCallback = SparkHadoopUtil.get.getFSBytesReadOnThreadCallback() - // For Hadoop 2.5+, we get our input bytes from thread-local Hadoop FileSystem statistics. + // We get our input bytes from thread-local Hadoop FileSystem statistics. // If we do a coalesce, however, we are likely to compute multiple partitions in the same // task and in the same thread, in which case we need to avoid override values written by // previous partitions (SPARK-13071). private def updateBytesRead(): Unit = { - getBytesReadCallback.foreach { getBytesRead => - inputMetrics.setBytesRead(existingBytesRead + getBytesRead()) - } + inputMetrics.setBytesRead(existingBytesRead + getBytesReadCallback()) } // If we can't get the bytes read from the FS stats, fall back to the file size, // which may be inaccurate. private def updateBytesReadWithFileSize(): Unit = { - if (getBytesReadCallback.isEmpty && currentFile != null) { + if (currentFile != null) { inputMetrics.incBytesRead(currentFile.length) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/RecordReaderIterator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/RecordReaderIterator.scala index 938af25a96844678dc163d99c8980fa6b50bb63b..c3dd6939ec5bd607f81daf4d2e7522e45ee06a7e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/RecordReaderIterator.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/RecordReaderIterator.scala @@ -59,10 +59,6 @@ class RecordReaderIterator[T]( override def close(): Unit = { if (rowReader != null) { try { - // Close the reader and release it. Note: it's very important that we don't close the - // reader more than once, since that exposes us to MAPREDUCE-5918 when running against - // older Hadoop 2.x releases. That bug can lead to non-deterministic corruption issues - // when reading compressed input. rowReader.close() } finally { rowReader = null diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala index 1b413528935f68e341cb27e766ca90e90cfbac83..bfdc2cb0ac5b8e36ef3595ec7820b96c21d952ec 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala @@ -151,7 +151,7 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path: IOUtils.closeQuietly(output) } } catch { - case e: IOException if isFileAlreadyExistsException(e) => + case e: FileAlreadyExistsException => // Failed to create "tempPath". There are two cases: // 1. Someone is creating "tempPath" too. // 2. This is a restart. 
"tempPath" has already been created but not moved to the final @@ -190,7 +190,7 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path: val crcPath = new Path(tempPath.getParent(), s".${tempPath.getName()}.crc") if (fileManager.exists(crcPath)) fileManager.delete(crcPath) } catch { - case e: IOException if isFileAlreadyExistsException(e) => + case e: FileAlreadyExistsException => // If "rename" fails, it means some other "HDFSMetadataLog" has committed the batch. // So throw an exception to tell the user this is not a valid behavior. throw new ConcurrentModificationException( @@ -206,13 +206,6 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path: } } - private def isFileAlreadyExistsException(e: IOException): Boolean = { - e.isInstanceOf[FileAlreadyExistsException] || - // Old Hadoop versions don't throw FileAlreadyExistsException. Although it's fixed in - // HADOOP-9361 in Hadoop 2.5, we still need to support old Hadoop versions. - (e.getMessage != null && e.getMessage.startsWith("File already exists: ")) - } - /** * @return the deserialized metadata in a batch file, or None if file not exist. * @throws IllegalArgumentException when path does not point to a batch file. diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala index 63fdd6b090e6fe93d8920f6b9a2d6639b5613c7e..d2487a2c034c07541982f7c221c1b9ceb33e1bcd 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala @@ -52,7 +52,7 @@ private[hive] object IsolatedClientLoader extends Logging { barrierPrefixes: Seq[String] = Seq.empty): IsolatedClientLoader = synchronized { val resolvedVersion = hiveVersion(hiveMetastoreVersion) // We will first try to share Hadoop classes. If we cannot resolve the Hadoop artifact - // with the given version, we will use Hadoop 2.4.0 and then will not share Hadoop classes. + // with the given version, we will use Hadoop 2.6 and then will not share Hadoop classes. var sharesHadoopClasses = true val files = if (resolvedVersions.contains((resolvedVersion, hadoopVersion))) { resolvedVersions((resolvedVersion, hadoopVersion)) @@ -63,17 +63,14 @@ private[hive] object IsolatedClientLoader extends Logging { } catch { case e: RuntimeException if e.getMessage.contains("hadoop") => // If the error message contains hadoop, it is probably because the hadoop - // version cannot be resolved (e.g. it is a vendor specific version like - // 2.0.0-cdh4.1.1). If it is the case, we will try just - // "org.apache.hadoop:hadoop-client:2.4.0". "org.apache.hadoop:hadoop-client:2.4.0" - // is used just because we used to hard code it as the hadoop artifact to download. - logWarning(s"Failed to resolve Hadoop artifacts for the version ${hadoopVersion}. " + - s"We will change the hadoop version from ${hadoopVersion} to 2.4.0 and try again. " + + // version cannot be resolved. + logWarning(s"Failed to resolve Hadoop artifacts for the version $hadoopVersion. " + + s"We will change the hadoop version from $hadoopVersion to 2.6.0 and try again. " + "Hadoop classes will not be shared between Spark and Hive metastore client. 
" + "It is recommended to set jars used by Hive metastore client through " + "spark.sql.hive.metastore.jars in the production environment.") sharesHadoopClasses = false - (downloadVersion(resolvedVersion, "2.4.0", ivyPath), "2.4.0") + (downloadVersion(resolvedVersion, "2.6.5", ivyPath), "2.6.5") } resolvedVersions.put((resolvedVersion, actualHadoopVersion), downloadedFiles) resolvedVersions((resolvedVersion, actualHadoopVersion))