diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala
index 278e018e8f2b66c955fbd8887af74feb3bd6362a..43414d2e412a24781a831134d8386b450111ef06 100644
--- a/core/src/main/scala/spark/SparkContext.scala
+++ b/core/src/main/scala/spark/SparkContext.scala
@@ -74,7 +74,6 @@ class SparkContext(
     true,
     isLocal)
   SparkEnv.set(env)
-  Broadcast.initialize(true)
 
   // Create and start the scheduler
   private var taskScheduler: TaskScheduler = {
@@ -295,14 +294,14 @@ class SparkContext(
 
   // Keep around a weak hash map of values to Cached versions?
 
-  def broadcast[T](value: T) = Broadcast.getBroadcastFactory.newBroadcast[T] (value, isLocal)
+  def broadcast[T](value: T) = SparkEnv.get.broadcastManager.newBroadcast[T] (value, isLocal)
 
   // Stop the SparkContext
   def stop() {
     dagScheduler.stop()
     dagScheduler = null
     taskScheduler = null
-    // TODO: Broadcast.stop(), Cache.stop()?
+    // TODO: Cache.stop()?
    env.stop()
    SparkEnv.set(null)
    ShuffleMapTask.clearCache()
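
The user-facing API is unchanged by this refactor; only the plumbing behind SparkContext.broadcast moves from the static Broadcast object to the manager held in SparkEnv. A minimal driver-side sketch of the call path (the master string and job name below are placeholder arguments, not part of this patch):

    // Hypothetical driver program; behavior is the same before and after this patch.
    val sc = new SparkContext("local", "broadcastDemo")
    val lookup = sc.broadcast(Map(1 -> "one", 2 -> "two"))  // now delegates to SparkEnv.get.broadcastManager.newBroadcast
    val words = sc.parallelize(1 to 2).map(i => lookup.value(i)).collect()
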
diff --git a/core/src/main/scala/spark/SparkEnv.scala b/core/src/main/scala/spark/SparkEnv.scala
index 694db6b2a30667b5758d32d5ffd68835096cdc67..add8fcec51e65e174b0dc28b5b3e9357fa85b7d2 100644
--- a/core/src/main/scala/spark/SparkEnv.scala
+++ b/core/src/main/scala/spark/SparkEnv.scala
@@ -2,6 +2,7 @@ package spark
 
 import akka.actor.ActorSystem
 
+import spark.broadcast.BroadcastManager
 import spark.storage.BlockManager
 import spark.storage.BlockManagerMaster
 import spark.network.ConnectionManager
@@ -16,13 +17,14 @@ class SparkEnv (
     val mapOutputTracker: MapOutputTracker,
     val shuffleFetcher: ShuffleFetcher,
     val shuffleManager: ShuffleManager,
+    val broadcastManager: BroadcastManager,
     val blockManager: BlockManager,
     val connectionManager: ConnectionManager
   ) {
 
   /** No-parameter constructor for unit tests. */
   def this() = {
-    this(null, null, new JavaSerializer, new JavaSerializer, null, null, null, null, null, null)
+    this(null, null, new JavaSerializer, new JavaSerializer, null, null, null, null, null, null, null)
   }
 
   def stop() {
@@ -30,6 +32,7 @@ class SparkEnv (
     cacheTracker.stop()
     shuffleFetcher.stop()
     shuffleManager.stop()
+    broadcastManager.stop()
     blockManager.stop()
     blockManager.master.stop()
     actorSystem.shutdown()
@@ -74,6 +77,8 @@ object SparkEnv {
 
     val shuffleManager = new ShuffleManager()
 
+    val broadcastManager = new BroadcastManager(isMaster)
+
     val closureSerializerClass =
       System.getProperty("spark.closure.serializer", "spark.JavaSerializer")
     val closureSerializer =
@@ -119,6 +124,7 @@ object SparkEnv {
       mapOutputTracker,
       shuffleFetcher,
       shuffleManager,
+      broadcastManager,
      blockManager,
      connectionManager)
   }
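
With BroadcastManager carried in SparkEnv, components reach it through the thread-local accessor rather than a global singleton, and SparkEnv.stop() tears it down with everything else. A sketch of a call site, assuming SparkEnv.set(env) has already run on the calling thread:

    // Hypothetical call site inside driver or executor code.
    val bm = SparkEnv.get.broadcastManager
    val smallTable = bm.newBroadcast(Array(1, 2, 3), isLocal = true)  // same entry point SparkContext.broadcast uses
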
diff --git a/core/src/main/scala/spark/broadcast/BitTorrentBroadcast.scala b/core/src/main/scala/spark/broadcast/BitTorrentBroadcast.scala
index 07f28999c8dee3c42db5f81d7b0c4da90c521320..473d080044349a5e9ef6342b64efdd5afb39b79e 100644
--- a/core/src/main/scala/spark/broadcast/BitTorrentBroadcast.scala
+++ b/core/src/main/scala/spark/broadcast/BitTorrentBroadcast.scala
@@ -16,8 +16,8 @@ extends Broadcast[T] with Logging with Serializable {
 
   def value = value_
 
-  Broadcast.synchronized {
-    Broadcast.values.putSingle(uuid.toString, value_, StorageLevel.MEMORY_ONLY, false)
+  MultiTracker.synchronized {
+    SparkEnv.get.blockManager.putSingle(uuid.toString, value_, StorageLevel.MEMORY_ONLY, false)
   }
 
   @transient var arrayOfBlocks: Array[BroadcastBlock] = null
@@ -111,10 +111,11 @@ extends Broadcast[T] with Logging with Serializable {
   private def readObject(in: ObjectInputStream) {
     in.defaultReadObject()
 
-    Broadcast.synchronized {
+    MultiTracker.synchronized {
       SparkEnv.get.blockManager.getSingle(uuid.toString) match {
         case Some(x) => x.asInstanceOf[T]
         case None => {
+          logInfo("Started reading broadcast variable " + uuid)
           // Initializing everything because Master will only send null/0 values
           // Only the 1st worker in a node can be here. Others will get from cache
           initializeWorkerVariables
@@ -132,7 +133,7 @@ extends Broadcast[T] with Logging with Serializable {
           val receptionSucceeded = receiveBroadcast(uuid)
           if (receptionSucceeded) {
             value_ = MultiTracker.unBlockifyObject[T](arrayOfBlocks, totalBytes, totalBlocks)
-            Broadcast.values.putSingle(uuid.toString, value_, StorageLevel.MEMORY_ONLY, false)
+            SparkEnv.get.blockManager.putSingle(uuid.toString, value_, StorageLevel.MEMORY_ONLY, false)
           } else {
             logError("Reading Broadcasted variable " + uuid + " failed")
           }
@@ -241,7 +242,7 @@ extends Broadcast[T] with Logging with Serializable {
 
       // Receive source information from Guide
       var suitableSources = oisGuide.readObject.asInstanceOf[ListBuffer[SourceInfo]]
-      logInfo("Received suitableSources from Master " + suitableSources)
+      logDebug("Received suitableSources from Master " + suitableSources)
 
       addToListOfSources(suitableSources)
@@ -312,9 +313,9 @@ extends Broadcast[T] with Logging with Serializable {
         var peerToTalkTo = pickPeerToTalkToRandom
 
         if (peerToTalkTo != null)
-          logInfo("Peer chosen: " + peerToTalkTo + " with " + peerToTalkTo.hasBlocksBitVector)
+          logDebug("Peer chosen: " + peerToTalkTo + " with " + peerToTalkTo.hasBlocksBitVector)
         else
-          logInfo("No peer chosen...")
+          logDebug("No peer chosen...")
 
         if (peerToTalkTo != null) {
           threadPool.execute(new TalkToPeer(peerToTalkTo))
@@ -340,7 +341,7 @@ extends Broadcast[T] with Logging with Serializable {
       var curPeer: SourceInfo = null
       var curMax = 0
 
-      logInfo("Picking peers to talk to...")
+      logDebug("Picking peers to talk to...")
 
       // Find peers that are not connected right now
       var peersNotInUse = ListBuffer[SourceInfo]()
@@ -529,7 +530,7 @@ extends Broadcast[T] with Logging with Serializable {
             val bcBlock = oisSource.readObject.asInstanceOf[BroadcastBlock]
             val receptionTime = (System.currentTimeMillis - recvStartTime)
 
-            logInfo("Received block: " + bcBlock.blockID + " from " + peerToTalkTo + " in " + receptionTime + " millis.")
+            logDebug("Received block: " + bcBlock.blockID + " from " + peerToTalkTo + " in " + receptionTime + " millis.")
 
             if (!hasBlocksBitVector.get(bcBlock.blockID)) {
               arrayOfBlocks(bcBlock.blockID) = bcBlock
@@ -560,7 +561,7 @@ extends Broadcast[T] with Logging with Serializable {
        // connection due to timeout
        case eofe: java.io.EOFException => { }
        case e: Exception => {
-          logInfo("TalktoPeer had a " + e)
+          logError("TalkToPeer had a " + e)
          // FIXME: Remove 'newPeerToTalkTo' from listOfSources
          // We probably should have the following in some form, but not
          // really here. This exception can happen if the sender just breaks connection
@@ -727,7 +728,7 @@ extends Broadcast[T] with Logging with Serializable {
          clientSocket = serverSocket.accept()
        } catch {
          case e: Exception => {
-            logInfo("GuideMultipleRequests Timeout.")
+            logError("GuideMultipleRequests Timeout.")
 
            // Stop broadcast if at least one worker has connected and
            // everyone connected so far are done. Comparing with
@@ -739,7 +740,7 @@ extends Broadcast[T] with Logging with Serializable {
          }
        }
        if (clientSocket != null) {
-          logInfo("Guide: Accepted new client connection:" + clientSocket)
+          logDebug("Guide: Accepted new client connection:" + clientSocket)
          try {
            threadPool.execute(new GuideSingleRequest(clientSocket))
          } catch {
@@ -789,7 +790,7 @@ extends Broadcast[T] with Logging with Serializable {
            gosSource.flush()
          } catch {
            case e: Exception => {
-              logInfo("sendStopBroadcastNotifications had a " + e)
+              logError("sendStopBroadcastNotifications had a " + e)
            }
          } finally {
            if (gisSource != null) {
@@ -823,7 +824,7 @@ extends Broadcast[T] with Logging with Serializable {
 
          // Select a suitable source and send it back to the worker
          selectedSources = selectSuitableSources(sourceInfo)
-          logInfo("Sending selectedSources:" + selectedSources)
+          logDebug("Sending selectedSources:" + selectedSources)
          oos.writeObject(selectedSources)
          oos.flush()
@@ -837,6 +838,7 @@ extends Broadcast[T] with Logging with Serializable {
          }
        }
      } finally {
+        logInfo("GuideSingleRequest is closing streams and sockets")
        ois.close()
        oos.close()
        clientSocket.close()
@@ -915,11 +917,11 @@ extends Broadcast[T] with Logging with Serializable {
          clientSocket = serverSocket.accept()
        } catch {
          case e: Exception => {
-            logInfo("ServeMultipleRequests Timeout.")
+            logError("ServeMultipleRequests Timeout.")
          }
        }
        if (clientSocket != null) {
-          logInfo("Serve: Accepted new client connection:" + clientSocket)
+          logDebug("Serve: Accepted new client connection:" + clientSocket)
          try {
            threadPool.execute(new ServeSingleRequest(clientSocket))
          } catch {
@@ -986,7 +988,7 @@ extends Broadcast[T] with Logging with Serializable {
 
            // Receive latest SourceInfo from the receiver
            rxSourceInfo = ois.readObject.asInstanceOf[SourceInfo]
-            // logInfo("rxSourceInfo: " + rxSourceInfo + " with " + rxSourceInfo.hasBlocksBitVector)
+            logDebug("rxSourceInfo: " + rxSourceInfo + " with " + rxSourceInfo.hasBlocksBitVector)
            addToListOfSources(rxSourceInfo)
 
            curTime = System.currentTimeMillis
@@ -997,7 +999,7 @@ extends Broadcast[T] with Logging with Serializable {
          }
        }
      } catch {
-        case e: Exception => logInfo("ServeSingleRequest had a " + e)
+        case e: Exception => logError("ServeSingleRequest had a " + e)
      } finally {
        logInfo("ServeSingleRequest is closing streams and sockets")
        ois.close()
@@ -1011,9 +1013,9 @@ extends Broadcast[T] with Logging with Serializable {
        oos.writeObject(arrayOfBlocks(blockToSend))
        oos.flush()
      } catch {
-        case e: Exception => logInfo("sendBlock had a " + e)
+        case e: Exception => logError("sendBlock had a " + e)
      }
-      logInfo("Sent block: " + blockToSend + " to " + clientSocket)
+      logDebug("Sent block: " + blockToSend + " to " + clientSocket)
    }
  }
}
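
The readObject hunks above keep the existing read-through cache idiom, just pointed at the env's BlockManager instead of the removed Broadcast.values alias. Its shape, pulled out as a sketch (readThrough and fetchFromPeers are hypothetical names standing in for the BitTorrent reception logic):

    // Read-through caching on deserialization: the first task on a node fetches,
    // later tasks hit the local BlockManager. Sketch only; names are illustrative.
    def readThrough[T](uuid: java.util.UUID, fetchFromPeers: () => T): T =
      SparkEnv.get.blockManager.getSingle(uuid.toString) match {
        case Some(x) => x.asInstanceOf[T]
        case None =>
          val v = fetchFromPeers()
          SparkEnv.get.blockManager.putSingle(uuid.toString, v, StorageLevel.MEMORY_ONLY, false)
          v
      }
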
diff --git a/core/src/main/scala/spark/broadcast/Broadcast.scala b/core/src/main/scala/spark/broadcast/Broadcast.scala
index 8567f1a4a60cb0c6bdff747aaa67dac74a7fdf1d..d68e56a11450df5544f5dc3ef51a34a5823b2582 100644
--- a/core/src/main/scala/spark/broadcast/Broadcast.scala
+++ b/core/src/main/scala/spark/broadcast/Broadcast.scala
@@ -20,17 +20,15 @@ trait Broadcast[T] extends Serializable {
   override def toString = "spark.Broadcast(" + uuid + ")"
 }
 
-object Broadcast extends Logging with Serializable {
+class BroadcastManager(val isMaster_ : Boolean) extends Logging with Serializable {
   private var initialized = false
-  private var isMaster_ = false
   private var broadcastFactory: BroadcastFactory = null
 
-  // Cache of broadcasted objects
-  val values = SparkEnv.get.blockManager
+  initialize()
 
   // Called by SparkContext or Executor before using Broadcast
-  def initialize(isMaster__ : Boolean) {
+  private def initialize() {
     synchronized {
       if (!initialized) {
         val broadcastFactoryClass = System.getProperty(
@@ -39,14 +37,6 @@ object Broadcast extends Logging with Serializable {
         broadcastFactory =
           Class.forName(broadcastFactoryClass).newInstance.asInstanceOf[BroadcastFactory]
 
-        // Setup isMaster before using it
-        isMaster_ = isMaster__
-
-        // Set masterHostAddress to the master's IP address for the slaves to read
-        if (isMaster) {
-          System.setProperty("spark.broadcast.masterHostAddress", Utils.localIpAddress)
-        }
-
         // Initialize appropriate BroadcastFactory and BroadcastObject
         broadcastFactory.initialize(isMaster)
 
@@ -59,17 +49,14 @@
     broadcastFactory.stop()
   }
 
-  def getBroadcastFactory: BroadcastFactory = {
+  private def getBroadcastFactory: BroadcastFactory = {
     if (broadcastFactory == null) {
       throw new SparkException ("Broadcast.getBroadcastFactory called before initialize")
     }
     broadcastFactory
   }
 
-  private var MasterHostAddress_ = System.getProperty(
-    "spark.broadcast.masterHostAddress", "")
+  def newBroadcast[T](value_ : T, isLocal: Boolean) = broadcastFactory.newBroadcast[T](value_, isLocal)
 
   def isMaster = isMaster_
-
-  def MasterHostAddress = MasterHostAddress_
 }
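
BroadcastManager keeps the reflective factory lookup, so the transport stays pluggable per deployment. The idiom, as a sketch; note that the hunk above truncates the getProperty arguments, so both the property name and the default class below are assumptions:

    // Assumed property name and default; the hunk cuts off the real arguments.
    val factoryClass = System.getProperty("spark.broadcast.factory", "spark.broadcast.HttpBroadcastFactory")
    val factory = Class.forName(factoryClass).newInstance.asInstanceOf[BroadcastFactory]
    factory.initialize(isMaster)
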
diff --git a/core/src/main/scala/spark/broadcast/HttpBroadcast.scala b/core/src/main/scala/spark/broadcast/HttpBroadcast.scala
index b7b5e0b3140496b6ee718e7c4031a2abc6c03fff..e4b135644814b0dd38cb282ca3c261ba1e81f98a 100644
--- a/core/src/main/scala/spark/broadcast/HttpBroadcast.scala
+++ b/core/src/main/scala/spark/broadcast/HttpBroadcast.scala
@@ -17,8 +17,8 @@ extends Broadcast[T] with Logging with Serializable {
 
   def value = value_
 
-  Broadcast.synchronized {
-    Broadcast.values.putSingle(uuid.toString, value_, StorageLevel.MEMORY_ONLY, false)
+  HttpBroadcast.synchronized {
+    SparkEnv.get.blockManager.putSingle(uuid.toString, value_, StorageLevel.MEMORY_ONLY, false)
   }
 
   if (!isLocal) {
@@ -35,7 +35,7 @@ extends Broadcast[T] with Logging with Serializable {
         logInfo("Started reading broadcast variable " + uuid)
         val start = System.nanoTime
         value_ = HttpBroadcast.read[T](uuid)
-        Broadcast.values.putSingle(uuid.toString, value_, StorageLevel.MEMORY_ONLY, false)
+        SparkEnv.get.blockManager.putSingle(uuid.toString, value_, StorageLevel.MEMORY_ONLY, false)
        val time = (System.nanoTime - start) / 1e9
        logInfo("Reading broadcast variable " + uuid + " took " + time + " s")
      }
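
A side effect worth noting: cache writes are now guarded per transport. HttpBroadcast synchronizes on its own companion object while the tree and BitTorrent paths synchronize on MultiTracker, so the two no longer contend on the single global Broadcast lock. The split, schematically:

    // Lock split after this patch (both paths previously used Broadcast.synchronized).
    HttpBroadcast.synchronized { /* HTTP-side blockManager.putSingle(...) */ }
    MultiTracker.synchronized  { /* tree / bit-torrent blockManager.putSingle(...) */ }
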
diff --git a/core/src/main/scala/spark/broadcast/MultiTracker.scala b/core/src/main/scala/spark/broadcast/MultiTracker.scala
index 10b90526e8efac73108696bd2e658b95c8a7cb63..d5f5b22461d542f6631efbf2ed0ac6b5db92e77a 100644
--- a/core/src/main/scala/spark/broadcast/MultiTracker.scala
+++ b/core/src/main/scala/spark/broadcast/MultiTracker.scala
@@ -40,6 +40,9 @@ extends Logging {
         trackMV = new TrackMultipleValues
         trackMV.setDaemon(true)
         trackMV.start()
+
+        // Set masterHostAddress to the master's IP address for the slaves to read
+        System.setProperty("spark.MultiTracker.MasterHostAddress", Utils.localIpAddress)
       }
 
       initialized = true
@@ -52,6 +55,8 @@
   }
 
   // Load common parameters
+  private var MasterHostAddress_ = System.getProperty(
+    "spark.MultiTracker.MasterHostAddress", "")
   private var MasterTrackerPort_ = System.getProperty(
     "spark.broadcast.masterTrackerPort", "11111").toInt
   private var BlockSize_ = System.getProperty(
@@ -90,6 +95,7 @@
   def isMaster = isMaster_
 
   // Common config params
+  def MasterHostAddress = MasterHostAddress_
   def MasterTrackerPort = MasterTrackerPort_
   def BlockSize = BlockSize_
   def MaxRetryCount = MaxRetryCount_
@@ -119,7 +125,7 @@
 
       var serverSocket: ServerSocket = null
       serverSocket = new ServerSocket(MasterTrackerPort)
-      logInfo("TrackMultipleValues" + serverSocket)
+      logInfo("TrackMultipleValues started at " + serverSocket)
 
      try {
        while (!stopBroadcast) {
@@ -158,7 +164,7 @@
                    valueToGuideMap += (uuid -> gInfo)
                  }
 
-                  logInfo ("New broadcast registered with TrackMultipleValues " + uuid + " " + valueToGuideMap)
+                  logInfo ("New broadcast " + uuid + " registered with TrackMultipleValues. Ongoing ones: " + valueToGuideMap)
 
                  // Send dummy ACK
                  oos.writeObject(-1)
@@ -170,10 +176,9 @@
                  // Remove from the map
                  valueToGuideMap.synchronized {
                    valueToGuideMap(uuid) = SourceInfo("", SourceInfo.TxOverGoToDefault)
-                    logInfo("Value unregistered from the Tracker " + valueToGuideMap)
                  }
 
-                  logInfo ("Broadcast unregistered from TrackMultipleValues " + uuid + " " + valueToGuideMap)
+                  logInfo ("Broadcast " + uuid + " unregistered from TrackMultipleValues. Ongoing ones: " + valueToGuideMap)
 
                  // Send dummy ACK
                  oos.writeObject(-1)
@@ -186,7 +191,7 @@
                    if (valueToGuideMap.contains(uuid)) valueToGuideMap(uuid)
                    else SourceInfo("", SourceInfo.TxNotStartedRetry)
 
-                  logInfo("TrackMultipleValues: Got new request: " + clientSocket + " for " + uuid + " : " + gInfo.listenPort)
+                  logDebug("Got new request: " + clientSocket + " for " + uuid + " : " + gInfo.listenPort)
 
                  // Send reply back
                  oos.writeObject(gInfo)
@@ -196,7 +201,7 @@
              }
            } catch {
              case e: Exception => {
-                logInfo("TrackMultipleValues had a " + e)
+                logError("TrackMultipleValues had a " + e)
              }
            } finally {
              ois.close()
@@ -231,7 +236,7 @@
    try {
      // Connect to the tracker to find out GuideInfo
      clientSocketToTracker =
-        new Socket(Broadcast.MasterHostAddress, MultiTracker.MasterTrackerPort)
+        new Socket(MultiTracker.MasterHostAddress, MultiTracker.MasterTrackerPort)
      oosTracker =
        new ObjectOutputStream(clientSocketToTracker.getOutputStream)
      oosTracker.flush()
@@ -247,7 +252,7 @@
      oosTracker.flush()
      gInfo = oisTracker.readObject.asInstanceOf[SourceInfo]
    } catch {
-      case e: Exception => logInfo("getGuideInfo had a " + e)
+      case e: Exception => logError("getGuideInfo had a " + e)
    } finally {
      if (oisTracker != null) {
        oisTracker.close()
@@ -267,12 +272,12 @@
      retriesLeft -= 1
    } while (retriesLeft > 0 && gInfo.listenPort == SourceInfo.TxNotStartedRetry)
 
-    logInfo("Got this guidePort from Tracker: " + gInfo.listenPort)
+    logDebug("Got this guidePort from Tracker: " + gInfo.listenPort)
    return gInfo
  }
 
  def registerBroadcast(uuid: UUID, gInfo: SourceInfo) {
-    val socket = new Socket(Broadcast.MasterHostAddress, MasterTrackerPort)
+    val socket = new Socket(MultiTracker.MasterHostAddress, MasterTrackerPort)
    val oosST = new ObjectOutputStream(socket.getOutputStream)
    oosST.flush()
    val oisST = new ObjectInputStream(socket.getInputStream)
@@ -299,7 +304,7 @@
  }
 
  def unregisterBroadcast(uuid: UUID) {
-    val socket = new Socket(Broadcast.MasterHostAddress, MasterTrackerPort)
+    val socket = new Socket(MultiTracker.MasterHostAddress, MasterTrackerPort)
    val oosST = new ObjectOutputStream(socket.getOutputStream)
    oosST.flush()
    val oisST = new ObjectInputStream(socket.getInputStream)
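
The master-address handshake that used to live in Broadcast.initialize is now MultiTracker's job: when the tracker thread starts on the master, the address is published as a system property, and slaves resolve it through the new MasterHostAddress accessor (the property is assumed to reach slave JVMs the same way the other spark.* settings do). In miniature:

    // Master side, inside MultiTracker's initialize path: publish the tracker address.
    System.setProperty("spark.MultiTracker.MasterHostAddress", Utils.localIpAddress)
    // Slave side: resolve it lazily when contacting the tracker ("" if never set).
    val sock = new java.net.Socket(MultiTracker.MasterHostAddress, MultiTracker.MasterTrackerPort)
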
diff --git a/core/src/main/scala/spark/broadcast/TreeBroadcast.scala b/core/src/main/scala/spark/broadcast/TreeBroadcast.scala
index 0e3d4d927a4b95764d0a18ba4fea15e7527acf54..692825353778d03d1695b974d2ec0ba5e4a8cb3b 100644
--- a/core/src/main/scala/spark/broadcast/TreeBroadcast.scala
+++ b/core/src/main/scala/spark/broadcast/TreeBroadcast.scala
@@ -15,8 +15,8 @@ extends Broadcast[T] with Logging with Serializable {
 
   def value = value_
 
-  Broadcast.synchronized {
-    Broadcast.values.putSingle(uuid.toString, value_, StorageLevel.MEMORY_ONLY, false)
+  MultiTracker.synchronized {
+    SparkEnv.get.blockManager.putSingle(uuid.toString, value_, StorageLevel.MEMORY_ONLY, false)
   }
 
   @transient var arrayOfBlocks: Array[BroadcastBlock] = null
@@ -89,10 +89,11 @@ extends Broadcast[T] with Logging with Serializable {
   private def readObject(in: ObjectInputStream) {
     in.defaultReadObject()
 
-    Broadcast.synchronized {
+    MultiTracker.synchronized {
       SparkEnv.get.blockManager.getSingle(uuid.toString) match {
         case Some(x) => x.asInstanceOf[T]
         case None => {
+          logInfo("Started reading broadcast variable " + uuid)
           // Initializing everything because Master will only send null/0 values
           // Only the 1st worker in a node can be here. Others will get from cache
           initializeWorkerVariables
@@ -109,7 +110,7 @@ extends Broadcast[T] with Logging with Serializable {
           val receptionSucceeded = receiveBroadcast(uuid)
           if (receptionSucceeded) {
             value_ = MultiTracker.unBlockifyObject[T](arrayOfBlocks, totalBytes, totalBlocks)
-            Broadcast.values.putSingle(uuid.toString, value_, StorageLevel.MEMORY_ONLY, false)
+            SparkEnv.get.blockManager.putSingle(uuid.toString, value_, StorageLevel.MEMORY_ONLY, false)
           } else {
             logError("Reading Broadcasted variable " + uuid + " failed")
           }
@@ -161,12 +162,12 @@ extends Broadcast[T] with Logging with Serializable {
     var retriesLeft = MultiTracker.MaxRetryCount
     do {
       // Connect to Master and send this worker's Information
-      clientSocketToMaster = new Socket(Broadcast.MasterHostAddress, gInfo.listenPort)
+      clientSocketToMaster = new Socket(MultiTracker.MasterHostAddress, gInfo.listenPort)
       oosMaster = new ObjectOutputStream(clientSocketToMaster.getOutputStream)
       oosMaster.flush()
       oisMaster = new ObjectInputStream(clientSocketToMaster.getInputStream)
 
-      logInfo("Connected to Master's guiding object")
+      logDebug("Connected to Master's guiding object")
 
       // Send local source information
       oosMaster.writeObject(SourceInfo(hostAddress, listenPort))
@@ -179,7 +180,7 @@ extends Broadcast[T] with Logging with Serializable {
       totalBlocksLock.synchronized { totalBlocksLock.notifyAll() }
       totalBytes = sourceInfo.totalBytes
 
-      logInfo("Received SourceInfo from Master:" + sourceInfo + " My Port: " + listenPort)
+      logDebug("Received SourceInfo from Master:" + sourceInfo + " My Port: " + listenPort)
 
       val start = System.nanoTime
       val receptionSucceeded = receiveSingleTransmission(sourceInfo)
@@ -226,8 +227,8 @@ extends Broadcast[T] with Logging with Serializable {
       oosSource.flush()
       oisSource = new ObjectInputStream(clientSocketToSource.getInputStream)
 
-      logInfo("Inside receiveSingleTransmission")
-      logInfo("totalBlocks: "+ totalBlocks + " " + "hasBlocks: " + hasBlocks)
+      logDebug("Inside receiveSingleTransmission")
+      logDebug("totalBlocks: "+ totalBlocks + " " + "hasBlocks: " + hasBlocks)
 
       // Send the range
       oosSource.writeObject((hasBlocks, totalBlocks))
@@ -238,7 +239,7 @@ extends Broadcast[T] with Logging with Serializable {
        val bcBlock = oisSource.readObject.asInstanceOf[BroadcastBlock]
        val receptionTime = (System.currentTimeMillis - recvStartTime)
 
-        logInfo("Received block: " + bcBlock.blockID + " from " + sourceInfo + " in " + receptionTime + " millis.")
+        logDebug("Received block: " + bcBlock.blockID + " from " + sourceInfo + " in " + receptionTime + " millis.")
 
        arrayOfBlocks(hasBlocks) = bcBlock
        hasBlocks += 1
@@ -248,7 +249,7 @@ extends Broadcast[T] with Logging with Serializable {
        hasBlocksLock.synchronized { hasBlocksLock.notifyAll() }
      }
    } catch {
-      case e: Exception => logInfo("receiveSingleTransmission had a " + e)
+      case e: Exception => logError("receiveSingleTransmission had a " + e)
    } finally {
      if (oisSource != null) {
        oisSource.close()
@@ -287,7 +288,7 @@ extends Broadcast[T] with Logging with Serializable {
          clientSocket = serverSocket.accept
        } catch {
          case e: Exception => {
-            logInfo("GuideMultipleRequests Timeout.")
+            logError("GuideMultipleRequests Timeout.")
 
            // Stop broadcast if at least one worker has connected and
            // everyone connected so far are done.
@@ -300,7 +301,7 @@ extends Broadcast[T] with Logging with Serializable {
          }
        }
        if (clientSocket != null) {
-          logInfo("Guide: Accepted new client connection: " + clientSocket)
+          logDebug("Guide: Accepted new client connection: " + clientSocket)
          try {
            threadPool.execute(new GuideSingleRequest(clientSocket))
          } catch {
@@ -346,7 +347,7 @@ extends Broadcast[T] with Logging with Serializable {
          gosSource.flush()
        } catch {
          case e: Exception => {
-            logInfo("sendStopBroadcastNotifications had a " + e)
+            logError("sendStopBroadcastNotifications had a " + e)
          }
        } finally {
          if (gisSource != null) {
@@ -382,14 +383,14 @@ extends Broadcast[T] with Logging with Serializable {
        listOfSources.synchronized {
          // Select a suitable source and send it back to the worker
          selectedSourceInfo = selectSuitableSource(sourceInfo)
-          logInfo("Sending selectedSourceInfo: " + selectedSourceInfo)
+          logDebug("Sending selectedSourceInfo: " + selectedSourceInfo)
          oos.writeObject(selectedSourceInfo)
          oos.flush()
 
          // Add this new (if it can finish) source to the list of sources
          thisWorkerInfo = SourceInfo(sourceInfo.hostAddress,
            sourceInfo.listenPort, totalBlocks, totalBytes)
-          logInfo("Adding possible new source to listOfSources: " + thisWorkerInfo)
+          logDebug("Adding possible new source to listOfSources: " + thisWorkerInfo)
          listOfSources += thisWorkerInfo
        }
@@ -437,6 +438,7 @@ extends Broadcast[T] with Logging with Serializable {
          }
        }
      } finally {
+        logInfo("GuideSingleRequest is closing streams and sockets")
        ois.close()
        oos.close()
        clientSocket.close()
@@ -486,11 +488,11 @@ extends Broadcast[T] with Logging with Serializable {
          serverSocket.setSoTimeout(MultiTracker.ServerSocketTimeout)
          clientSocket = serverSocket.accept
        } catch {
-          case e: Exception => logInfo("ServeMultipleRequests Timeout.")
+          case e: Exception => logError("ServeMultipleRequests Timeout.")
        }
 
        if (clientSocket != null) {
-          logInfo("Serve: Accepted new client connection: " + clientSocket)
+          logDebug("Serve: Accepted new client connection: " + clientSocket)
          try {
            threadPool.execute(new ServeSingleRequest(clientSocket))
          } catch {
@@ -534,7 +536,7 @@ extends Broadcast[T] with Logging with Serializable {
          sendObject
        }
      } catch {
-        case e: Exception => logInfo("ServeSingleRequest had a " + e)
+        case e: Exception => logError("ServeSingleRequest had a " + e)
      } finally {
        logInfo("ServeSingleRequest is closing streams and sockets")
        ois.close()
@@ -557,9 +559,9 @@ extends Broadcast[T] with Logging with Serializable {
        oos.writeObject(arrayOfBlocks(i))
        oos.flush()
      } catch {
-        case e: Exception => logInfo("sendObject had a " + e)
+        case e: Exception => logError("sendObject had a " + e)
      }
-      logInfo("Sent block: " + i + " to " + clientSocket)
+      logDebug("Sent block: " + i + " to " + clientSocket)
    }
  }
}
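
One idiom recurs at every connection site touched above, whether the peer is the tracker, a guide, or a block source: the ObjectOutputStream is created and flushed before the ObjectInputStream is constructed. The ordering matters because ObjectInputStream's constructor blocks until it has read the peer's serialization stream header. Condensed (gInfo comes from MultiTracker.getGuideInfo):

    // Condensed from the connection sites above.
    val socket = new java.net.Socket(MultiTracker.MasterHostAddress, gInfo.listenPort)
    val oos = new java.io.ObjectOutputStream(socket.getOutputStream)
    oos.flush()  // push our stream header so the peer's ObjectInputStream can return
    val ois = new java.io.ObjectInputStream(socket.getInputStream)
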
logDebug("Received block: " + bcBlock.blockID + " from " + sourceInfo + " in " + receptionTime + " millis.") arrayOfBlocks(hasBlocks) = bcBlock hasBlocks += 1 @@ -248,7 +249,7 @@ extends Broadcast[T] with Logging with Serializable { hasBlocksLock.synchronized { hasBlocksLock.notifyAll() } } } catch { - case e: Exception => logInfo("receiveSingleTransmission had a " + e) + case e: Exception => logError("receiveSingleTransmission had a " + e) } finally { if (oisSource != null) { oisSource.close() @@ -287,7 +288,7 @@ extends Broadcast[T] with Logging with Serializable { clientSocket = serverSocket.accept } catch { case e: Exception => { - logInfo("GuideMultipleRequests Timeout.") + logError("GuideMultipleRequests Timeout.") // Stop broadcast if at least one worker has connected and // everyone connected so far are done. @@ -300,7 +301,7 @@ extends Broadcast[T] with Logging with Serializable { } } if (clientSocket != null) { - logInfo("Guide: Accepted new client connection: " + clientSocket) + logDebug("Guide: Accepted new client connection: " + clientSocket) try { threadPool.execute(new GuideSingleRequest(clientSocket)) } catch { @@ -346,7 +347,7 @@ extends Broadcast[T] with Logging with Serializable { gosSource.flush() } catch { case e: Exception => { - logInfo("sendStopBroadcastNotifications had a " + e) + logError("sendStopBroadcastNotifications had a " + e) } } finally { if (gisSource != null) { @@ -382,14 +383,14 @@ extends Broadcast[T] with Logging with Serializable { listOfSources.synchronized { // Select a suitable source and send it back to the worker selectedSourceInfo = selectSuitableSource(sourceInfo) - logInfo("Sending selectedSourceInfo: " + selectedSourceInfo) + logDebug("Sending selectedSourceInfo: " + selectedSourceInfo) oos.writeObject(selectedSourceInfo) oos.flush() // Add this new (if it can finish) source to the list of sources thisWorkerInfo = SourceInfo(sourceInfo.hostAddress, sourceInfo.listenPort, totalBlocks, totalBytes) - logInfo("Adding possible new source to listOfSources: " + thisWorkerInfo) + logDebug("Adding possible new source to listOfSources: " + thisWorkerInfo) listOfSources += thisWorkerInfo } @@ -437,6 +438,7 @@ extends Broadcast[T] with Logging with Serializable { } } } finally { + logInfo("GuideSingleRequest is closing streams and sockets") ois.close() oos.close() clientSocket.close() @@ -486,11 +488,11 @@ extends Broadcast[T] with Logging with Serializable { serverSocket.setSoTimeout(MultiTracker.ServerSocketTimeout) clientSocket = serverSocket.accept } catch { - case e: Exception => logInfo("ServeMultipleRequests Timeout.") + case e: Exception => logError("ServeMultipleRequests Timeout.") } if (clientSocket != null) { - logInfo("Serve: Accepted new client connection: " + clientSocket) + logDebug("Serve: Accepted new client connection: " + clientSocket) try { threadPool.execute(new ServeSingleRequest(clientSocket)) } catch { @@ -534,7 +536,7 @@ extends Broadcast[T] with Logging with Serializable { sendObject } } catch { - case e: Exception => logInfo("ServeSingleRequest had a " + e) + case e: Exception => logError("ServeSingleRequest had a " + e) } finally { logInfo("ServeSingleRequest is closing streams and sockets") ois.close() @@ -557,9 +559,9 @@ extends Broadcast[T] with Logging with Serializable { oos.writeObject(arrayOfBlocks(i)) oos.flush() } catch { - case e: Exception => logInfo("sendObject had a " + e) + case e: Exception => logError("sendObject had a " + e) } - logInfo("Sent block: " + i + " to " + clientSocket) + logDebug("Sent block: " + i 
+ " to " + clientSocket) } } } diff --git a/core/src/main/scala/spark/executor/Executor.scala b/core/src/main/scala/spark/executor/Executor.scala index e3958cec519521f43ce811bfa68f58f114c0c093..9e335c25f7527026ce55f12f361e702c1bef883b 100644 --- a/core/src/main/scala/spark/executor/Executor.scala +++ b/core/src/main/scala/spark/executor/Executor.scala @@ -35,8 +35,6 @@ class Executor extends Logging { // Initialize Spark environment (using system properties read above) env = SparkEnv.createFromSystemProperties(slaveHostname, 0, false, false) SparkEnv.set(env) - // Old stuff that isn't yet using env - Broadcast.initialize(false) // Create our ClassLoader (using spark properties) and set it on this thread classLoader = createClassLoader()