Skip to content
Snippets Groups Projects
Commit fbc1ab34 authored by Shivaram Venkataraman's avatar Shivaram Venkataraman
Browse files

Couple of Netty fixes

a. Fix the port number by reading it from the bound channel
b. Fix the shutdown sequence to make sure we actually block on the channel
c. Fix the unit test to use two JVMs.
parent 3db1e17b
No related branches found
No related tags found
No related merge requests found
......@@ -37,29 +37,33 @@ class FileServer {
.childHandler(new FileServerChannelInitializer(pResolver));
// Start the server.
channelFuture = bootstrap.bind(addr);
this.port = addr.getPort();
try {
// Get the address we bound to.
InetSocketAddress boundAddress =
((InetSocketAddress) channelFuture.sync().channel().localAddress());
this.port = boundAddress.getPort();
} catch (InterruptedException ie) {
this.port = 0;
}
}
/**
* Start the file server asynchronously in a new thread.
*/
public void start() {
try {
blockingThread = new Thread() {
public void run() {
try {
Channel channel = channelFuture.sync().channel();
channel.closeFuture().sync();
} catch (InterruptedException e) {
LOG.error("File server start got interrupted", e);
}
blockingThread = new Thread() {
public void run() {
try {
channelFuture.channel().closeFuture().sync();
LOG.info("FileServer exiting");
} catch (InterruptedException e) {
LOG.error("File server start got interrupted", e);
}
};
blockingThread.setDaemon(true);
blockingThread.start();
} finally {
bootstrap.shutdown();
}
// NOTE: bootstrap is shutdown in stop()
}
};
blockingThread.setDaemon(true);
blockingThread.start();
}
public int getPort() {
......@@ -67,17 +71,16 @@ class FileServer {
}
public void stop() {
if (blockingThread != null) {
blockingThread.stop();
blockingThread = null;
}
// Close the bound channel.
if (channelFuture != null) {
channelFuture.channel().closeFuture();
channelFuture.channel().close();
channelFuture = null;
}
// Shutdown bootstrap.
if (bootstrap != null) {
bootstrap.shutdown();
bootstrap = null;
}
// TODO: Shutdown all accepted channels as well ?
}
}
......@@ -305,9 +305,20 @@ class ShuffleSuite extends FunSuite with ShouldMatchers with LocalSparkContext {
assert(c.partitioner.get === p)
}
test("shuffle local cluster") {
// Use a local cluster with 2 processes to make sure there are both local and remote blocks
sc = new SparkContext("local-cluster[2,1,512]", "test")
val a = sc.parallelize(1 to 10, 2)
val b = a.map {
x => (x, x * 2)
}
val c = new ShuffledRDD(b, new HashPartitioner(3))
assert(c.count === 10)
}
test("shuffle serializer") {
// Use a local cluster with 2 processes to make sure there are both local and remote blocks
sc = new SparkContext("local-cluster[1,2,512]", "test")
sc = new SparkContext("local-cluster[2,1,512]", "test")
val a = sc.parallelize(1 to 10, 2)
val b = a.map { x =>
(x, new ShuffleSuite.NonJavaSerializableClass(x * 2))
......@@ -317,6 +328,7 @@ class ShuffleSuite extends FunSuite with ShouldMatchers with LocalSparkContext {
val c = new ShuffledRDD(b, new HashPartitioner(3), classOf[spark.KryoSerializer].getName)
assert(c.count === 10)
}
}
object ShuffleSuite {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment