Skip to content
Snippets Groups Projects
Commit 962aac4d authored by guoxu1231's avatar guoxu1231 Committed by Sean Owen
Browse files

[SPARK-12513][STREAMING] SocketReceiver hang in Netcat example

Explicitly close client side socket connection before restart socket receiver.

Author: guoxu1231 <guoxu1231@gmail.com>
Author: Shawn Guo <guoxu1231@gmail.com>

Closes #10464 from guoxu1231/SPARK-12513.
parent 9fd7a2f0
No related branches found
No related tags found
No related merge requests found
......@@ -18,7 +18,7 @@
package org.apache.spark.streaming.dstream
import java.io._
import java.net.{Socket, UnknownHostException}
import java.net.{ConnectException, Socket}
import scala.reflect.ClassTag
import scala.util.control.NonFatal
......@@ -51,7 +51,20 @@ class SocketReceiver[T: ClassTag](
storageLevel: StorageLevel
) extends Receiver[T](storageLevel) with Logging {
private var socket: Socket = _
def onStart() {
logInfo(s"Connecting to $host:$port")
try {
socket = new Socket(host, port)
} catch {
case e: ConnectException =>
restart(s"Error connecting to $host:$port", e)
return
}
logInfo(s"Connected to $host:$port")
// Start the thread that receives data over a connection
new Thread("Socket Receiver") {
setDaemon(true)
......@@ -60,20 +73,22 @@ class SocketReceiver[T: ClassTag](
}
def onStop() {
// There is nothing much to do as the thread calling receive()
// is designed to stop by itself isStopped() returns false
// in case restart thread close it twice
synchronized {
if (socket != null) {
socket.close()
socket = null
logInfo(s"Closed socket to $host:$port")
}
}
}
/** Create a socket connection and receive data until receiver is stopped */
def receive() {
var socket: Socket = null
try {
logInfo("Connecting to " + host + ":" + port)
socket = new Socket(host, port)
logInfo("Connected to " + host + ":" + port)
val iterator = bytesToObjects(socket.getInputStream())
while(!isStopped && iterator.hasNext) {
store(iterator.next)
store(iterator.next())
}
if (!isStopped()) {
restart("Socket data stream had no more data")
......@@ -81,16 +96,11 @@ class SocketReceiver[T: ClassTag](
logInfo("Stopped receiving")
}
} catch {
case e: java.net.ConnectException =>
restart("Error connecting to " + host + ":" + port, e)
case NonFatal(e) =>
logWarning("Error receiving data", e)
restart("Error receiving data", e)
} finally {
if (socket != null) {
socket.close()
logInfo("Closed socket to " + host + ":" + port)
}
onStop()
}
}
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment