Skip to content
Snippets Groups Projects
Commit 67df7f2f authored by Stephen Haberman's avatar Stephen Haberman
Browse files

Add private, minor formatting.

parent 9cfa0683
No related branches found
No related tags found
No related merge requests found
...@@ -66,31 +66,28 @@ private[spark] class ConnectionManager(port: Int) extends Logging { ...@@ -66,31 +66,28 @@ private[spark] class ConnectionManager(port: Int) extends Logging {
val id = new ConnectionManagerId(Utils.localHostName, serverChannel.socket.getLocalPort) val id = new ConnectionManagerId(Utils.localHostName, serverChannel.socket.getLocalPort)
logInfo("Bound socket to port " + serverChannel.socket.getLocalPort() + " with id = " + id) logInfo("Bound socket to port " + serverChannel.socket.getLocalPort() + " with id = " + id)
val thisInstance = this
val selectorThread = new Thread("connection-manager-thread") { val selectorThread = new Thread("connection-manager-thread") {
override def run() { override def run() = ConnectionManager.this.run()
thisInstance.run()
}
} }
selectorThread.setDaemon(true) selectorThread.setDaemon(true)
selectorThread.start() selectorThread.start()
def run() { private def run() {
try { try {
while(!selectorThread.isInterrupted) { while(!selectorThread.isInterrupted) {
for( (connectionManagerId, sendingConnection) <- connectionRequests) { for ((connectionManagerId, sendingConnection) <- connectionRequests) {
sendingConnection.connect() sendingConnection.connect()
addConnection(sendingConnection) addConnection(sendingConnection)
connectionRequests -= connectionManagerId connectionRequests -= connectionManagerId
} }
sendMessageRequests.synchronized { sendMessageRequests.synchronized {
while(!sendMessageRequests.isEmpty) { while (!sendMessageRequests.isEmpty) {
val (message, connection) = sendMessageRequests.dequeue val (message, connection) = sendMessageRequests.dequeue
connection.send(message) connection.send(message)
} }
} }
while(!keyInterestChangeRequests.isEmpty) { while (!keyInterestChangeRequests.isEmpty) {
val (key, ops) = keyInterestChangeRequests.dequeue val (key, ops) = keyInterestChangeRequests.dequeue
val connection = connectionsByKey(key) val connection = connectionsByKey(key)
val lastOps = key.interestOps() val lastOps = key.interestOps()
...@@ -126,14 +123,11 @@ private[spark] class ConnectionManager(port: Int) extends Logging { ...@@ -126,14 +123,11 @@ private[spark] class ConnectionManager(port: Int) extends Logging {
if (key.isValid) { if (key.isValid) {
if (key.isAcceptable) { if (key.isAcceptable) {
acceptConnection(key) acceptConnection(key)
} else } else if (key.isConnectable) {
if (key.isConnectable) {
connectionsByKey(key).asInstanceOf[SendingConnection].finishConnect() connectionsByKey(key).asInstanceOf[SendingConnection].finishConnect()
} else } else if (key.isReadable) {
if (key.isReadable) {
connectionsByKey(key).read() connectionsByKey(key).read()
} else } else if (key.isWritable) {
if (key.isWritable) {
connectionsByKey(key).write() connectionsByKey(key).write()
} }
} }
...@@ -144,7 +138,7 @@ private[spark] class ConnectionManager(port: Int) extends Logging { ...@@ -144,7 +138,7 @@ private[spark] class ConnectionManager(port: Int) extends Logging {
} }
} }
def acceptConnection(key: SelectionKey) { private def acceptConnection(key: SelectionKey) {
val serverChannel = key.channel.asInstanceOf[ServerSocketChannel] val serverChannel = key.channel.asInstanceOf[ServerSocketChannel]
val newChannel = serverChannel.accept() val newChannel = serverChannel.accept()
val newConnection = new ReceivingConnection(newChannel, selector) val newConnection = new ReceivingConnection(newChannel, selector)
...@@ -154,7 +148,7 @@ private[spark] class ConnectionManager(port: Int) extends Logging { ...@@ -154,7 +148,7 @@ private[spark] class ConnectionManager(port: Int) extends Logging {
logInfo("Accepted connection from [" + newConnection.remoteAddress.getAddress + "]") logInfo("Accepted connection from [" + newConnection.remoteAddress.getAddress + "]")
} }
def addConnection(connection: Connection) { private def addConnection(connection: Connection) {
connectionsByKey += ((connection.key, connection)) connectionsByKey += ((connection.key, connection))
if (connection.isInstanceOf[SendingConnection]) { if (connection.isInstanceOf[SendingConnection]) {
val sendingConnection = connection.asInstanceOf[SendingConnection] val sendingConnection = connection.asInstanceOf[SendingConnection]
...@@ -165,7 +159,7 @@ private[spark] class ConnectionManager(port: Int) extends Logging { ...@@ -165,7 +159,7 @@ private[spark] class ConnectionManager(port: Int) extends Logging {
connection.onClose(removeConnection) connection.onClose(removeConnection)
} }
def removeConnection(connection: Connection) { private def removeConnection(connection: Connection) {
connectionsByKey -= connection.key connectionsByKey -= connection.key
if (connection.isInstanceOf[SendingConnection]) { if (connection.isInstanceOf[SendingConnection]) {
val sendingConnection = connection.asInstanceOf[SendingConnection] val sendingConnection = connection.asInstanceOf[SendingConnection]
...@@ -222,16 +216,16 @@ private[spark] class ConnectionManager(port: Int) extends Logging { ...@@ -222,16 +216,16 @@ private[spark] class ConnectionManager(port: Int) extends Logging {
} }
} }
def handleConnectionError(connection: Connection, e: Exception) { private def handleConnectionError(connection: Connection, e: Exception) {
logInfo("Handling connection error on connection to " + connection.remoteConnectionManagerId) logInfo("Handling connection error on connection to " + connection.remoteConnectionManagerId)
removeConnection(connection) removeConnection(connection)
} }
def changeConnectionKeyInterest(connection: Connection, ops: Int) { private def changeConnectionKeyInterest(connection: Connection, ops: Int) {
keyInterestChangeRequests += ((connection.key, ops)) keyInterestChangeRequests += ((connection.key, ops))
} }
def receiveMessage(connection: Connection, message: Message) { private def receiveMessage(connection: Connection, message: Message) {
val connectionManagerId = ConnectionManagerId.fromSocketAddress(message.senderAddress) val connectionManagerId = ConnectionManagerId.fromSocketAddress(message.senderAddress)
logDebug("Received [" + message + "] from [" + connectionManagerId + "]") logDebug("Received [" + message + "] from [" + connectionManagerId + "]")
val runnable = new Runnable() { val runnable = new Runnable() {
...@@ -351,7 +345,6 @@ private[spark] class ConnectionManager(port: Int) extends Logging { ...@@ -351,7 +345,6 @@ private[spark] class ConnectionManager(port: Int) extends Logging {
private[spark] object ConnectionManager { private[spark] object ConnectionManager {
def main(args: Array[String]) { def main(args: Array[String]) {
val manager = new ConnectionManager(9999) val manager = new ConnectionManager(9999)
manager.onReceiveMessage((msg: Message, id: ConnectionManagerId) => { manager.onReceiveMessage((msg: Message, id: ConnectionManagerId) => {
println("Received [" + msg + "] from [" + id + "]") println("Received [" + msg + "] from [" + id + "]")
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment