From e4b8db45aef934929dbab443156375aebb1ea45e Mon Sep 17 00:00:00 2001
From: Mosharaf Chowdhury <mosharaf@mosharaf-ubuntu.(none)>
Date: Sat, 27 Nov 2010 00:53:57 -0800
Subject: [PATCH] - Moved DaemonThreadPool factory methods to the Broadcast
 Object. - Reflection-based broadcast class loading is still not working.

---
 src/scala/spark/Broadcast.scala        | 32 +++++++++++++++++++++
 src/scala/spark/ChainedBroadcast.scala | 40 ++------------------------
 src/scala/spark/SparkContext.scala     | 14 +++++++++
 3 files changed, 49 insertions(+), 37 deletions(-)

diff --git a/src/scala/spark/Broadcast.scala b/src/scala/spark/Broadcast.scala
index 05f214e48e..afff500bb0 100644
--- a/src/scala/spark/Broadcast.scala
+++ b/src/scala/spark/Broadcast.scala
@@ -1,6 +1,7 @@
 package spark
 
 import java.util.UUID
+import java.util.concurrent.{Executors, ThreadPoolExecutor, ThreadFactory}
 
 @serializable
 trait Broadcast {
@@ -31,6 +32,37 @@ extends Logging {
       }
     }
   }
+  
+  // Returns a standard ThreadFactory except all threads are daemons
+  private def newDaemonThreadFactory: ThreadFactory = {
+    new ThreadFactory {
+      def newThread(r: Runnable): Thread = {
+        var t = Executors.defaultThreadFactory.newThread (r)
+        t.setDaemon (true)
+        return t
+      }
+    }  
+  }
+  
+  // Wrapper over newCachedThreadPool
+  def newDaemonCachedThreadPool: ThreadPoolExecutor = {
+    var threadPool = 
+      Executors.newCachedThreadPool.asInstanceOf[ThreadPoolExecutor]
+  
+    threadPool.setThreadFactory (newDaemonThreadFactory)
+    
+    return threadPool
+  }
+  
+  // Wrapper over newFixedThreadPool
+  def newDaemonFixedThreadPool (nThreads: Int): ThreadPoolExecutor = {
+    var threadPool = 
+      Executors.newFixedThreadPool (nThreads).asInstanceOf[ThreadPoolExecutor]
+  
+    threadPool.setThreadFactory (newDaemonThreadFactory)
+    
+    return threadPool
+  }  
 }
 
 @serializable
diff --git a/src/scala/spark/ChainedBroadcast.scala b/src/scala/spark/ChainedBroadcast.scala
index 6b34843abe..32f97ce442 100644
--- a/src/scala/spark/ChainedBroadcast.scala
+++ b/src/scala/spark/ChainedBroadcast.scala
@@ -6,8 +6,6 @@ import java.util.{Comparator, PriorityQueue, Random, UUID}
 
 import com.google.common.collect.MapMaker
 
-import java.util.concurrent.{Executors, ThreadPoolExecutor, ThreadFactory}
-
 import scala.collection.mutable.{Map, Set}
 
 @serializable
@@ -397,7 +395,7 @@ extends Broadcast with Logging {
     private var setOfCompletedSources = Set[SourceInfo] ()
   
     override def run: Unit = {
-      var threadPool = ChainedBroadcast.newDaemonCachedThreadPool
+      var threadPool = Broadcast.newDaemonCachedThreadPool
       var serverSocket: ServerSocket = null
 
       serverSocket = new ServerSocket (0)
@@ -602,7 +600,7 @@ extends Broadcast with Logging {
   class ServeMultipleRequests
   extends Thread with Logging {
     override def run: Unit = {
-      var threadPool = ChainedBroadcast.newDaemonCachedThreadPool
+      var threadPool = Broadcast.newDaemonCachedThreadPool
       var serverSocket: ServerSocket = null
 
       serverSocket = new ServerSocket (0) 
@@ -671,7 +669,6 @@ extends Broadcast with Logging {
             sendObject
           }
         } catch {
-          // TODO: Need to add better exception handling here
           // If something went wrong, e.g., the worker at the other end died etc. 
           // then close everything up
           case e: Exception => { 
@@ -802,41 +799,10 @@ extends Logging {
     }
   }
   
-  // Returns a standard ThreadFactory except all threads are daemons
-  private def newDaemonThreadFactory: ThreadFactory = {
-    new ThreadFactory {
-      def newThread(r: Runnable): Thread = {
-        var t = Executors.defaultThreadFactory.newThread (r)
-        t.setDaemon (true)
-        return t
-      }
-    }  
-  }
-  
-  // Wrapper over newCachedThreadPool
-  def newDaemonCachedThreadPool: ThreadPoolExecutor = {
-    var threadPool = 
-      Executors.newCachedThreadPool.asInstanceOf[ThreadPoolExecutor]
-  
-    threadPool.setThreadFactory (newDaemonThreadFactory)
-    
-    return threadPool
-  }
-  
-  // Wrapper over newFixedThreadPool
-  def newDaemonFixedThreadPool (nThreads: Int): ThreadPoolExecutor = {
-    var threadPool = 
-      Executors.newFixedThreadPool (nThreads).asInstanceOf[ThreadPoolExecutor]
-  
-    threadPool.setThreadFactory (newDaemonThreadFactory)
-    
-    return threadPool
-  }
-
   class TrackMultipleValues
   extends Thread with Logging {
     override def run: Unit = {
-      var threadPool = ChainedBroadcast.newDaemonCachedThreadPool
+      var threadPool = Broadcast.newDaemonCachedThreadPool
       var serverSocket: ServerSocket = null
       
       serverSocket = new ServerSocket (ChainedBroadcast.MasterTrackerPort)
diff --git a/src/scala/spark/SparkContext.scala b/src/scala/spark/SparkContext.scala
index 98149dc1b7..ef328d821a 100644
--- a/src/scala/spark/SparkContext.scala
+++ b/src/scala/spark/SparkContext.scala
@@ -22,6 +22,20 @@ class SparkContext(master: String, frameworkName: String) extends Logging {
   // def broadcast[T](value: T) = new DfsBroadcast(value, local)
   def broadcast[T](value: T) = new ChainedBroadcast(value, local)
 
+//  def broadcast[T](value: T) = { 
+//    val broadcastClass = System.getProperty("spark.broadcast.Class",
+//      "spark.ChainedBroadcast")
+//    val booleanArgs = Array[AnyRef] (local.asInstanceOf[AnyRef])
+//    Class.forName(broadcastClass).getConstructors()(0).newInstance(booleanArgs:_*).asInstanceOf[Class.forName(broadcastClass)]
+//  }
+
+// def initialize() {
+//    val cacheClass = System.getProperty("spark.cache.class",
+//      "spark.SoftReferenceCache")
+//    instance = Class.forName(cacheClass).newInstance().asInstanceOf[Cache]
+//  }
+
+
   def textFile(path: String) = new HdfsTextFile(this, path)
 
   val LOCAL_REGEX = """local\[([0-9]+)\]""".r
-- 
GitLab