Skip to content
Snippets Groups Projects
Commit 5c746eed authored by Davies Liu's avatar Davies Liu Committed by Josh Rosen
Browse files

[SPARK-5395] [PySpark] fix python process leak while coalesce()

Currently, the Python process is released into pool only after the task had finished, it cause many process forked if coalesce() is called.

This PR will change it to release the process as soon as read all the data from it (finish the partition), then a process could be reused to process multiple partitions in a single task.

Author: Davies Liu <davies@databricks.com>

Closes #4238 from davies/py_leak and squashes the following commits:

ec80a43 [Davies Liu] add @volatile
6da437a [Davies Liu] address comments
24ed322 [Davies Liu] fix python process leak while coalesce()
parent ce9c43ba
No related branches found
No related tags found
No related merge requests found
......@@ -67,17 +67,16 @@ private[spark] class PythonRDD(
envVars += ("SPARK_REUSE_WORKER" -> "1")
}
val worker: Socket = env.createPythonWorker(pythonExec, envVars.toMap)
// Whether is the worker released into idle pool
@volatile var released = false
// Start a thread to feed the process input from our parent's iterator
val writerThread = new WriterThread(env, worker, split, context)
var complete_cleanly = false
context.addTaskCompletionListener { context =>
writerThread.shutdownOnTaskCompletion()
writerThread.join()
if (reuse_worker && complete_cleanly) {
env.releasePythonWorker(pythonExec, envVars.toMap, worker)
} else {
if (!reuse_worker || !released) {
try {
worker.close()
} catch {
......@@ -145,8 +144,12 @@ private[spark] class PythonRDD(
stream.readFully(update)
accumulator += Collections.singletonList(update)
}
// Check whether the worker is ready to be re-used.
if (stream.readInt() == SpecialLengths.END_OF_STREAM) {
complete_cleanly = true
if (reuse_worker) {
env.releasePythonWorker(pythonExec, envVars.toMap, worker)
released = true
}
}
null
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment