Skip to content
Snippets Groups Projects
Commit 97a0bfe1 authored by Aaron Davidson's avatar Aaron Davidson Committed by Patrick Wendell
Browse files

SPARK-2282: Reuse PySpark Accumulator sockets to avoid crashing Spark

JIRA: https://issues.apache.org/jira/browse/SPARK-2282

This issue is caused by a buildup of sockets in the TIME_WAIT stage of TCP, which is a stage that lasts for some period of time after the communication closes.

This solution simply allows us to reuse sockets that are in TIME_WAIT, to avoid issues with the buildup of the rapid creation of these sockets.

Author: Aaron Davidson <aaron@databricks.com>

Closes #1220 from aarondav/SPARK-2282 and squashes the following commits:

2e5cab3 [Aaron Davidson] SPARK-2282: Reuse PySpark Accumulator sockets to avoid crashing Spark
parent 3894a49b
No related branches found
No related tags found
No related merge requests found
...@@ -599,6 +599,8 @@ private class PythonAccumulatorParam(@transient serverHost: String, serverPort: ...@@ -599,6 +599,8 @@ private class PythonAccumulatorParam(@transient serverHost: String, serverPort:
} else { } else {
// This happens on the master, where we pass the updates to Python through a socket // This happens on the master, where we pass the updates to Python through a socket
val socket = new Socket(serverHost, serverPort) val socket = new Socket(serverHost, serverPort)
// SPARK-2282: Immediately reuse closed sockets because we create one per task.
socket.setReuseAddress(true)
val in = socket.getInputStream val in = socket.getInputStream
val out = new DataOutputStream(new BufferedOutputStream(socket.getOutputStream, bufferSize)) val out = new DataOutputStream(new BufferedOutputStream(socket.getOutputStream, bufferSize))
out.writeInt(val2.size) out.writeInt(val2.size)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment