Commit 9e64396c authored by Reynold Xin

Cleaned up the Java files from Shane's PR.

parent 0e5cc308
package spark.network.netty;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.AbstractChannel;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelOption;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.channel.oio.OioEventLoopGroup;
import io.netty.channel.socket.oio.OioSocketChannel;
import java.util.Arrays;
class FileClient {

  private FileClientHandler handler = null;
  private Channel channel = null;
  private Bootstrap bootstrap = null;

  public FileClient(FileClientHandler handler) {
    this.handler = handler;
  }

  public void init() {
    bootstrap = new Bootstrap();
    bootstrap.group(new OioEventLoopGroup())
      .channel(OioSocketChannel.class)
      .option(ChannelOption.SO_KEEPALIVE, true)
      .option(ChannelOption.TCP_NODELAY, true)
      .handler(new FileClientChannelInitializer(handler));
  }

  public static final class ChannelCloseListener implements ChannelFutureListener {
    private FileClient fc = null;

    public ChannelCloseListener(FileClient fc) {
      this.fc = fc;
    }

    @Override
    public void operationComplete(ChannelFuture future) {
      if (fc.bootstrap != null) {
@@ -46,44 +44,39 @@ public class FileClient {
      }
    }
  public void connect(String host, int port) {
    try {
      // Start the connection attempt.
      channel = bootstrap.connect(host, port).sync().channel();
      // ChannelFuture cf = channel.closeFuture();
      // cf.addListener(new ChannelCloseListener(this));
    } catch (InterruptedException e) {
      close();
    }
  }
  public void waitForClose() {
    try {
      channel.closeFuture().sync();
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
  }

  public void sendRequest(String file) {
    // assert(file == null);
    // assert(channel == null);
    channel.write(file + "\r\n");
  }
  public void close() {
    if (channel != null) {
      channel.close();
      channel = null;
    }
    if (bootstrap != null) {
      bootstrap.shutdown();
      bootstrap = null;
    }
  }
}
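For orientation, a minimal usage sketch of the client above (not part of this commit): the host, port, and block id are placeholders, and the handler is any concrete FileClientHandler subclass, such as the one sketched after FileClientHandler below.

package spark.network.netty;

// Hypothetical usage sketch, not part of this commit.
class FileClientExample {

  static void fetch(FileClientHandler handler, String host, int port, String blockId) {
    FileClient client = new FileClient(handler);
    client.init();               // build the Oio bootstrap and pipeline
    client.connect(host, port);  // blocks until the connection is established
    client.sendRequest(blockId); // the server resolves the id via its PathResolver
    client.waitForClose();       // the handler closes the channel once the file is read
    client.close();
  }
}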
@@ -3,15 +3,10 @@ package spark.network.netty;
import io.netty.buffer.BufType;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.util.CharsetUtil;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.logging.LogLevel;
class FileClientChannelInitializer extends ChannelInitializer<SocketChannel> {

  private FileClientHandler fhandler;

@@ -23,7 +18,7 @@ public class FileClientChannelInitializer extends
  public void initChannel(SocketChannel channel) {
    // file no more than 2G
    channel.pipeline()
      .addLast("encoder", new StringEncoder(BufType.BYTE))
      .addLast("handler", fhandler);
  }
}
@@ -3,12 +3,9 @@ package spark.network.netty;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundByteHandlerAdapter;
import io.netty.util.CharsetUtil;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Logger;
abstract class FileClientHandler extends ChannelInboundByteHandlerAdapter {

  private FileHeader currentHeader = null;

@@ -19,7 +16,7 @@ public abstract class FileClientHandler extends ChannelInboundByteHandlerAdapter
    // Use direct buffer if possible.
    return ctx.alloc().ioBuffer();
  }

  @Override
  public void inboundBufferUpdated(ChannelHandlerContext ctx, ByteBuf in) {
    // get header
@@ -27,8 +24,8 @@ public abstract class FileClientHandler extends ChannelInboundByteHandlerAdapter
      currentHeader = FileHeader.create(in.readBytes(FileHeader.HEADER_SIZE()));
    }
    // get file
    if (in.readableBytes() >= currentHeader.fileLen()) {
      handle(ctx, in, currentHeader);
      currentHeader = null;
      ctx.close();
    }
......
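FileClientHandler is abstract, so callers supply the handle() callback. The sketch below is hypothetical and not part of this commit; it assumes the handle(ctx, in, header) shape implied by the call in inboundBufferUpdated() above and that FileHeader.fileLen() is the body length in bytes.

package spark.network.netty;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;

// Hypothetical concrete handler, not part of this commit: copies the received
// file body into an in-memory byte array.
class InMemoryFileClientHandler extends FileClientHandler {

  public volatile byte[] fileBytes = null;

  @Override
  public void handle(ChannelHandlerContext ctx, ByteBuf in, FileHeader header) {
    // inboundBufferUpdated() only calls handle() once fileLen() bytes are readable.
    byte[] bytes = new byte[header.fileLen()];
    in.readBytes(bytes);
    fileBytes = bytes;
  }
}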
package spark.network.netty;
import java.io.File;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.Channel;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.oio.OioEventLoopGroup;
import io.netty.channel.socket.oio.OioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
/**
 * Server that accepts the path of a file and echoes back its content.
*/
class FileServer {

  private ServerBootstrap bootstrap = null;
  private Channel channel = null;
  private PathResolver pResolver;

  public FileServer(PathResolver pResolver) {
    this.pResolver = pResolver;
  }
  public void run(int port) {
    // Configure the server.
    bootstrap = new ServerBootstrap();
    try {
      bootstrap.group(new OioEventLoopGroup(), new OioEventLoopGroup())
        .channel(OioServerSocketChannel.class)
        .option(ChannelOption.SO_BACKLOG, 100)
        .option(ChannelOption.SO_RCVBUF, 1500)
        .childHandler(new FileServerChannelInitializer(pResolver));
      // Start the server.
      channel = bootstrap.bind(port).sync().channel();
      channel.closeFuture().sync();
    } catch (InterruptedException e) {
      // TODO Auto-generated catch block
      e.printStackTrace();
    } finally {
      bootstrap.shutdown();
    }
  }

  public void stop() {
    if (channel != null) {
      channel.close();
    }
    if (bootstrap != null) {
      bootstrap.shutdown();
    }
  }
}
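As noted in the Javadoc, run() serves files resolved by the injected PathResolver and blocks until the server channel closes. Below is a hypothetical helper (not part of this commit) for starting it on a background thread, mirroring how DiskStore starts ShuffleSender later in this diff; stop() then closes the channel, which unblocks run() and shuts the bootstrap down.

package spark.network.netty;

// Hypothetical helper, not part of this commit: FileServer.run() blocks in
// closeFuture().sync(), so it is run on a daemon thread and torn down via stop().
class FileServerRunner {

  static Thread start(final FileServer server, final int port) {
    Thread t = new Thread() {
      @Override
      public void run() {
        server.run(port);  // returns only after the server channel is closed
      }
    };
    t.setDaemon(true);
    t.start();
    return t;
  }
}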
package spark.network.netty;
import java.io.File;
import io.netty.buffer.BufType;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.Delimiters;
import io.netty.util.CharsetUtil;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.logging.LogLevel;
class FileServerChannelInitializer extends ChannelInitializer<SocketChannel> {

  PathResolver pResolver;
  public FileServerChannelInitializer(PathResolver pResolver) {
    this.pResolver = pResolver;
@@ -24,10 +18,8 @@ public class FileServerChannelInitializer extends
  @Override
  public void initChannel(SocketChannel channel) {
    channel.pipeline()
      .addLast("framer", new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter()))
      .addLast("strDecoder", new StringDecoder())
      .addLast("handler", new FileServerHandler(pResolver));
  }
}
package spark.network.netty;
import java.io.File;
import java.io.FileInputStream;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundMessageHandlerAdapter;
import io.netty.channel.DefaultFileRegion;
import io.netty.handler.stream.ChunkedFile;
class FileServerHandler extends ChannelInboundMessageHandlerAdapter<String> {

  PathResolver pResolver;
  public FileServerHandler(PathResolver pResolver) {
    this.pResolver = pResolver;
  }
@@ -21,8 +21,8 @@ public class FileServerHandler extends
    String path = pResolver.getAbsolutePath(blockId);
    // if getFilePath returns null, close the channel
    if (path == null) {
      //ctx.close();
      return;
    }
    File file = new File(path);
    if (file.exists()) {
@@ -33,23 +33,21 @@ public class FileServerHandler extends
        return;
      }
      long length = file.length();
      if (length > Integer.MAX_VALUE || length <= 0) {
        //logger.info("too large file : " + file.getAbsolutePath() + " of size "+ length);
        ctx.write(new FileHeader(0, blockId).buffer());
        ctx.flush();
        return;
      }
      int len = new Long(length).intValue();
      //logger.info("Sending block "+blockId+" filelen = "+len);
      //logger.info("header = "+ (new FileHeader(len, blockId)).buffer());
      ctx.write((new FileHeader(len, blockId)).buffer());
      try {
        ctx.sendFile(new DefaultFileRegion(
          new FileInputStream(file).getChannel(), 0, file.length()));
      } catch (Exception e) {
        //logger.warning("Exception when sending file : " + file.getAbsolutePath());
        e.printStackTrace();
      }
    } else {
@@ -58,8 +56,7 @@ public class FileServerHandler extends
    }
    ctx.flush();
  }

  @Override
  public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
    cause.printStackTrace();
......
package spark.network.netty;
public interface PathResolver {
  /**
   * Get the absolute path of the file.
   *
   * @param fileId the id of the file
   * @return the absolute path of the file, or null if it cannot be resolved
   */
  public String getAbsolutePath(String fileId);
}
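For comparison with the Scala resolver DiskStore registers below, a hypothetical Java implementation (not part of this commit) that maps ids to files under a base directory and returns null for anything it cannot serve, which FileServerHandler checks for.

package spark.network.netty;

import java.io.File;

// Hypothetical resolver, not part of this commit.
class DirectoryPathResolver implements PathResolver {

  private final File baseDir;

  DirectoryPathResolver(File baseDir) {
    this.baseDir = baseDir;
  }

  @Override
  public String getAbsolutePath(String fileId) {
    File f = new File(baseDir, fileId);
    return f.isFile() ? f.getAbsolutePath() : null;
  }
}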
@@ -288,7 +288,7 @@ private class DiskStore(blockManager: BlockManager, rootDirs: String)
    val port = System.getProperty("spark.shuffle.sender.port", "6653").toInt
    val pResolver = new PathResolver {
      override def getAbsolutePath(blockId: String): String = {
        if (!blockId.startsWith("shuffle_")) {
          return null
        }
@@ -298,7 +298,7 @@ private class DiskStore(blockManager: BlockManager, rootDirs: String)
    shuffleSender = new Thread {
      override def run() = {
        val sender = new ShuffleSender(port, pResolver)
        logInfo("Created ShuffleSender binding to port : " + port)
        sender.start
      }
    }
......