Commit e4b8db45 authored by Mosharaf Chowdhury

- Moved DaemonThreadPool factory methods to the Broadcast Object.

- Reflection-based broadcast class loading is still not working.
parent c2591422
package spark
import java.util.UUID
import java.util.concurrent.{Executors, ThreadPoolExecutor, ThreadFactory}
@serializable
trait Broadcast {
@@ -31,6 +32,37 @@ extends Logging {
}
}
}
// Returns a standard ThreadFactory except all threads are daemons
private def newDaemonThreadFactory: ThreadFactory = {
new ThreadFactory {
def newThread(r: Runnable): Thread = {
var t = Executors.defaultThreadFactory.newThread (r)
t.setDaemon (true)
return t
}
}
}
// Wrapper over newCachedThreadPool
def newDaemonCachedThreadPool: ThreadPoolExecutor = {
var threadPool =
Executors.newCachedThreadPool.asInstanceOf[ThreadPoolExecutor]
threadPool.setThreadFactory (newDaemonThreadFactory)
return threadPool
}
// Wrapper over newFixedThreadPool
def newDaemonFixedThreadPool (nThreads: Int): ThreadPoolExecutor = {
var threadPool =
Executors.newFixedThreadPool (nThreads).asInstanceOf[ThreadPoolExecutor]
threadPool.setThreadFactory (newDaemonThreadFactory)
return threadPool
}
}
@serializable
...
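For readers following the move, here is a minimal standalone sketch of how the daemon thread-pool helpers added above are meant to behave: every thread handed out by the pool is flagged as a daemon, so a JVM that finishes its main work can exit without waiting for the pool. The object name DaemonPoolSketch and the printed message are illustrative only and not part of this commit.

import java.util.concurrent.{Executors, ThreadFactory, ThreadPoolExecutor, TimeUnit}

object DaemonPoolSketch {
  // Same idea as the newDaemonThreadFactory added to the Broadcast object:
  // delegate to the default factory, then flip the daemon flag.
  private def newDaemonThreadFactory: ThreadFactory = new ThreadFactory {
    def newThread(r: Runnable): Thread = {
      val t = Executors.defaultThreadFactory.newThread(r)
      t.setDaemon(true)
      t
    }
  }

  // Cached pool whose worker threads are all daemons.
  def newDaemonCachedThreadPool: ThreadPoolExecutor = {
    val pool = Executors.newCachedThreadPool.asInstanceOf[ThreadPoolExecutor]
    pool.setThreadFactory(newDaemonThreadFactory)
    pool
  }

  def main(args: Array[String]): Unit = {
    val pool = newDaemonCachedThreadPool
    pool.execute(new Runnable {
      def run(): Unit =
        println("daemon thread? " + Thread.currentThread.isDaemon)  // prints true
    })
    pool.shutdown()
    pool.awaitTermination(5, TimeUnit.SECONDS)
  }
}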
@@ -6,8 +6,6 @@ import java.util.{Comparator, PriorityQueue, Random, UUID}
import com.google.common.collect.MapMaker
import java.util.concurrent.{Executors, ThreadPoolExecutor, ThreadFactory}
import scala.collection.mutable.{Map, Set}
@serializable
@@ -397,7 +395,7 @@ extends Broadcast with Logging {
private var setOfCompletedSources = Set[SourceInfo] ()
override def run: Unit = {
- var threadPool = ChainedBroadcast.newDaemonCachedThreadPool
+ var threadPool = Broadcast.newDaemonCachedThreadPool
var serverSocket: ServerSocket = null
serverSocket = new ServerSocket (0)
@@ -602,7 +600,7 @@ extends Broadcast with Logging {
class ServeMultipleRequests
extends Thread with Logging {
override def run: Unit = {
- var threadPool = ChainedBroadcast.newDaemonCachedThreadPool
+ var threadPool = Broadcast.newDaemonCachedThreadPool
var serverSocket: ServerSocket = null
serverSocket = new ServerSocket (0)
@@ -671,7 +669,6 @@ extends Broadcast with Logging {
sendObject
}
} catch {
// TODO: Need to add better exception handling here
// If something went wrong, e.g., the worker at the other end died etc.
// then close everything up
case e: Exception => {
@@ -802,41 +799,10 @@ extends Logging {
}
}
// Returns a standard ThreadFactory except all threads are daemons
private def newDaemonThreadFactory: ThreadFactory = {
new ThreadFactory {
def newThread(r: Runnable): Thread = {
var t = Executors.defaultThreadFactory.newThread (r)
t.setDaemon (true)
return t
}
}
}
// Wrapper over newCachedThreadPool
def newDaemonCachedThreadPool: ThreadPoolExecutor = {
var threadPool =
Executors.newCachedThreadPool.asInstanceOf[ThreadPoolExecutor]
threadPool.setThreadFactory (newDaemonThreadFactory)
return threadPool
}
// Wrapper over newFixedThreadPool
def newDaemonFixedThreadPool (nThreads: Int): ThreadPoolExecutor = {
var threadPool =
Executors.newFixedThreadPool (nThreads).asInstanceOf[ThreadPoolExecutor]
threadPool.setThreadFactory (newDaemonThreadFactory)
return threadPool
}
class TrackMultipleValues
extends Thread with Logging {
override def run: Unit = {
- var threadPool = ChainedBroadcast.newDaemonCachedThreadPool
+ var threadPool = Broadcast.newDaemonCachedThreadPool
var serverSocket: ServerSocket = null
serverSocket = new ServerSocket (ChainedBroadcast.MasterTrackerPort)
...
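The run methods updated in this file all follow the same structure: bind a ServerSocket, then hand every accepted connection to the cached daemon pool so each peer is served on its own short-lived thread. A simplified, self-contained sketch of that accept loop follows; the ServeLoopSketch object, the port choice, and the "hello" payload are made up for illustration, and a plain cached pool stands in for Broadcast.newDaemonCachedThreadPool to keep the sketch independent of the repository.

import java.io.PrintWriter
import java.net.ServerSocket
import java.util.concurrent.Executors

object ServeLoopSketch {
  def main(args: Array[String]): Unit = {
    // In ChainedBroadcast this pool comes from Broadcast.newDaemonCachedThreadPool.
    val threadPool = Executors.newCachedThreadPool
    val serverSocket = new ServerSocket(0)     // port 0: bind to any free port
    println("Listening on port " + serverSocket.getLocalPort)
    try {
      while (true) {
        val clientSocket = serverSocket.accept()  // blocks until a peer connects
        threadPool.execute(new Runnable {         // one task per connection
          def run(): Unit = {
            val out = new PrintWriter(clientSocket.getOutputStream, true)
            out.println("hello")                  // illustrative payload only
            clientSocket.close()
          }
        })
      }
    } finally {
      serverSocket.close()
      threadPool.shutdown()
    }
  }
}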
@@ -22,6 +22,20 @@ class SparkContext(master: String, frameworkName: String) extends Logging {
// def broadcast[T](value: T) = new DfsBroadcast(value, local)
def broadcast[T](value: T) = new ChainedBroadcast(value, local)
// def broadcast[T](value: T) = {
// val broadcastClass = System.getProperty("spark.broadcast.Class",
// "spark.ChainedBroadcast")
// val booleanArgs = Array[AnyRef] (local.asInstanceOf[AnyRef])
// Class.forName(broadcastClass).getConstructors()(0).newInstance(booleanArgs:_*).asInstanceOf[Class.forName(broadcastClass)]
// }
// def initialize() {
// val cacheClass = System.getProperty("spark.cache.class",
// "spark.SoftReferenceCache")
// instance = Class.forName(cacheClass).newInstance().asInstanceOf[Cache]
// }
def textFile(path: String) = new HdfsTextFile(this, path)
val LOCAL_REGEX = """local\[([0-9]+)\]""".r
...
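The commented-out reflection block above does not compile as written: asInstanceOf[Class.forName(broadcastClass)] uses a runtime expression where a static type is required, which matches the commit note that reflection-based broadcast class loading is still not working. Below is a hedged sketch of one way it could be made to compile, assuming each broadcast implementation exposes a (value, local) constructor like new ChainedBroadcast(value, local) above and that the Broadcast trait is the intended static type; the method name broadcastViaReflection is hypothetical and not part of this commit.

// Hypothetical helper for SparkContext; a sketch, not the commit's code.
def broadcastViaReflection[T](value: T, local: Boolean): Broadcast = {
  val broadcastClass = System.getProperty("spark.broadcast.Class", "spark.ChainedBroadcast")
  // Picking constructor 0 mirrors the commented-out attempt; a lookup by
  // parameter types would be more robust.
  val ctor = Class.forName(broadcastClass).getConstructors()(0)
  // Box both arguments, since Constructor.newInstance takes object varargs,
  // then cast the result to the Broadcast trait instead of a runtime type.
  ctor.newInstance(value.asInstanceOf[AnyRef], local.asInstanceOf[AnyRef])
      .asInstanceOf[Broadcast]
}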