Commit 2e89398e authored by Patrick Wendell

Merge pull request #254 from ScrapCodes/scala-2.10

Scala 2.10 migration

This PR migrates Spark to Scala 2.10.

Summary of changes apart from the Scala 2.10 migration:
(No implications for users.)
1. Migrated Akka to 2.2.3.

2. Remote death watch is not used, because it has a bug where it keeps trying to send messages to a dead node indefinitely.

3. Uses an indestructible ActorSystem that tolerates errors only on executors.

(Might be useful for users.)
4. New configuration settings introduced:

System.getProperty("spark.akka.heartbeat.pauses", "600")
System.getProperty("spark.akka.failure-detector.threshold", "300.0")
System.getProperty("spark.akka.heartbeat.interval", "1000")

The defaults for these are fairly large, chosen to effectively disable the failure detector that comes with Akka. The reason is that we already have our own failure-detector-like mechanism in place, so Akka's is just overhead on top of that and leads to a lot of false positives. With these properties, however, it is possible to enable it. A good use case for enabling it is when someone wants Spark to be sensitive (in a controllable manner, of course) to GC pauses or network lags and to quickly evict executors that experience them. More information is included in configuration.md.
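
As a rough sketch of what enabling the detector could look like in application code (the property names come from this PR; the values and the small driver program are illustrative assumptions, not recommendations):

    import org.apache.spark.SparkContext

    object FailureDetectorExample {
      def main(args: Array[String]) {
        // Illustrative values only; the PR's defaults (600 / 300.0 / 1000) are large
        // enough to effectively disable Akka's failure detector.
        System.setProperty("spark.akka.heartbeat.pauses", "60")
        System.setProperty("spark.akka.failure-detector.threshold", "12.0")
        System.setProperty("spark.akka.heartbeat.interval", "100")

        // Properties must be set before the SparkContext (and its ActorSystem) is created.
        val sc = new SparkContext("local", "FailureDetectorExample")
        // ... run jobs; executors hit by long GC pauses or network lags are now
        // detected and evicted more aggressively.
        sc.stop()
      }
    }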

Once SPARK-544 is merged, I would like to deprecate at least these Akka properties, and maybe others too.

This PR is a duplicate of #221 (where all the discussion happened); that one pointed to master, while this one points to the scala-2.10 branch.
parents 5429d62d d3090b79
Showing changed files with 482 additions and 119 deletions
......@@ -54,7 +54,7 @@ versions without YARN, use:
# Cloudera CDH 4.2.0 with MapReduce v1
$ SPARK_HADOOP_VERSION=2.0.0-mr1-cdh4.2.0 sbt/sbt assembly
For Apache Hadoop 2.x, 0.23.x, Cloudera CDH MRv2, and other Hadoop versions
For Apache Hadoop 2.0.X, 2.1.X, 0.23.x, Cloudera CDH MRv2, and other Hadoop versions
with YARN, also set `SPARK_YARN=true`:
# Apache Hadoop 2.0.5-alpha
......@@ -63,12 +63,14 @@ with YARN, also set `SPARK_YARN=true`:
# Cloudera CDH 4.2.0 with MapReduce v2
$ SPARK_HADOOP_VERSION=2.0.0-cdh4.2.0 SPARK_YARN=true sbt/sbt assembly
For convenience, these variables may also be set through the `conf/spark-env.sh` file
described below.
When building for Hadoop 2.2.X and newer, you'll need to include the additional `new-yarn` profile:
# Apache Hadoop 2.2.X and newer
$ mvn -Dyarn.version=2.2.0 -Dhadoop.version=2.2.0 -Pnew-yarn
When developing a Spark application, specify the Hadoop version by adding the
"hadoop-client" artifact to your project's dependencies. For example, if you're
using Hadoop 1.0.1 and build your application using SBT, add this entry to
using Hadoop 1.2.1 and build your application using SBT, add this entry to
`libraryDependencies`:
"org.apache.hadoop" % "hadoop-client" % "1.2.1"
......
......@@ -26,7 +26,7 @@
</parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-assembly_${scala-short.version}</artifactId>
<artifactId>spark-assembly_2.10</artifactId>
<name>Spark Project Assembly</name>
<url>http://spark.incubator.apache.org/</url>
......@@ -41,27 +41,27 @@
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala-short.version}</artifactId>
<artifactId>spark-core_2.10</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-bagel_${scala-short.version}</artifactId>
<artifactId>spark-bagel_2.10</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-mllib_${scala-short.version}</artifactId>
<artifactId>spark-mllib_2.10</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-repl_${scala-short.version}</artifactId>
<artifactId>spark-repl_2.10</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_${scala-short.version}</artifactId>
<artifactId>spark-streaming_2.10</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
......@@ -79,7 +79,7 @@
<artifactId>maven-shade-plugin</artifactId>
<configuration>
<shadedArtifactAttached>false</shadedArtifactAttached>
<outputFile>${project.build.directory}/scala-${scala-short.version}/${project.artifactId}-${project.version}-hadoop${hadoop.version}.jar</outputFile>
<outputFile>${project.build.directory}/scala-2.10/${project.artifactId}-${project.version}-hadoop${hadoop.version}.jar</outputFile>
<artifactSet>
<includes>
<include>*:*</include>
......@@ -128,7 +128,7 @@
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-yarn_${scala-short.version}</artifactId>
<artifactId>spark-yarn_2.10</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
......
......@@ -26,7 +26,7 @@
</parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-bagel_${scala-short.version}</artifactId>
<artifactId>spark-bagel_2.10</artifactId>
<packaging>jar</packaging>
<name>Spark Project Bagel</name>
<url>http://spark.incubator.apache.org/</url>
......@@ -34,7 +34,7 @@
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala-short.version}</artifactId>
<artifactId>spark-core_2.10</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
......@@ -43,18 +43,18 @@
</dependency>
<dependency>
<groupId>org.scalatest</groupId>
<artifactId>scalatest_${scala-short.version}</artifactId>
<artifactId>scalatest_2.10</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.scalacheck</groupId>
<artifactId>scalacheck_${scala-short.version}</artifactId>
<artifactId>scalacheck_2.10</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<outputDirectory>target/scala-${scala-short.version}/classes</outputDirectory>
<testOutputDirectory>target/scala-${scala-short.version}/test-classes</testOutputDirectory>
<outputDirectory>target/scala-2.10/classes</outputDirectory>
<testOutputDirectory>target/scala-2.10/test-classes</testOutputDirectory>
<plugins>
<plugin>
<groupId>org.scalatest</groupId>
......
......@@ -32,12 +32,26 @@ fi
# Build up classpath
CLASSPATH="$SPARK_CLASSPATH:$FWDIR/conf"
if [ -f "$FWDIR/RELEASE" ]; then
ASSEMBLY_JAR=`ls "$FWDIR"/jars/spark-assembly*.jar`
# First check if we have a dependencies jar. If so, include binary classes with the deps jar
if [ -f "$FWDIR"/assembly/target/scala-$SCALA_VERSION/spark-assembly*hadoop*-deps.jar ]; then
CLASSPATH="$CLASSPATH:$FWDIR/core/target/scala-$SCALA_VERSION/classes"
CLASSPATH="$CLASSPATH:$FWDIR/repl/target/scala-$SCALA_VERSION/classes"
CLASSPATH="$CLASSPATH:$FWDIR/mllib/target/scala-$SCALA_VERSION/classes"
CLASSPATH="$CLASSPATH:$FWDIR/bagel/target/scala-$SCALA_VERSION/classes"
CLASSPATH="$CLASSPATH:$FWDIR/streaming/target/scala-$SCALA_VERSION/classes"
DEPS_ASSEMBLY_JAR=`ls "$FWDIR"/assembly/target/scala-$SCALA_VERSION/spark-assembly*hadoop*-deps.jar`
CLASSPATH="$CLASSPATH:$DEPS_ASSEMBLY_JAR"
else
ASSEMBLY_JAR=`ls "$FWDIR"/assembly/target/scala-$SCALA_VERSION/spark-assembly*hadoop*.jar`
# Else use spark-assembly jar from either RELEASE or assembly directory
if [ -f "$FWDIR/RELEASE" ]; then
ASSEMBLY_JAR=`ls "$FWDIR"/jars/spark-assembly*.jar`
else
ASSEMBLY_JAR=`ls "$FWDIR"/assembly/target/scala-$SCALA_VERSION/spark-assembly*hadoop*.jar`
fi
CLASSPATH="$CLASSPATH:$ASSEMBLY_JAR"
fi
CLASSPATH="$CLASSPATH:$ASSEMBLY_JAR"
# Add test classes if we're running from SBT or Maven with SPARK_TESTING set to 1
if [[ $SPARK_TESTING == 1 ]]; then
......
......@@ -28,7 +28,7 @@
# SPARK_SSH_OPTS Options passed to ssh when running remote commands.
##
usage="Usage: slaves.sh [--config confdir] command..."
usage="Usage: slaves.sh [--config <conf-dir>] command..."
# if no args specified, show usage
if [ $# -le 0 ]; then
......@@ -46,6 +46,23 @@ bin=`cd "$bin"; pwd`
# spark-env.sh. Save it here.
HOSTLIST=$SPARK_SLAVES
# Check if --config is passed as an argument. It is an optional parameter.
# Exit if the argument is not a directory.
if [ "$1" == "--config" ]
then
shift
conf_dir=$1
if [ ! -d "$conf_dir" ]
then
echo "ERROR : $conf_dir is not a directory"
echo $usage
exit 1
else
export SPARK_CONF_DIR=$conf_dir
fi
shift
fi
if [ -f "${SPARK_CONF_DIR}/spark-env.sh" ]; then
. "${SPARK_CONF_DIR}/spark-env.sh"
fi
......
......@@ -29,7 +29,7 @@
# SPARK_NICENESS The scheduling priority for daemons. Defaults to 0.
##
usage="Usage: spark-daemon.sh [--config <conf-dir>] [--hosts hostlistfile] (start|stop) <spark-command> <spark-instance-number> <args...>"
usage="Usage: spark-daemon.sh [--config <conf-dir>] (start|stop) <spark-command> <spark-instance-number> <args...>"
# if no args specified, show usage
if [ $# -le 1 ]; then
......@@ -43,6 +43,25 @@ bin=`cd "$bin"; pwd`
. "$bin/spark-config.sh"
# get arguments
# Check if --config is passed as an argument. It is an optional parameter.
# Exit if the argument is not a directory.
if [ "$1" == "--config" ]
then
shift
conf_dir=$1
if [ ! -d "$conf_dir" ]
then
echo "ERROR : $conf_dir is not a directory"
echo $usage
exit 1
else
export SPARK_CONF_DIR=$conf_dir
fi
shift
fi
startStop=$1
shift
command=$1
......
......@@ -19,7 +19,7 @@
# Run a Spark command on all slave hosts.
usage="Usage: spark-daemons.sh [--config confdir] [--hosts hostlistfile] [start|stop] command instance-number args..."
usage="Usage: spark-daemons.sh [--config <conf-dir>] [start|stop] command instance-number args..."
# if no args specified, show usage
if [ $# -le 1 ]; then
......
......@@ -17,8 +17,6 @@
# limitations under the License.
#
# Starts the master on the machine this script is executed on.
bin=`dirname "$0"`
bin=`cd "$bin"; pwd`
......
......@@ -80,6 +80,14 @@
# /metrics/aplications/json # App information
# /metrics/master/json # Master information
# org.apache.spark.metrics.sink.GraphiteSink
# Name: Default: Description:
# host NONE Hostname of Graphite server
# port NONE Port of Graphite server
# period 10 Poll period
# unit seconds Units of poll period
# prefix EMPTY STRING Prefix to prepend to metric name
## Examples
# Enable JmxSink for all instances by class name
#*.sink.jmx.class=org.apache.spark.metrics.sink.JmxSink
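# A hypothetical GraphiteSink configuration following the properties documented
# above (host, port and prefix values are placeholders, not defaults):
#*.sink.graphite.class=org.apache.spark.metrics.sink.GraphiteSink
#*.sink.graphite.host=graphite.example.com
#*.sink.graphite.port=2003
#*.sink.graphite.period=10
#*.sink.graphite.unit=seconds
#*.sink.graphite.prefix=spark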
......
......@@ -26,7 +26,7 @@
</parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala-short.version}</artifactId>
<artifactId>spark-core_2.10</artifactId>
<packaging>jar</packaging>
<name>Spark Project Core</name>
<url>http://spark.incubator.apache.org/</url>
......@@ -48,6 +48,10 @@
<groupId>org.apache.avro</groupId>
<artifactId>avro-ipc</artifactId>
</dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-server</artifactId>
......@@ -82,7 +86,7 @@
</dependency>
<dependency>
<groupId>com.twitter</groupId>
<artifactId>chill_${scala-short.version}</artifactId>
<artifactId>chill_2.10</artifactId>
<version>0.3.1</version>
</dependency>
<dependency>
......@@ -91,12 +95,16 @@
<version>0.3.1</version>
</dependency>
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-remote_${scala-short.version}</artifactId>
<groupId>${akka.group}</groupId>
<artifactId>akka-actor_2.10</artifactId>
</dependency>
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-slf4j_${scala-short.version}</artifactId>
<groupId>${akka.group}</groupId>
<artifactId>akka-remote_2.10</artifactId>
</dependency>
<dependency>
<groupId>${akka.group}</groupId>
<artifactId>akka-slf4j_2.10</artifactId>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
......@@ -104,7 +112,7 @@
</dependency>
<dependency>
<groupId>net.liftweb</groupId>
<artifactId>lift-json_${scala-short.version}</artifactId>
<artifactId>lift-json_2.10</artifactId>
</dependency>
<dependency>
<groupId>it.unimi.dsi</groupId>
......@@ -114,10 +122,6 @@
<groupId>colt</groupId>
<artifactId>colt</artifactId>
</dependency>
<dependency>
<groupId>com.github.scala-incubator.io</groupId>
<artifactId>scala-io-file_${scala-short.version}</artifactId>
</dependency>
<dependency>
<groupId>org.apache.mesos</groupId>
<artifactId>mesos</artifactId>
......@@ -146,6 +150,10 @@
<groupId>com.codahale.metrics</groupId>
<artifactId>metrics-ganglia</artifactId>
</dependency>
<dependency>
<groupId>com.codahale.metrics</groupId>
<artifactId>metrics-graphite</artifactId>
</dependency>
<dependency>
<groupId>org.apache.derby</groupId>
<artifactId>derby</artifactId>
......@@ -158,12 +166,12 @@
</dependency>
<dependency>
<groupId>org.scalatest</groupId>
<artifactId>scalatest_${scala-short.version}</artifactId>
<artifactId>scalatest_2.10</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.scalacheck</groupId>
<artifactId>scalacheck_${scala-short.version}</artifactId>
<artifactId>scalacheck_2.10</artifactId>
<scope>test</scope>
</dependency>
<dependency>
......@@ -183,8 +191,8 @@
</dependency>
</dependencies>
<build>
<outputDirectory>target/scala-${scala-short.version}/classes</outputDirectory>
<testOutputDirectory>target/scala-${scala-short.version}/test-classes</testOutputDirectory>
<outputDirectory>target/scala-2.10/classes</outputDirectory>
<testOutputDirectory>target/scala-2.10/test-classes</testOutputDirectory>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
......
......@@ -21,6 +21,7 @@ import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundByteHandlerAdapter;
import org.apache.spark.storage.BlockId;
abstract class FileClientHandler extends ChannelInboundByteHandlerAdapter {
......@@ -33,7 +34,7 @@ abstract class FileClientHandler extends ChannelInboundByteHandlerAdapter {
}
public abstract void handle(ChannelHandlerContext ctx, ByteBuf in, FileHeader header);
public abstract void handleError(String blockId);
public abstract void handleError(BlockId blockId);
@Override
public ByteBuf newInboundBuffer(ChannelHandlerContext ctx) {
......
......@@ -24,6 +24,8 @@ import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundMessageHandlerAdapter;
import io.netty.channel.DefaultFileRegion;
import org.apache.spark.storage.BlockId;
import org.apache.spark.storage.FileSegment;
class FileServerHandler extends ChannelInboundMessageHandlerAdapter<String> {
......@@ -34,41 +36,36 @@ class FileServerHandler extends ChannelInboundMessageHandlerAdapter<String> {
}
@Override
public void messageReceived(ChannelHandlerContext ctx, String blockId) {
String path = pResolver.getAbsolutePath(blockId);
// if getFilePath returns null, close the channel
if (path == null) {
public void messageReceived(ChannelHandlerContext ctx, String blockIdString) {
BlockId blockId = BlockId.apply(blockIdString);
FileSegment fileSegment = pResolver.getBlockLocation(blockId);
// if getBlockLocation returns null, close the channel
if (fileSegment == null) {
//ctx.close();
return;
}
File file = new File(path);
File file = fileSegment.file();
if (file.exists()) {
if (!file.isFile()) {
//logger.info("Not a file : " + file.getAbsolutePath());
ctx.write(new FileHeader(0, blockId).buffer());
ctx.flush();
return;
}
long length = file.length();
long length = fileSegment.length();
if (length > Integer.MAX_VALUE || length <= 0) {
//logger.info("too large file : " + file.getAbsolutePath() + " of size "+ length);
ctx.write(new FileHeader(0, blockId).buffer());
ctx.flush();
return;
}
int len = new Long(length).intValue();
//logger.info("Sending block "+blockId+" filelen = "+len);
//logger.info("header = "+ (new FileHeader(len, blockId)).buffer());
ctx.write((new FileHeader(len, blockId)).buffer());
try {
ctx.sendFile(new DefaultFileRegion(new FileInputStream(file)
.getChannel(), 0, file.length()));
.getChannel(), fileSegment.offset(), fileSegment.length()));
} catch (Exception e) {
//logger.warning("Exception when sending file : " + file.getAbsolutePath());
e.printStackTrace();
}
} else {
//logger.warning("File not found: " + file.getAbsolutePath());
ctx.write(new FileHeader(0, blockId).buffer());
}
ctx.flush();
......
......@@ -17,13 +17,10 @@
package org.apache.spark.network.netty;
import org.apache.spark.storage.BlockId;
import org.apache.spark.storage.FileSegment;
public interface PathResolver {
/**
* Get the absolute path of the file
*
* @param fileId
* @return the absolute path of file
*/
public String getAbsolutePath(String fileId);
/** Get the file segment in which the given block resides. */
public FileSegment getBlockLocation(BlockId blockId);
}
......@@ -17,20 +17,29 @@
package org.apache.hadoop.mapred
private[apache]
trait SparkHadoopMapRedUtil {
def newJobContext(conf: JobConf, jobId: JobID): JobContext = {
val klass = firstAvailableClass("org.apache.hadoop.mapred.JobContextImpl", "org.apache.hadoop.mapred.JobContext");
val ctor = klass.getDeclaredConstructor(classOf[JobConf], classOf[org.apache.hadoop.mapreduce.JobID])
val klass = firstAvailableClass("org.apache.hadoop.mapred.JobContextImpl",
"org.apache.hadoop.mapred.JobContext")
val ctor = klass.getDeclaredConstructor(classOf[JobConf],
classOf[org.apache.hadoop.mapreduce.JobID])
ctor.newInstance(conf, jobId).asInstanceOf[JobContext]
}
def newTaskAttemptContext(conf: JobConf, attemptId: TaskAttemptID): TaskAttemptContext = {
val klass = firstAvailableClass("org.apache.hadoop.mapred.TaskAttemptContextImpl", "org.apache.hadoop.mapred.TaskAttemptContext")
val klass = firstAvailableClass("org.apache.hadoop.mapred.TaskAttemptContextImpl",
"org.apache.hadoop.mapred.TaskAttemptContext")
val ctor = klass.getDeclaredConstructor(classOf[JobConf], classOf[TaskAttemptID])
ctor.newInstance(conf, attemptId).asInstanceOf[TaskAttemptContext]
}
def newTaskAttemptID(jtIdentifier: String, jobId: Int, isMap: Boolean, taskId: Int, attemptId: Int) = {
def newTaskAttemptID(
jtIdentifier: String,
jobId: Int,
isMap: Boolean,
taskId: Int,
attemptId: Int) = {
new TaskAttemptID(jtIdentifier, jobId, isMap, taskId, attemptId)
}
......
......@@ -17,9 +17,10 @@
package org.apache.hadoop.mapreduce
import org.apache.hadoop.conf.Configuration
import java.lang.{Integer => JInteger, Boolean => JBoolean}
import org.apache.hadoop.conf.Configuration
private[apache]
trait SparkHadoopMapReduceUtil {
def newJobContext(conf: Configuration, jobId: JobID): JobContext = {
val klass = firstAvailableClass(
......@@ -37,23 +38,31 @@ trait SparkHadoopMapReduceUtil {
ctor.newInstance(conf, attemptId).asInstanceOf[TaskAttemptContext]
}
def newTaskAttemptID(jtIdentifier: String, jobId: Int, isMap: Boolean, taskId: Int, attemptId: Int) = {
val klass = Class.forName("org.apache.hadoop.mapreduce.TaskAttemptID");
def newTaskAttemptID(
jtIdentifier: String,
jobId: Int,
isMap: Boolean,
taskId: Int,
attemptId: Int) = {
val klass = Class.forName("org.apache.hadoop.mapreduce.TaskAttemptID")
try {
// first, attempt to use the old-style constructor that takes a boolean isMap (not available in YARN)
// First, attempt to use the old-style constructor that takes a boolean isMap
// (not available in YARN)
val ctor = klass.getDeclaredConstructor(classOf[String], classOf[Int], classOf[Boolean],
classOf[Int], classOf[Int])
ctor.newInstance(jtIdentifier, new JInteger(jobId), new JBoolean(isMap), new JInteger(taskId), new
JInteger(attemptId)).asInstanceOf[TaskAttemptID]
classOf[Int], classOf[Int])
ctor.newInstance(jtIdentifier, new JInteger(jobId), new JBoolean(isMap), new JInteger(taskId),
new JInteger(attemptId)).asInstanceOf[TaskAttemptID]
} catch {
case exc: NoSuchMethodException => {
// failed, look for the new ctor that takes a TaskType (not available in 1.x)
val taskTypeClass = Class.forName("org.apache.hadoop.mapreduce.TaskType").asInstanceOf[Class[Enum[_]]]
val taskType = taskTypeClass.getMethod("valueOf", classOf[String]).invoke(taskTypeClass, if(isMap) "MAP" else "REDUCE")
// If that failed, look for the new constructor that takes a TaskType (not available in 1.x)
val taskTypeClass = Class.forName("org.apache.hadoop.mapreduce.TaskType")
.asInstanceOf[Class[Enum[_]]]
val taskType = taskTypeClass.getMethod("valueOf", classOf[String]).invoke(
taskTypeClass, if(isMap) "MAP" else "REDUCE")
val ctor = klass.getDeclaredConstructor(classOf[String], classOf[Int], taskTypeClass,
classOf[Int], classOf[Int])
ctor.newInstance(jtIdentifier, new JInteger(jobId), taskType, new JInteger(taskId), new
JInteger(attemptId)).asInstanceOf[TaskAttemptID]
ctor.newInstance(jtIdentifier, new JInteger(jobId), taskType, new JInteger(taskId),
new JInteger(attemptId)).asInstanceOf[TaskAttemptID]
}
}
}
......
......@@ -17,43 +17,42 @@
package org.apache.spark
import java.util.{HashMap => JHashMap}
import org.apache.spark.util.AppendOnlyMap
import scala.collection.JavaConversions._
/** A set of functions used to aggregate data.
*
* @param createCombiner function to create the initial value of the aggregation.
* @param mergeValue function to merge a new value into the aggregation result.
* @param mergeCombiners function to merge outputs from multiple mergeValue function.
*/
/**
* A set of functions used to aggregate data.
*
* @param createCombiner function to create the initial value of the aggregation.
* @param mergeValue function to merge a new value into the aggregation result.
* @param mergeCombiners function to merge outputs from multiple mergeValue function.
*/
case class Aggregator[K, V, C] (
createCombiner: V => C,
mergeValue: (C, V) => C,
mergeCombiners: (C, C) => C) {
def combineValuesByKey(iter: Iterator[_ <: Product2[K, V]]) : Iterator[(K, C)] = {
val combiners = new JHashMap[K, C]
for (kv <- iter) {
val oldC = combiners.get(kv._1)
if (oldC == null) {
combiners.put(kv._1, createCombiner(kv._2))
} else {
combiners.put(kv._1, mergeValue(oldC, kv._2))
}
val combiners = new AppendOnlyMap[K, C]
var kv: Product2[K, V] = null
val update = (hadValue: Boolean, oldValue: C) => {
if (hadValue) mergeValue(oldValue, kv._2) else createCombiner(kv._2)
}
while (iter.hasNext) {
kv = iter.next()
combiners.changeValue(kv._1, update)
}
combiners.iterator
}
def combineCombinersByKey(iter: Iterator[(K, C)]) : Iterator[(K, C)] = {
val combiners = new JHashMap[K, C]
iter.foreach { case(k, c) =>
val oldC = combiners.get(k)
if (oldC == null) {
combiners.put(k, c)
} else {
combiners.put(k, mergeCombiners(oldC, c))
}
val combiners = new AppendOnlyMap[K, C]
var kc: (K, C) = null
val update = (hadValue: Boolean, oldValue: C) => {
if (hadValue) mergeCombiners(oldValue, kc._2) else kc._2
}
while (iter.hasNext) {
kc = iter.next()
combiners.changeValue(kc._1, update)
}
combiners.iterator
}
......
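
For reference, here is a minimal sketch of how the three functions of an Aggregator fit together; the per-key sum-and-count example is illustrative and not part of this diff:

    import org.apache.spark.Aggregator

    object AggregatorExample {
      // Aggregate each key's values into a (sum, count) pair.
      val agg = new Aggregator[String, Double, (Double, Int)](
        createCombiner = v => (v, 1),                                // first value seen for a key
        mergeValue = (c, v) => (c._1 + v, c._2 + 1),                 // fold another value into the combiner
        mergeCombiners = (c1, c2) => (c1._1 + c2._1, c1._2 + c2._2)  // merge combiners from different partitions
      )

      def main(args: Array[String]) {
        val data = Iterator(("a", 1.0), ("b", 2.0), ("a", 3.0))
        println(agg.combineValuesByKey(data).toMap)  // e.g. Map(a -> (4.0,2), b -> (2.0,1))
      }
    }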
......@@ -22,13 +22,17 @@ import scala.collection.mutable.HashMap
import org.apache.spark.executor.{ShuffleReadMetrics, TaskMetrics}
import org.apache.spark.serializer.Serializer
import org.apache.spark.storage.BlockManagerId
import org.apache.spark.storage.{BlockId, BlockManagerId, ShuffleBlockId}
import org.apache.spark.util.CompletionIterator
private[spark] class BlockStoreShuffleFetcher extends ShuffleFetcher with Logging {
override def fetch[T](shuffleId: Int, reduceId: Int, metrics: TaskMetrics, serializer: Serializer)
override def fetch[T](
shuffleId: Int,
reduceId: Int,
context: TaskContext,
serializer: Serializer)
: Iterator[T] =
{
......@@ -45,12 +49,12 @@ private[spark] class BlockStoreShuffleFetcher extends ShuffleFetcher with Loggin
splitsByAddress.getOrElseUpdate(address, ArrayBuffer()) += ((index, size))
}
val blocksByAddress: Seq[(BlockManagerId, Seq[(String, Long)])] = splitsByAddress.toSeq.map {
val blocksByAddress: Seq[(BlockManagerId, Seq[(BlockId, Long)])] = splitsByAddress.toSeq.map {
case (address, splits) =>
(address, splits.map(s => ("shuffle_%d_%d_%d".format(shuffleId, s._1, reduceId), s._2)))
(address, splits.map(s => (ShuffleBlockId(shuffleId, s._1, reduceId), s._2)))
}
def unpackBlock(blockPair: (String, Option[Iterator[Any]])) : Iterator[T] = {
def unpackBlock(blockPair: (BlockId, Option[Iterator[Any]])) : Iterator[T] = {
val blockId = blockPair._1
val blockOption = blockPair._2
blockOption match {
......@@ -58,9 +62,8 @@ private[spark] class BlockStoreShuffleFetcher extends ShuffleFetcher with Loggin
block.asInstanceOf[Iterator[T]]
}
case None => {
val regex = "shuffle_([0-9]*)_([0-9]*)_([0-9]*)".r
blockId match {
case regex(shufId, mapId, _) =>
case ShuffleBlockId(shufId, mapId, _) =>
val address = statuses(mapId.toInt)._1
throw new FetchFailedException(address, shufId.toInt, mapId.toInt, reduceId, null)
case _ =>
......@@ -74,7 +77,7 @@ private[spark] class BlockStoreShuffleFetcher extends ShuffleFetcher with Loggin
val blockFetcherItr = blockManager.getMultiple(blocksByAddress, serializer)
val itr = blockFetcherItr.flatMap(unpackBlock)
CompletionIterator[T, Iterator[T]](itr, {
val completionIter = CompletionIterator[T, Iterator[T]](itr, {
val shuffleMetrics = new ShuffleReadMetrics
shuffleMetrics.shuffleFinishTime = System.currentTimeMillis
shuffleMetrics.remoteFetchTime = blockFetcherItr.remoteFetchTime
......@@ -83,7 +86,9 @@ private[spark] class BlockStoreShuffleFetcher extends ShuffleFetcher with Loggin
shuffleMetrics.totalBlocksFetched = blockFetcherItr.totalBlocks
shuffleMetrics.localBlocksFetched = blockFetcherItr.numLocalBlocks
shuffleMetrics.remoteBlocksFetched = blockFetcherItr.numRemoteBlocks
metrics.shuffleReadMetrics = Some(shuffleMetrics)
context.taskMetrics.shuffleReadMetrics = Some(shuffleMetrics)
})
new InterruptibleIterator[T](context, completionIter)
}
}
......@@ -18,7 +18,7 @@
package org.apache.spark
import scala.collection.mutable.{ArrayBuffer, HashSet}
import org.apache.spark.storage.{BlockManager, StorageLevel}
import org.apache.spark.storage.{BlockId, BlockManager, StorageLevel, RDDBlockId}
import org.apache.spark.rdd.RDD
......@@ -28,17 +28,17 @@ import org.apache.spark.rdd.RDD
private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
/** Keys of RDD splits that are being computed/loaded. */
private val loading = new HashSet[String]()
private val loading = new HashSet[RDDBlockId]()
/** Gets or computes an RDD split. Used by RDD.iterator() when an RDD is cached. */
def getOrCompute[T](rdd: RDD[T], split: Partition, context: TaskContext, storageLevel: StorageLevel)
: Iterator[T] = {
val key = "rdd_%d_%d".format(rdd.id, split.index)
val key = RDDBlockId(rdd.id, split.index)
logDebug("Looking for partition " + key)
blockManager.get(key) match {
case Some(values) =>
// Partition is already materialized, so just return its values
return values.asInstanceOf[Iterator[T]]
return new InterruptibleIterator(context, values.asInstanceOf[Iterator[T]])
case None =>
// Mark the split as loading (unless someone else marks it first)
......@@ -56,7 +56,7 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
// downside of the current code is that threads wait serially if this does happen.
blockManager.get(key) match {
case Some(values) =>
return values.asInstanceOf[Iterator[T]]
return new InterruptibleIterator(context, values.asInstanceOf[Iterator[T]])
case None =>
logInfo("Whoever was loading %s failed; we'll try it ourselves".format(key))
loading.add(key)
......@@ -73,7 +73,7 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
if (context.runningLocally) { return computedValues }
val elements = new ArrayBuffer[Any]
elements ++= computedValues
blockManager.put(key, elements, storageLevel, true)
blockManager.put(key, elements, storageLevel, tellMaster = true)
return elements.iterator.asInstanceOf[Iterator[T]]
} finally {
loading.synchronized {
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark
import scala.concurrent._
import scala.concurrent.duration.Duration
import scala.util.Try
import org.apache.spark.scheduler.{JobSucceeded, JobWaiter}
import org.apache.spark.scheduler.JobFailed
import org.apache.spark.rdd.RDD
/**
* A future for the result of an action. This is an extension of the Scala Future interface to
* support cancellation.
*/
trait FutureAction[T] extends Future[T] {
// Note that we redefine methods of the Future trait here explicitly so we can specify a different
// documentation (with reference to the word "action").
/**
* Cancels the execution of this action.
*/
def cancel()
/**
* Blocks until this action completes.
* @param atMost maximum wait time, which may be negative (no waiting is done), Duration.Inf
* for unbounded waiting, or a finite positive duration
* @return this FutureAction
*/
override def ready(atMost: Duration)(implicit permit: CanAwait): FutureAction.this.type
/**
* Awaits and returns the result (of type T) of this action.
* @param atMost maximum wait time, which may be negative (no waiting is done), Duration.Inf
* for unbounded waiting, or a finite positive duration
* @throws Exception exception during action execution
* @return the result value if the action is completed within the specific maximum wait time
*/
@throws(classOf[Exception])
override def result(atMost: Duration)(implicit permit: CanAwait): T
/**
* When this action is completed, either through an exception, or a value, applies the provided
* function.
*/
def onComplete[U](func: (Try[T]) => U)(implicit executor: ExecutionContext)
/**
* Returns whether the action has already been completed with a value or an exception.
*/
override def isCompleted: Boolean
/**
* The value of this Future.
*
* If the future is not completed the returned value will be None. If the future is completed
* the value will be Some(Success(t)) if it contains a valid result, or Some(Failure(error)) if
* it contains an exception.
*/
override def value: Option[Try[T]]
/**
* Blocks and returns the result of this job.
*/
@throws(classOf[Exception])
def get(): T = Await.result(this, Duration.Inf)
}
/**
* The future holding the result of an action that triggers a single job. Examples include
* count, collect, reduce.
*/
class SimpleFutureAction[T] private[spark](jobWaiter: JobWaiter[_], resultFunc: => T)
extends FutureAction[T] {
override def cancel() {
jobWaiter.cancel()
}
override def ready(atMost: Duration)(implicit permit: CanAwait): SimpleFutureAction.this.type = {
if (!atMost.isFinite()) {
awaitResult()
} else jobWaiter.synchronized {
val finishTime = System.currentTimeMillis() + atMost.toMillis
while (!isCompleted) {
val time = System.currentTimeMillis()
if (time >= finishTime) {
throw new TimeoutException
} else {
jobWaiter.wait(finishTime - time)
}
}
}
this
}
@throws(classOf[Exception])
override def result(atMost: Duration)(implicit permit: CanAwait): T = {
ready(atMost)(permit)
awaitResult() match {
case scala.util.Success(res) => res
case scala.util.Failure(e) => throw e
}
}
override def onComplete[U](func: (Try[T]) => U)(implicit executor: ExecutionContext) {
executor.execute(new Runnable {
override def run() {
func(awaitResult())
}
})
}
override def isCompleted: Boolean = jobWaiter.jobFinished
override def value: Option[Try[T]] = {
if (jobWaiter.jobFinished) {
Some(awaitResult())
} else {
None
}
}
private def awaitResult(): Try[T] = {
jobWaiter.awaitResult() match {
case JobSucceeded => scala.util.Success(resultFunc)
case JobFailed(e: Exception, _) => scala.util.Failure(e)
}
}
}
/**
* A FutureAction for actions that could trigger multiple Spark jobs. Examples include take,
* takeSample. Cancellation works by setting the cancelled flag to true and interrupting the
* action thread if it is being blocked by a job.
*/
class ComplexFutureAction[T] extends FutureAction[T] {
// Pointer to the thread that is executing the action. It is set when the action is run.
@volatile private var thread: Thread = _
// A flag indicating whether the future has been cancelled. This is used in case the future
// is cancelled before the action was even run (and thus we have no thread to interrupt).
@volatile private var _cancelled: Boolean = false
// A promise used to signal the future.
private val p = promise[T]()
override def cancel(): Unit = this.synchronized {
_cancelled = true
if (thread != null) {
thread.interrupt()
}
}
/**
* Executes some action enclosed in the closure. To properly enable cancellation, the closure
* should use runJob implementation in this promise. See takeAsync for example.
*/
def run(func: => T)(implicit executor: ExecutionContext): this.type = {
scala.concurrent.future {
thread = Thread.currentThread
try {
p.success(func)
} catch {
case e: Exception => p.failure(e)
} finally {
thread = null
}
}
this
}
/**
* Runs a Spark job. This is a wrapper around the same functionality provided by SparkContext
* to enable cancellation.
*/
def runJob[T, U, R](
rdd: RDD[T],
processPartition: Iterator[T] => U,
partitions: Seq[Int],
resultHandler: (Int, U) => Unit,
resultFunc: => R) {
// If the action hasn't been cancelled yet, submit the job. The check and the submitJob
// command need to be in an atomic block.
val job = this.synchronized {
if (!cancelled) {
rdd.context.submitJob(rdd, processPartition, partitions, resultHandler, resultFunc)
} else {
throw new SparkException("Action has been cancelled")
}
}
// Wait for the job to complete. If the action is cancelled (with an interrupt),
// cancel the job and stop the execution. This is not in a synchronized block because
// Await.ready eventually waits on the monitor in FutureJob.jobWaiter.
try {
Await.ready(job, Duration.Inf)
} catch {
case e: InterruptedException =>
job.cancel()
throw new SparkException("Action has been cancelled")
}
}
/**
* Returns whether the promise has been cancelled.
*/
def cancelled: Boolean = _cancelled
@throws(classOf[InterruptedException])
@throws(classOf[scala.concurrent.TimeoutException])
override def ready(atMost: Duration)(implicit permit: CanAwait): this.type = {
p.future.ready(atMost)(permit)
this
}
@throws(classOf[Exception])
override def result(atMost: Duration)(implicit permit: CanAwait): T = {
p.future.result(atMost)(permit)
}
override def onComplete[U](func: (Try[T]) => U)(implicit executor: ExecutionContext): Unit = {
p.future.onComplete(func)(executor)
}
override def isCompleted: Boolean = p.isCompleted
override def value: Option[Try[T]] = p.future.value
}
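
A hedged sketch of how calling code might consume a FutureAction; obtaining one (for example from an asynchronous RDD action) is outside this excerpt, so the `report` helper below simply takes it as a parameter:

    import scala.concurrent.ExecutionContext.Implicits.global
    import scala.util.{Failure, Success}

    import org.apache.spark.FutureAction

    object FutureActionExample {
      // `action` stands for any FutureAction[Long]; how it was produced is assumed.
      def report(action: FutureAction[Long]) {
        action.onComplete {
          case Success(count) => println("Job finished with count " + count)
          case Failure(e)     => println("Job failed: " + e.getMessage)
        }
        // Block for the final result; call action.cancel() instead to interrupt the job.
        println("Result: " + action.get())
      }
    }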
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark
/**
* An iterator that wraps around an existing iterator to provide task killing functionality.
* It works by checking the interrupted flag in TaskContext.
*/
class InterruptibleIterator[+T](val context: TaskContext, val delegate: Iterator[T])
extends Iterator[T] {
def hasNext: Boolean = !context.interrupted && delegate.hasNext
def next(): T = delegate.next()
}
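
A minimal sketch of the wrapping pattern (the TaskContext is assumed to be handed in by the task runner; constructing one by hand is outside this excerpt):

    import org.apache.spark.{InterruptibleIterator, TaskContext}

    object InterruptibleIteratorExample {
      // Wrap a plain iterator so that iteration stops as soon as the task is marked interrupted.
      def wrap[T](context: TaskContext, data: Iterator[T]): Iterator[T] =
        new InterruptibleIterator(context, data)
    }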