Skip to content
Snippets Groups Projects
Commit 6ad3e1f1 authored by root's avatar root
Browse files

Various fixes when running on Mesos

parent e896a505
No related branches found
No related tags found
No related merge requests found
......@@ -3,6 +3,8 @@ package spark
import java.io._
import java.nio.ByteBuffer
import spark.util.ByteBufferInputStream
class JavaSerializationStream(out: OutputStream) extends SerializationStream {
val objOut = new ObjectOutputStream(out)
def writeObject[T](t: T) { objOut.writeObject(t) }
......@@ -31,13 +33,13 @@ class JavaSerializerInstance extends SerializerInstance {
}
def deserialize[T](bytes: ByteBuffer): T = {
val bis = new ByteArrayInputStream(bytes.array())
val bis = new ByteBufferInputStream(bytes)
val in = deserializeStream(bis)
in.readObject().asInstanceOf[T]
}
def deserialize[T](bytes: ByteBuffer, loader: ClassLoader): T = {
val bis = new ByteArrayInputStream(bytes.array())
val bis = new ByteBufferInputStream(bytes)
val in = deserializeStream(bis, loader)
in.readObject().asInstanceOf[T]
}
......
......@@ -93,7 +93,7 @@ class SparkContext(
case LOCAL_N_FAILURES_REGEX(threads, maxFailures) =>
new LocalScheduler(threads.toInt, maxFailures.toInt)
case _ =>
System.loadLibrary("mesos")
MesosNativeLibrary.load()
if (System.getProperty("spark.mesos.coarse", "false") == "true") {
new CoarseMesosScheduler(this, master, frameworkName)
} else {
......
......@@ -267,7 +267,8 @@ class TaskSetManager(
logInfo("Finished TID %s in %d ms (progress: %d/%d)".format(
tid, info.duration, tasksFinished, numTasks))
// Deserialize task result and pass it to the scheduler
val result = ser.deserialize[TaskResult[_]](status.getData.asReadOnlyByteBuffer)
val result = ser.deserialize[TaskResult[_]](
status.getData.asReadOnlyByteBuffer, getClass.getClassLoader)
sched.listener.taskEnded(tasks(index), Success, result.value, result.accumUpdates)
// Mark finished and stop if we've finished all the tasks
finished(index) = true
......@@ -291,7 +292,8 @@ class TaskSetManager(
// Check if the problem is a map output fetch failure. In that case, this
// task will never succeed on any node, so tell the scheduler about it.
if (status.getData != null && status.getData.size > 0) {
val reason = ser.deserialize[TaskEndReason](status.getData.asReadOnlyByteBuffer)
val reason = ser.deserialize[TaskEndReason](
status.getData.asReadOnlyByteBuffer, getClass.getClassLoader)
reason match {
case fetchFailed: FetchFailed =>
logInfo("Loss was due to fetch failure from " + fetchFailed.bmAddress)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment