Commit 1ba3c173 authored by Jey Kottalam

use parens when calling method with side-effects

parent 7c5ff733
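
The diff below applies the usual Scala style rule: a parameterless method that has side effects is declared and called with empty parentheses, while a pure accessor is written without them, so a call site like stopDaemon() or stream.readInt() visibly does more than read a value. A minimal sketch of the convention, using a hypothetical Counter class that is not part of this commit:

// Hypothetical illustration of the parentheses convention; not from the commit.
class Counter {
  private var count = 0

  // Side-effecting, parameterless method: declared and called with ()
  def increment(): Unit = { count += 1 }

  // Pure accessor: declared and called without ()
  def current: Int = count
}

object ConventionDemo {
  def main(args: Array[String]): Unit = {
    val c = new Counter
    c.increment()        // the () flags the state change at the call site
    println(c.current)   // reads like a field access; prints 1
  }
}

The same reasoning applies to the Java calls in the diff (stream.readInt(), daemon.destroy()): Scala 2 lets an arity-0 method be invoked without parentheses, which is why the old code compiled, but keeping the parentheses makes the side effect explicit.
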
@@ -59,7 +59,7 @@ class SparkEnv (
   def createPythonWorker(pythonExec: String, envVars: Map[String, String]): java.net.Socket = {
     synchronized {
-      pythonWorkers.getOrElseUpdate((pythonExec, envVars), new PythonWorkerFactory(pythonExec, envVars)).create
+      pythonWorkers.getOrElseUpdate((pythonExec, envVars), new PythonWorkerFactory(pythonExec, envVars)).create()
     }
   }
@@ -116,12 +116,12 @@ private[spark] class PythonRDD[T: ClassManifest](
         // We've finished the data section of the output, but we can still
         // read some accumulator updates; let's do that, breaking when we
         // get a negative length record.
-        var len2 = stream.readInt
+        var len2 = stream.readInt()
         while (len2 >= 0) {
           val update = new Array[Byte](len2)
           stream.readFully(update)
           accumulator += Collections.singletonList(update)
-          len2 = stream.readInt
+          len2 = stream.readInt()
         }
         new Array[Byte](0)
       }
@@ -16,7 +16,7 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
   def create(): Socket = {
     synchronized {
       // Start the daemon if it hasn't been started
-      startDaemon
+      startDaemon()
       // Attempt to connect, restart and retry once if it fails
       try {
@@ -24,8 +24,8 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
       } catch {
         case exc: SocketException => {
           logWarning("Python daemon unexpectedly quit, attempting to restart")
-          stopDaemon
-          startDaemon
+          stopDaemon()
+          startDaemon()
           new Socket(daemonHost, daemonPort)
         }
         case e => throw e
@@ -34,7 +34,7 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
   }

   def stop() {
-    stopDaemon
+    stopDaemon()
   }

   private def startDaemon() {
@@ -51,7 +51,7 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
       val workerEnv = pb.environment()
       workerEnv.putAll(envVars)
       daemon = pb.start()
-      daemonPort = new DataInputStream(daemon.getInputStream).readInt
+      daemonPort = new DataInputStream(daemon.getInputStream).readInt()

       // Redirect the stderr to ours
       new Thread("stderr reader for " + pythonExec) {
@@ -71,7 +71,7 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
       }.start()
     } catch {
       case e => {
-        stopDaemon
+        stopDaemon()
         throw e
       }
     }
@@ -85,7 +85,7 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
     synchronized {
       // Request shutdown of existing daemon by sending SIGTERM
       if (daemon != null) {
-        daemon.destroy
+        daemon.destroy()
       }

       daemon = null