Skip to content
Snippets Groups Projects
Commit 98195c30 authored by linweizhong's avatar linweizhong Committed by Shivaram Venkataraman
Browse files

[SPARK-7526] [SPARKR] Specify ip of RBackend, MonitorServer and RRDD Socket server

These R process only used to communicate with JVM process on local, so binding to localhost is more reasonable then wildcard ip.

Author: linweizhong <linweizhong@huawei.com>

Closes #6053 from Sephiroth-Lin/spark-7526 and squashes the following commits:

5303af7 [linweizhong] bind to localhost rather than wildcard ip
parent df9b94a5
No related branches found
No related tags found
No related merge requests found
...@@ -18,7 +18,7 @@ ...@@ -18,7 +18,7 @@
package org.apache.spark.api.r package org.apache.spark.api.r
import java.io.{DataOutputStream, File, FileOutputStream, IOException} import java.io.{DataOutputStream, File, FileOutputStream, IOException}
import java.net.{InetSocketAddress, ServerSocket} import java.net.{InetAddress, InetSocketAddress, ServerSocket}
import java.util.concurrent.TimeUnit import java.util.concurrent.TimeUnit
import io.netty.bootstrap.ServerBootstrap import io.netty.bootstrap.ServerBootstrap
...@@ -65,7 +65,7 @@ private[spark] class RBackend { ...@@ -65,7 +65,7 @@ private[spark] class RBackend {
} }
}) })
channelFuture = bootstrap.bind(new InetSocketAddress(0)) channelFuture = bootstrap.bind(new InetSocketAddress("localhost", 0))
channelFuture.syncUninterruptibly() channelFuture.syncUninterruptibly()
channelFuture.channel().localAddress().asInstanceOf[InetSocketAddress].getPort() channelFuture.channel().localAddress().asInstanceOf[InetSocketAddress].getPort()
} }
...@@ -101,7 +101,7 @@ private[spark] object RBackend extends Logging { ...@@ -101,7 +101,7 @@ private[spark] object RBackend extends Logging {
try { try {
// bind to random port // bind to random port
val boundPort = sparkRBackend.init() val boundPort = sparkRBackend.init()
val serverSocket = new ServerSocket(0, 1) val serverSocket = new ServerSocket(0, 1, InetAddress.getByName("localhost"))
val listenPort = serverSocket.getLocalPort() val listenPort = serverSocket.getLocalPort()
// tell the R process via temporary file // tell the R process via temporary file
......
...@@ -18,7 +18,7 @@ ...@@ -18,7 +18,7 @@
package org.apache.spark.api.r package org.apache.spark.api.r
import java.io._ import java.io._
import java.net.ServerSocket import java.net.{InetAddress, ServerSocket}
import java.util.{Map => JMap} import java.util.{Map => JMap}
import scala.collection.JavaConversions._ import scala.collection.JavaConversions._
...@@ -55,7 +55,7 @@ private abstract class BaseRRDD[T: ClassTag, U: ClassTag]( ...@@ -55,7 +55,7 @@ private abstract class BaseRRDD[T: ClassTag, U: ClassTag](
val parentIterator = firstParent[T].iterator(partition, context) val parentIterator = firstParent[T].iterator(partition, context)
// we expect two connections // we expect two connections
val serverSocket = new ServerSocket(0, 2) val serverSocket = new ServerSocket(0, 2, InetAddress.getByName("localhost"))
val listenPort = serverSocket.getLocalPort() val listenPort = serverSocket.getLocalPort()
// The stdout/stderr is shared by multiple tasks, because we use one daemon // The stdout/stderr is shared by multiple tasks, because we use one daemon
...@@ -414,7 +414,7 @@ private[r] object RRDD { ...@@ -414,7 +414,7 @@ private[r] object RRDD {
synchronized { synchronized {
if (daemonChannel == null) { if (daemonChannel == null) {
// we expect one connections // we expect one connections
val serverSocket = new ServerSocket(0, 1) val serverSocket = new ServerSocket(0, 1, InetAddress.getByName("localhost"))
val daemonPort = serverSocket.getLocalPort val daemonPort = serverSocket.getLocalPort
errThread = createRProcess(rLibDir, daemonPort, "daemon.R") errThread = createRProcess(rLibDir, daemonPort, "daemon.R")
// the socket used to send out the input of task // the socket used to send out the input of task
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment