Commit 9930a95d authored by shane-huang

Modified Patch according to comments

parent e4cb72da
@@ -136,10 +136,10 @@ extends Connection(SocketChannel.open, selector_) {
       if (chunk.isDefined) {
         messages += message // this is probably incorrect, it wont work as fifo
         if (!message.started) {
-        logDebug("Starting to send [" + message + "]")
-        message.started = true
-        message.startTime = System.currentTimeMillis
-        }
+          logDebug("Starting to send [" + message + "]")
+          message.started = true
+          message.startTime = System.currentTimeMillis
+        }
         return chunk
       } else {
         /*logInfo("Finished sending [" + message + "] to [" + remoteConnectionManagerId + "]")*/
@@ -43,12 +43,12 @@ private[spark] class ConnectionManager(port: Int) extends Logging {
   }
   val selector = SelectorProvider.provider.openSelector()
-  val handleMessageExecutor = Executors.newFixedThreadPool(20)
+  val handleMessageExecutor = Executors.newFixedThreadPool(System.getProperty("spark.core.connection.handler.threads","20").toInt)
   val serverChannel = ServerSocketChannel.open()
   val connectionsByKey = new HashMap[SelectionKey, Connection] with SynchronizedMap[SelectionKey, Connection]
   val connectionsById = new HashMap[ConnectionManagerId, SendingConnection] with SynchronizedMap[ConnectionManagerId, SendingConnection]
   val messageStatuses = new HashMap[Int, MessageStatus]
-  val connectionRequests = new HashMap[ConnectionManagerId, SendingConnection] with SynchronizedMap[ConnectionManagerId, SendingConnection]
+  val connectionRequests = new HashMap[ConnectionManagerId, SendingConnection] with SynchronizedMap[ConnectionManagerId, SendingConnection]
   val keyInterestChangeRequests = new SynchronizedQueue[(SelectionKey, Int)]
   val sendMessageRequests = new Queue[(Message, SendingConnection)]
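The handleMessageExecutor now sizes its fixed thread pool from the spark.core.connection.handler.threads system property, falling back to the old hard-coded value of 20. A minimal standalone sketch of that pattern (only the property name is taken from this diff; the surrounding ConnectionManager wiring is omitted):

import java.util.concurrent.Executors

object HandlerPoolSketch {
  def main(args: Array[String]) {
    // Fall back to 20 threads when the property is unset; override with
    // -Dspark.core.connection.handler.threads=40 (or any other value) on the JVM command line.
    val handlerThreads = System.getProperty("spark.core.connection.handler.threads", "20").toInt
    val handleMessageExecutor = Executors.newFixedThreadPool(handlerThreads)

    // Submit one trivial task to show the pool is usable, then shut it down.
    handleMessageExecutor.execute(new Runnable {
      def run() { println("handled on " + Thread.currentThread.getName) }
    })
    handleMessageExecutor.shutdown()
  }
}

Since the property is read once when the val is initialized, the pool size is fixed for the life of the ConnectionManager; changing the property afterwards has no effect.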
@@ -78,9 +78,8 @@ private[spark] class ConnectionManager(port: Int) extends Logging {
   def run() {
     try {
-      while(!selectorThread.isInterrupted) {
+      while(!selectorThread.isInterrupted) {
         for( (connectionManagerId, sendingConnection) <- connectionRequests) {
-          //val sendingConnection = connectionRequests.dequeue
           sendingConnection.connect()
           addConnection(sendingConnection)
           connectionRequests -= connectionManagerId
@@ -465,7 +464,7 @@ private[spark] object ConnectionManager {
       val bufferMessage = Message.createBufferMessage(buffer.duplicate)
       manager.sendMessageReliably(manager.id, bufferMessage)
     }).foreach(f => {
-      val g = Await.result(f, 10 second)
+      val g = Await.result(f, 1 second)
       if (!g.isDefined) println("Failed")
     })
     val finishTime = System.currentTimeMillis
@@ -13,8 +13,14 @@ import akka.util.duration._
 private[spark] object ConnectionManagerTest extends Logging{
   def main(args: Array[String]) {
-    if (args.length < 5) {
-      println("Usage: ConnectionManagerTest <mesos cluster> <slaves file> <num of tasks> <size of msg> <count>")
+    //<mesos cluster> - the master URL
+    //<slaves file> - a list slaves to run connectionTest on
+    //[num of tasks] - the number of parallel tasks to be initiated default is number of slave hosts
+    //[size of msg in MB (integer)] - the size of messages to be sent in each task, default is 10
+    //[count] - how many times to run, default is 3
+    //[await time in seconds] : await time (in seconds), default is 600
+    if (args.length < 2) {
+      println("Usage: ConnectionManagerTest <mesos cluster> <slaves file> [num of tasks] [size of msg in MB (integer)] [count] [await time in seconds)] ")
       System.exit(1)
     }
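The reworked usage message makes everything after the slaves file optional, with the defaults spelled out in the comments above; the defaulting logic itself appears in the next hunk. A hedged sketch of the same read-or-default pattern (the intArg helper and the sample default of 4 tasks are hypothetical, since the real test defaults to the number of slave hosts):

object OptionalArgsSketch {
  // Hypothetical helper: parse args(idx) as an Int, or fall back to a default
  // when the argument was not supplied.
  def intArg(args: Array[String], idx: Int, default: Int): Int =
    if (args.length > idx) args(idx).toInt else default

  def main(args: Array[String]) {
    if (args.length < 2) {
      println("Usage: OptionalArgsSketch <master> <slaves file> [num of tasks] [size of msg in MB] [count] [await time in seconds]")
      System.exit(1)
    }
    val tasknum   = intArg(args, 2, 4)    // the real test defaults to slaves.length instead
    val sizeMB    = intArg(args, 3, 10)
    val count     = intArg(args, 4, 3)
    val awaitSecs = intArg(args, 5, 600)
    println("tasks = " + tasknum + ", msg size = " + sizeMB + " MB, count = " + count +
      ", await = " + awaitSecs + " s")
  }
}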
@@ -29,14 +35,17 @@ private[spark] object ConnectionManagerTest extends Logging{
     /*println("Slaves")*/
     /*slaves.foreach(println)*/
-    val tasknum = args(2).toInt
+    val tasknum = if (args.length > 2) args(2).toInt else slaves.length
+    val size = ( if (args.length > 3) (args(3).toInt) else 10 ) * 1024 * 1024
+    val count = if (args.length > 4) args(4).toInt else 3
+    val awaitTime = (if (args.length > 5) args(5).toInt else 600 ).second
+    println("Running "+count+" rounds of test: " + "parallel tasks = " + tasknum + ", msg size = " + size/1024/1024 + " MB, awaitTime = " + awaitTime)
     val slaveConnManagerIds = sc.parallelize(0 until tasknum, tasknum).map(
       i => SparkEnv.get.connectionManager.id).collect()
     println("\nSlave ConnectionManagerIds")
     slaveConnManagerIds.foreach(println)
     println
-    val count = args(4).toInt
     (0 until count).foreach(i => {
       val resultStrs = sc.parallelize(0 until tasknum, tasknum).map(i => {
         val connManager = SparkEnv.get.connectionManager
@@ -46,7 +55,6 @@ private[spark] object ConnectionManagerTest extends Logging{
           None
         })
-        val size = (args(3).toInt) * 1024 * 1024
         val buffer = ByteBuffer.allocate(size).put(Array.tabulate[Byte](size)(x => x.toByte))
         buffer.flip
@@ -56,7 +64,7 @@ private[spark] object ConnectionManagerTest extends Logging{
           logInfo("Sending [" + bufferMessage + "] to [" + slaveConnManagerId + "]")
           connManager.sendMessageReliably(slaveConnManagerId, bufferMessage)
         })
-        val results = futures.map(f => Await.result(f, 999.second))
+        val results = futures.map(f => Await.result(f, awaitTime))
         val finishTime = System.currentTimeMillis
         Thread.sleep(5000)
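Here the 999.second magic number is replaced by the awaitTime introduced earlier, which akka.util.duration._ builds from the optional sixth argument (600 seconds by default). A self-contained sketch of the same await-with-timeout pattern, written against scala.concurrent rather than the Akka classes this test actually imports, so treat it as an analogue rather than the test's own code:

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

object AwaitTimeoutSketch {
  def main(args: Array[String]) {
    // Optional first argument overrides the 600-second default, as in the test.
    val awaitTime = (if (args.length > 0) args(0).toInt else 600).seconds

    // Stand-ins for the futures returned by sendMessageReliably.
    val futures = (1 to 3).map(i => Future { i * i })

    // Await.result returns the value, or throws a TimeoutException if the
    // future does not complete within awaitTime.
    val results = futures.map(f => Await.result(f, awaitTime))
    println("results = " + results.mkString(", "))
  }
}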