diff --git a/core/src/main/scala/org/apache/spark/deploy/master/FileSystemPersistenceEngine.scala b/core/src/main/scala/org/apache/spark/deploy/master/FileSystemPersistenceEngine.scala
index 6ff2aa5244847457db2caf23d1ed5f7b25d6b685..36a2e2c6a6349c57dafa1610663c63734935ddce 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/FileSystemPersistenceEngine.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/FileSystemPersistenceEngine.scala
@@ -18,12 +18,13 @@
 package org.apache.spark.deploy.master
 
 import java.io._
-import java.nio.ByteBuffer
+
+import scala.reflect.ClassTag
+
+import akka.serialization.Serialization
 
 import org.apache.spark.Logging
-import org.apache.spark.serializer.Serializer
 
-import scala.reflect.ClassTag
 
 /**
  * Stores data in a single on-disk directory with one file per application and worker.
@@ -34,10 +35,9 @@ import scala.reflect.ClassTag
  */
 private[spark] class FileSystemPersistenceEngine(
     val dir: String,
-    val serialization: Serializer)
+    val serialization: Serialization)
   extends PersistenceEngine with Logging {
 
-  val serializer = serialization.newInstance()
   new File(dir).mkdir()
 
   override def persist(name: String, obj: Object): Unit = {
@@ -56,17 +56,17 @@ private[spark] class FileSystemPersistenceEngine(
   private def serializeIntoFile(file: File, value: AnyRef) {
     val created = file.createNewFile()
     if (!created) { throw new IllegalStateException("Could not create file: " + file) }
-
-    val out = serializer.serializeStream(new FileOutputStream(file))
+    val serializer = serialization.findSerializerFor(value)
+    val serialized = serializer.toBinary(value)
+    val out = new FileOutputStream(file)
     try {
-      out.writeObject(value)
+      out.write(serialized)
     } finally {
       out.close()
     }
-
   }
 
-  def deserializeFromFile[T](file: File): T = {
+  private def deserializeFromFile[T](file: File)(implicit m: ClassTag[T]): T = {
     val fileData = new Array[Byte](file.length().asInstanceOf[Int])
     val dis = new DataInputStream(new FileInputStream(file))
     try {
@@ -74,7 +74,9 @@ private[spark] class FileSystemPersistenceEngine(
     } finally {
       dis.close()
     }
-
-    serializer.deserializeStream(dis).readObject()
+    val clazz = m.runtimeClass.asInstanceOf[Class[T]]
+    val serializer = serialization.serializerFor(clazz)
+    serializer.fromBinary(fileData).asInstanceOf[T]
   }
+
 }
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index 021454e25804ce2a8181b62f8827cd614c399115..7b32c505def9b4f57f9a6a3bae29079a69b796a4 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -30,6 +30,7 @@ import scala.util.Random
 import akka.actor._
 import akka.pattern.ask
 import akka.remote.{DisassociatedEvent, RemotingLifecycleEvent}
+import akka.serialization.Serialization
 import akka.serialization.SerializationExtension
 
 import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkException}
@@ -132,15 +133,18 @@ private[spark] class Master(
     val (persistenceEngine_, leaderElectionAgent_) = RECOVERY_MODE match {
       case "ZOOKEEPER" =>
         logInfo("Persisting recovery state to ZooKeeper")
-        val zkFactory = new ZooKeeperRecoveryModeFactory(conf)
+        val zkFactory =
+          new ZooKeeperRecoveryModeFactory(conf, SerializationExtension(context.system))
         (zkFactory.createPersistenceEngine(), zkFactory.createLeaderElectionAgent(this))
       case "FILESYSTEM" =>
-        val fsFactory = new FileSystemRecoveryModeFactory(conf)
+        val fsFactory =
+          new FileSystemRecoveryModeFactory(conf, SerializationExtension(context.system))
         (fsFactory.createPersistenceEngine(), fsFactory.createLeaderElectionAgent(this))
       case "CUSTOM" =>
         val clazz = Class.forName(conf.get("spark.deploy.recoveryMode.factory"))
-        val factory = clazz.getConstructor(conf.getClass)
-          .newInstance(conf).asInstanceOf[StandaloneRecoveryModeFactory]
+        val factory = clazz.getConstructor(conf.getClass, Serialization.getClass)
+          .newInstance(conf, SerializationExtension(context.system))
+          .asInstanceOf[StandaloneRecoveryModeFactory]
         (factory.createPersistenceEngine(), factory.createLeaderElectionAgent(this))
       case _ =>
         (new BlackHolePersistenceEngine(), new MonarchyLeaderAgent(this))
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/RecoveryModeFactory.scala b/core/src/main/scala/org/apache/spark/deploy/master/RecoveryModeFactory.scala
index d9d36c1ed5f9f8f13eca88a82c3c2088c86581d4..1096eb036835706050916e1ca358e812800a96ae 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/RecoveryModeFactory.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/RecoveryModeFactory.scala
@@ -17,9 +17,10 @@
 
 package org.apache.spark.deploy.master
 
+import akka.serialization.Serialization
+
 import org.apache.spark.{Logging, SparkConf}
 import org.apache.spark.annotation.DeveloperApi
-import org.apache.spark.serializer.JavaSerializer
 
 /**
  * ::DeveloperApi::
@@ -29,7 +30,7 @@ import org.apache.spark.serializer.JavaSerializer
  *
  */
 @DeveloperApi
-abstract class StandaloneRecoveryModeFactory(conf: SparkConf) {
+abstract class StandaloneRecoveryModeFactory(conf: SparkConf, serializer: Serialization) {
 
   /**
    * PersistenceEngine defines how the persistent data(Information about worker, driver etc..)
@@ -48,21 +49,21 @@ abstract class StandaloneRecoveryModeFactory(conf: SparkConf) {
  * LeaderAgent in this case is a no-op. Since leader is forever leader as the actual
  * recovery is made by restoring from filesystem.
  */
-private[spark] class FileSystemRecoveryModeFactory(conf: SparkConf)
-  extends StandaloneRecoveryModeFactory(conf) with Logging {
+private[spark] class FileSystemRecoveryModeFactory(conf: SparkConf, serializer: Serialization)
+  extends StandaloneRecoveryModeFactory(conf, serializer) with Logging {
   val RECOVERY_DIR = conf.get("spark.deploy.recoveryDirectory", "")
 
   def createPersistenceEngine() = {
     logInfo("Persisting recovery state to directory: " + RECOVERY_DIR)
-    new FileSystemPersistenceEngine(RECOVERY_DIR, new JavaSerializer(conf))
+    new FileSystemPersistenceEngine(RECOVERY_DIR, serializer)
   }
 
   def createLeaderElectionAgent(master: LeaderElectable) = new MonarchyLeaderAgent(master)
 }
 
-private[spark] class ZooKeeperRecoveryModeFactory(conf: SparkConf)
-  extends StandaloneRecoveryModeFactory(conf) {
-  def createPersistenceEngine() = new ZooKeeperPersistenceEngine(new JavaSerializer(conf), conf)
+private[spark] class ZooKeeperRecoveryModeFactory(conf: SparkConf, serializer: Serialization)
+  extends StandaloneRecoveryModeFactory(conf, serializer) {
+  def createPersistenceEngine() = new ZooKeeperPersistenceEngine(conf, serializer)
 
   def createLeaderElectionAgent(master: LeaderElectable) =
     new ZooKeeperLeaderElectionAgent(master, conf)
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala b/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala
index 96c2139eb02f036553aad5c2fdfe5aac9717728f..e11ac031fb9c6ccf0b84611944b63be27d57550a 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala
@@ -17,27 +17,24 @@
 
 package org.apache.spark.deploy.master
 
+import akka.serialization.Serialization
+
 import scala.collection.JavaConversions._
+import scala.reflect.ClassTag
 
 import org.apache.curator.framework.CuratorFramework
 import org.apache.zookeeper.CreateMode
 
 import org.apache.spark.{Logging, SparkConf}
-import org.apache.spark.serializer.Serializer
-import java.nio.ByteBuffer
-import scala.reflect.ClassTag
-
-private[spark] class ZooKeeperPersistenceEngine(val serialization: Serializer, conf: SparkConf)
+private[spark] class ZooKeeperPersistenceEngine(conf: SparkConf, val serialization: Serialization)
   extends PersistenceEngine
   with Logging
 {
   val WORKING_DIR = conf.get("spark.deploy.zookeeper.dir", "/spark") + "/master_status"
   val zk: CuratorFramework = SparkCuratorUtil.newClient(conf)
 
-  val serializer = serialization.newInstance()
-
   SparkCuratorUtil.mkdir(zk, WORKING_DIR)
@@ -59,14 +56,17 @@ private[spark] class ZooKeeperPersistenceEngine(val serialization: Serializer, c
   }
 
   private def serializeIntoFile(path: String, value: AnyRef) {
-    val serialized = serializer.serialize(value)
-    zk.create().withMode(CreateMode.PERSISTENT).forPath(path, serialized.array())
+    val serializer = serialization.findSerializerFor(value)
+    val serialized = serializer.toBinary(value)
+    zk.create().withMode(CreateMode.PERSISTENT).forPath(path, serialized)
   }
 
-  def deserializeFromFile[T](filename: String): Option[T] = {
+  def deserializeFromFile[T](filename: String)(implicit m: ClassTag[T]): Option[T] = {
     val fileData = zk.getData().forPath(WORKING_DIR + "/" + filename)
+    val clazz = m.runtimeClass.asInstanceOf[Class[T]]
+    val serializer = serialization.serializerFor(clazz)
     try {
-      Some(serializer.deserialize(ByteBuffer.wrap(fileData)))
+      Some(serializer.fromBinary(fileData).asInstanceOf[T])
     } catch {
       case e: Exception => {
         logWarning("Exception while reading persisted file, deleting", e)
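
A note on the Akka serialization round trip this patch switches to (not part of the diff itself): on the write path a serializer is chosen from the runtime value via `Serialization.findSerializerFor`, while on the read path only the expected type is known, which is why both `deserializeFromFile` methods now take an implicit `ClassTag` and look the serializer up by class via `Serialization.serializerFor`. Below is a minimal, self-contained sketch of that round trip, assuming a plain `ActorSystem` with Akka's default serializer bindings; the system name, object name, and the `serialize`/`deserialize` helpers are illustrative only and do not appear in the patch.

```scala
import scala.reflect.ClassTag

import akka.actor.ActorSystem
import akka.serialization.{Serialization, SerializationExtension}

object SerializationRoundTripSketch {
  def main(args: Array[String]): Unit = {
    // Any ActorSystem carries a Serialization extension; the name is arbitrary.
    val system = ActorSystem("sketch")
    val serialization: Serialization = SerializationExtension(system)

    // Write path, as in serializeIntoFile: pick a serializer for the concrete
    // value and turn it into bytes that can go to a file or a ZooKeeper znode.
    def serialize(value: AnyRef): Array[Byte] =
      serialization.findSerializerFor(value).toBinary(value)

    // Read path, as in deserializeFromFile: there is no value yet, only bytes,
    // so the serializer is looked up by the class recovered from the ClassTag.
    def deserialize[T](bytes: Array[Byte])(implicit m: ClassTag[T]): T = {
      val clazz = m.runtimeClass.asInstanceOf[Class[T]]
      serialization.serializerFor(clazz).fromBinary(bytes).asInstanceOf[T]
    }

    // With the default bindings, a java.io.Serializable value such as String
    // falls back to Akka's Java serializer and survives the round trip.
    val bytes = serialize("worker-state")
    assert(deserialize[String](bytes) == "worker-state")

    system.shutdown()  // terminate() on newer Akka versions
  }
}
```

The asymmetry between the two lookups explains the new `ClassTag` parameters: `findSerializerFor` can dispatch on the concrete value being written, but on the read path nothing exists except bytes and an expected class, so the class itself has to drive serializer selection.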