Skip to content
Snippets Groups Projects
Commit e46d547c authored by Mridul Muralidharan's avatar Mridul Muralidharan
Browse files

Fix issues reported by Reynold

parent 430c5314
No related branches found
No related tags found
No related merge requests found
...@@ -188,6 +188,38 @@ private[spark] class ConnectionManager(port: Int) extends Logging { ...@@ -188,6 +188,38 @@ private[spark] class ConnectionManager(port: Int) extends Logging {
} ) } )
} }
// MUST be called within selector loop - else deadlock.
private def triggerForceCloseByException(key: SelectionKey, e: Exception) {
try {
key.interestOps(0)
} catch {
// ignore exceptions
case e: Exception => logDebug("Ignoring exception", e)
}
val conn = connectionsByKey.getOrElse(key, null)
if (conn == null) return
// Pushing to connect threadpool
handleConnectExecutor.execute(new Runnable {
override def run() {
try {
conn.callOnExceptionCallback(e)
} catch {
// ignore exceptions
case e: Exception => logDebug("Ignoring exception", e)
}
try {
conn.close()
} catch {
// ignore exceptions
case e: Exception => logDebug("Ignoring exception", e)
}
}
})
}
def run() { def run() {
try { try {
while(!selectorThread.isInterrupted) { while(!selectorThread.isInterrupted) {
...@@ -235,18 +267,26 @@ private[spark] class ConnectionManager(port: Int) extends Logging { ...@@ -235,18 +267,26 @@ private[spark] class ConnectionManager(port: Int) extends Logging {
while (selectedKeys.hasNext()) { while (selectedKeys.hasNext()) {
val key = selectedKeys.next val key = selectedKeys.next
selectedKeys.remove() selectedKeys.remove()
if (key.isValid) { try {
if (key.isAcceptable) { if (key.isValid) {
acceptConnection(key) if (key.isAcceptable) {
} else acceptConnection(key)
if (key.isConnectable) { } else
triggerConnect(key) if (key.isConnectable) {
} else triggerConnect(key)
if (key.isReadable) { } else
triggerRead(key) if (key.isReadable) {
} else triggerRead(key)
if (key.isWritable) { } else
triggerWrite(key) if (key.isWritable) {
triggerWrite(key)
}
}
} catch {
// weird, but we saw this happening - even though key.isValid was true, key.isAcceptable would throw CancelledKeyException.
case e: CancelledKeyException => {
logInfo("key already cancelled ? " + key, e)
triggerForceCloseByException(key, e)
} }
} }
} }
......
...@@ -95,6 +95,7 @@ export JAVA_OPTS ...@@ -95,6 +95,7 @@ export JAVA_OPTS
CORE_DIR="$FWDIR/core" CORE_DIR="$FWDIR/core"
REPL_DIR="$FWDIR/repl" REPL_DIR="$FWDIR/repl"
REPL_BIN_DIR="$FWDIR/repl-bin"
EXAMPLES_DIR="$FWDIR/examples" EXAMPLES_DIR="$FWDIR/examples"
BAGEL_DIR="$FWDIR/bagel" BAGEL_DIR="$FWDIR/bagel"
STREAMING_DIR="$FWDIR/streaming" STREAMING_DIR="$FWDIR/streaming"
...@@ -125,8 +126,8 @@ if [ -e "$FWDIR/lib_managed" ]; then ...@@ -125,8 +126,8 @@ if [ -e "$FWDIR/lib_managed" ]; then
CLASSPATH+=":$FWDIR/lib_managed/bundles/*" CLASSPATH+=":$FWDIR/lib_managed/bundles/*"
fi fi
CLASSPATH+=":$REPL_DIR/lib/*" CLASSPATH+=":$REPL_DIR/lib/*"
if [ -e repl-bin/target ]; then if [ -e $REPL_BIN_DIR/target ]; then
for jar in `find "repl-bin/target" -name 'spark-repl-*-shaded-hadoop*.jar'`; do for jar in `find "$REPL_BIN_DIR/target" -name 'spark-repl-*-shaded-hadoop*.jar'`; do
CLASSPATH+=":$jar" CLASSPATH+=":$jar"
done done
fi fi
...@@ -134,7 +135,6 @@ CLASSPATH+=":$BAGEL_DIR/target/scala-$SCALA_VERSION/classes" ...@@ -134,7 +135,6 @@ CLASSPATH+=":$BAGEL_DIR/target/scala-$SCALA_VERSION/classes"
for jar in `find $PYSPARK_DIR/lib -name '*jar'`; do for jar in `find $PYSPARK_DIR/lib -name '*jar'`; do
CLASSPATH+=":$jar" CLASSPATH+=":$jar"
done done
export CLASSPATH # Needed for spark-shell
# Figure out the JAR file that our examples were packaged into. This includes a bit of a hack # Figure out the JAR file that our examples were packaged into. This includes a bit of a hack
# to avoid the -sources and -doc packages that are built by publish-local. # to avoid the -sources and -doc packages that are built by publish-local.
...@@ -163,4 +163,5 @@ else ...@@ -163,4 +163,5 @@ else
EXTRA_ARGS="$JAVA_OPTS" EXTRA_ARGS="$JAVA_OPTS"
fi fi
export CLASSPATH # Needed for spark-shell
exec "$RUNNER" -cp "$CLASSPATH" $EXTRA_ARGS "$@" exec "$RUNNER" -cp "$CLASSPATH" $EXTRA_ARGS "$@"
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment