Commit 2fb84950 authored by Joshua Hartman

Replacing the native lzf compression code with the ning open-source compress-lzf library (Apache 2.0 license).
parent f8ea98d9
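
The ning classes are used as a drop-in replacement for the removed spark.compress.lzf streams: Broadcast.scala only swaps the import, so com.ning.compress.lzf.LZFOutputStream and LZFInputStream are assumed to wrap an OutputStream/InputStream the same way the old classes did. A minimal round-trip sketch under that assumption (plain Java, not part of this commit; the class name and payload are made up for illustration, and compress-lzf-0.6.0 is assumed to be on the classpath):

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import com.ning.compress.lzf.LZFInputStream;
import com.ning.compress.lzf.LZFOutputStream;

public class LzfRoundTrip {
    public static void main(String[] args) throws Exception {
        byte[] original = "example broadcast payload".getBytes("UTF-8");

        // Compress: wrap any OutputStream; closing flushes the final LZF block.
        ByteArrayOutputStream compressed = new ByteArrayOutputStream();
        LZFOutputStream lzfOut = new LZFOutputStream(compressed);
        lzfOut.write(original);
        lzfOut.close();

        // Decompress: wrap the compressed bytes and read until EOF.
        LZFInputStream lzfIn =
            new LZFInputStream(new ByteArrayInputStream(compressed.toByteArray()));
        ByteArrayOutputStream restored = new ByteArrayOutputStream();
        byte[] buf = new byte[4096];
        int n;
        while ((n = lzfIn.read(buf)) != -1) {
            restored.write(buf, 0, n);
        }
        lzfIn.close();

        System.out.println(new String(restored.toByteArray(), "UTF-8"));
    }
}
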
@@ -15,12 +15,13 @@ JARS += third_party/jetty-7.1.6.v20100715/servlet-api-2.5.jar
JARS += third_party/apache-log4j-1.2.16/log4j-1.2.16.jar
JARS += third_party/slf4j-1.6.1/slf4j-api-1.6.1.jar
JARS += third_party/slf4j-1.6.1/slf4j-log4j12-1.6.1.jar
JARS += third_party/compress-lzf-0.6.0/compress-lzf-0.6.0.jar
CLASSPATH = $(subst $(SPACE),:,$(JARS))
SCALA_SOURCES = src/examples/*.scala src/scala/spark/*.scala src/scala/spark/repl/*.scala
SCALA_SOURCES += src/test/spark/*.scala src/test/spark/repl/*.scala
JAVA_SOURCES = $(wildcard src/java/spark/compress/lzf/*.java)
ifeq ($(USE_FSC),1)
COMPILER_NAME = fsc
@@ -36,25 +37,19 @@ endif
CONF_FILES = conf/spark-env.sh conf/log4j.properties conf/java-opts
all: scala java conf-files
all: scala conf-files
build/classes:
mkdir -p build/classes
scala: build/classes java
scala: build/classes
$(COMPILER) -d build/classes -classpath build/classes:$(CLASSPATH) $(SCALA_SOURCES)
java: $(JAVA_SOURCES) build/classes
javac -d build/classes $(JAVA_SOURCES)
native: java
$(MAKE) -C src/native
jar: build/spark.jar build/spark-dep.jar
dep-jar: build/spark-dep.jar
build/spark.jar: scala java
build/spark.jar: scala
jar cf build/spark.jar -C build/classes spark
build/spark-dep.jar:
@@ -73,7 +68,6 @@ test: all
default: all
clean:
$(MAKE) -C src/native clean
rm -rf build
.phony: default all clean scala java native jar dep-jar conf-files
.phony: default all clean scala jar dep-jar conf-files
@@ -48,6 +48,7 @@ CLASSPATH+=:$FWDIR/third_party/jetty-7.1.6.v20100715/servlet-api-2.5.jar
CLASSPATH+=:$FWDIR/third_party/apache-log4j-1.2.16/log4j-1.2.16.jar
CLASSPATH+=:$FWDIR/third_party/slf4j-1.6.1/slf4j-api-1.6.1.jar
CLASSPATH+=:$FWDIR/third_party/slf4j-1.6.1/slf4j-log4j12-1.6.1.jar
CLASSPATH+=:$FWDIR/third_party/compress-lzf-0.6.0/compress-lzf-0.6.0.jar
for jar in $FWDIR/third_party/hadoop-0.20.0/lib/*.jar; do
CLASSPATH+=:$jar
done
package spark.compress.lzf;
public class LZF {
private static boolean loaded;
static {
try {
System.loadLibrary("spark_native");
loaded = true;
} catch(Throwable t) {
System.out.println("Failed to load native LZF library: " + t.toString());
loaded = false;
}
}
public static boolean isLoaded() {
return loaded;
}
public static native int compress(
byte[] in, int inOff, int inLen,
byte[] out, int outOff, int outLen);
public static native int decompress(
byte[] in, int inOff, int inLen,
byte[] out, int outOff, int outLen);
}
package spark.compress.lzf;
import java.io.EOFException;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
public class LZFInputStream extends FilterInputStream {
private static final int MAX_BLOCKSIZE = 1024 * 64 - 1;
private static final int MAX_HDR_SIZE = 7;
private byte[] inBuf; // Holds data to decompress (including header)
private byte[] outBuf; // Holds decompressed data to output
private int outPos; // Current position in outBuf
private int outSize; // Total amount of data in outBuf
private boolean closed;
private boolean reachedEof;
private byte[] singleByte = new byte[1];
public LZFInputStream(InputStream in) {
super(in);
if (in == null)
throw new NullPointerException();
inBuf = new byte[MAX_BLOCKSIZE + MAX_HDR_SIZE];
outBuf = new byte[MAX_BLOCKSIZE + MAX_HDR_SIZE];
outPos = 0;
outSize = 0;
}
private void ensureOpen() throws IOException {
if (closed) throw new IOException("Stream closed");
}
@Override
public int read() throws IOException {
ensureOpen();
int count = read(singleByte, 0, 1);
return (count == -1 ? -1 : singleByte[0] & 0xFF);
}
@Override
public int read(byte[] b, int off, int len) throws IOException {
ensureOpen();
if ((off | len | (off + len) | (b.length - (off + len))) < 0)
throw new IndexOutOfBoundsException();
int totalRead = 0;
// Start with the current block in outBuf, and read and decompress any
// further blocks necessary. Instead of trying to decompress directly to b
// when b is large, we always use outBuf as an intermediate holding space
// in case GetPrimitiveArrayCritical decides to copy arrays instead of
// pinning them, which would cause b to be copied repeatedly into C-land.
while (len > 0) {
if (outPos == outSize) {
readNextBlock();
if (reachedEof)
return totalRead == 0 ? -1 : totalRead;
}
int amtToCopy = Math.min(outSize - outPos, len);
System.arraycopy(outBuf, outPos, b, off, amtToCopy);
off += amtToCopy;
len -= amtToCopy;
outPos += amtToCopy;
totalRead += amtToCopy;
}
return totalRead;
}
// Read len bytes from this.in to a buffer, stopping only if EOF is reached
private int readFully(byte[] b, int off, int len) throws IOException {
int totalRead = 0;
while (len > 0) {
int amt = in.read(b, off, len);
if (amt == -1)
break;
off += amt;
len -= amt;
totalRead += amt;
}
return totalRead;
}
// Read the next block from the underlying InputStream into outBuf,
// setting outPos and outSize, or set reachedEof if the stream ends.
private void readNextBlock() throws IOException {
// Read first 5 bytes of header
int count = readFully(inBuf, 0, 5);
if (count == 0) {
reachedEof = true;
return;
} else if (count < 5) {
throw new EOFException("Truncated LZF block header");
}
// Check magic bytes
if (inBuf[0] != 'Z' || inBuf[1] != 'V')
throw new IOException("Wrong magic bytes in LZF block header");
// Read the block
if (inBuf[2] == 0) {
// Uncompressed block - read directly to outBuf
int size = ((inBuf[3] & 0xFF) << 8) | (inBuf[4] & 0xFF);
if (readFully(outBuf, 0, size) != size)
throw new EOFException("EOF inside LZF block");
outPos = 0;
outSize = size;
} else if (inBuf[2] == 1) {
// Compressed block - read to inBuf and decompress
if (readFully(inBuf, 5, 2) != 2)
throw new EOFException("Truncated LZF block header");
int csize = ((inBuf[3] & 0xFF) << 8) | (inBuf[4] & 0xFF);
int usize = ((inBuf[5] & 0xFF) << 8) | (inBuf[6] & 0xFF);
if (readFully(inBuf, 7, csize) != csize)
throw new EOFException("Truncated LZF block");
if (LZF.decompress(inBuf, 7, csize, outBuf, 0, usize) != usize)
throw new IOException("Corrupt LZF data stream");
outPos = 0;
outSize = usize;
} else {
throw new IOException("Unknown block type in LZF block header");
}
}
/**
* Returns 0 after EOF has been reached; otherwise always returns 1.
*
* Programs should not count on this method to return the actual number
* of bytes that could be read without blocking.
*/
@Override
public int available() throws IOException {
ensureOpen();
return reachedEof ? 0 : 1;
}
// TODO: Skip complete chunks without decompressing them?
@Override
public long skip(long n) throws IOException {
ensureOpen();
if (n < 0)
throw new IllegalArgumentException("negative skip length");
byte[] buf = new byte[512];
long skipped = 0;
while (skipped < n) {
int len = (int) Math.min(n - skipped, buf.length);
len = read(buf, 0, len);
if (len == -1) {
reachedEof = true;
break;
}
skipped += len;
}
return skipped;
}
@Override
public void close() throws IOException {
if (!closed) {
in.close();
closed = true;
}
}
@Override
public boolean markSupported() {
return false;
}
@Override
public void mark(int readLimit) {}
@Override
public void reset() throws IOException {
throw new IOException("mark/reset not supported");
}
}
package spark.compress.lzf;
import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.OutputStream;
public class LZFOutputStream extends FilterOutputStream {
private static final int BLOCKSIZE = 1024 * 64 - 1;
private static final int MAX_HDR_SIZE = 7;
private byte[] inBuf; // Holds input data to be compressed
private byte[] outBuf; // Holds compressed data to be written
private int inPos; // Current position in inBuf
public LZFOutputStream(OutputStream out) {
super(out);
inBuf = new byte[BLOCKSIZE + MAX_HDR_SIZE];
outBuf = new byte[BLOCKSIZE + MAX_HDR_SIZE];
inPos = MAX_HDR_SIZE;
}
@Override
public void write(int b) throws IOException {
inBuf[inPos++] = (byte) b;
if (inPos == inBuf.length)
compressAndSendBlock();
}
@Override
public void write(byte[] b, int off, int len) throws IOException {
if ((off | len | (off + len) | (b.length - (off + len))) < 0)
throw new IndexOutOfBoundsException();
// If we're given a large array, copy it piece by piece into inBuf and
// write one BLOCKSIZE at a time. This is done to prevent the JNI code
// from copying the whole array repeatedly if GetPrimitiveArrayCritical
// decides to copy instead of pinning.
while (inPos + len >= inBuf.length) {
int amtToCopy = inBuf.length - inPos;
System.arraycopy(b, off, inBuf, inPos, amtToCopy);
inPos += amtToCopy;
compressAndSendBlock();
off += amtToCopy;
len -= amtToCopy;
}
// Copy the remaining (incomplete) block into inBuf
System.arraycopy(b, off, inBuf, inPos, len);
inPos += len;
}
@Override
public void flush() throws IOException {
if (inPos > MAX_HDR_SIZE)
compressAndSendBlock();
out.flush();
}
// Send the data in inBuf, and reset inPos to start writing a new block.
private void compressAndSendBlock() throws IOException {
int us = inPos - MAX_HDR_SIZE;
int maxcs = us > 4 ? us - 4 : us;
int cs = LZF.compress(inBuf, MAX_HDR_SIZE, us, outBuf, MAX_HDR_SIZE, maxcs);
if (cs != 0) {
// Compression made the data smaller; use type 1 header
outBuf[0] = 'Z';
outBuf[1] = 'V';
outBuf[2] = 1;
outBuf[3] = (byte) (cs >> 8);
outBuf[4] = (byte) (cs & 0xFF);
outBuf[5] = (byte) (us >> 8);
outBuf[6] = (byte) (us & 0xFF);
out.write(outBuf, 0, 7 + cs);
} else {
// Compression didn't help; use type 0 header and uncompressed data
inBuf[2] = 'Z';
inBuf[3] = 'V';
inBuf[4] = 0;
inBuf[5] = (byte) (us >> 8);
inBuf[6] = (byte) (us & 0xFF);
out.write(inBuf, 2, 5 + us);
}
inPos = MAX_HDR_SIZE;
}
}
CC = gcc
#JAVA_HOME = /usr/lib/jvm/java-6-sun
OS_NAME = linux
CFLAGS = -fPIC -O3 -funroll-all-loops
SPARK = ../..
LZF = $(SPARK)/third_party/liblzf-3.5
LIB = libspark_native.so
all: $(LIB)
spark_compress_lzf_LZF.h: $(SPARK)/build/classes/spark/compress/lzf/LZF.class
ifeq ($(JAVA_HOME),)
$(error JAVA_HOME is not set)
else
$(JAVA_HOME)/bin/javah -classpath $(SPARK)/build/classes spark.compress.lzf.LZF
endif
$(LIB): spark_compress_lzf_LZF.h spark_compress_lzf_LZF.c
$(CC) $(CFLAGS) -shared -o $@ spark_compress_lzf_LZF.c \
-I $(JAVA_HOME)/include -I $(JAVA_HOME)/include/$(OS_NAME) \
-I $(LZF) $(LZF)/lzf_c.c $(LZF)/lzf_d.c
clean:
rm -f spark_compress_lzf_LZF.h $(LIB)
.PHONY: all clean
#include "spark_compress_lzf_LZF.h"
#include <lzf.h>
/* Helper function to throw an exception */
static void throwException(JNIEnv *env, const char* className) {
jclass cls = (*env)->FindClass(env, className);
if (cls != 0) /* If cls is null, an exception was already thrown */
(*env)->ThrowNew(env, cls, "");
}
/*
* Since LZF.compress() and LZF.decompress() have the same signatures
* and differ only in which lzf_ function they call, implement both in a
* single function and pass it a pointer to the correct lzf_ function.
*/
static jint callCompressionFunction
(unsigned int (*func)(const void *const, unsigned int, void *, unsigned int),
JNIEnv *env, jclass cls, jbyteArray inArray, jint inOff, jint inLen,
jbyteArray outArray, jint outOff, jint outLen)
{
jint inCap;
jint outCap;
jbyte *inData = 0;
jbyte *outData = 0;
jint ret = 0; // default return value; ignored by the JVM if an exception is pending
jint s;
if (!inArray || !outArray) {
throwException(env, "java/lang/NullPointerException");
goto cleanup;
}
inCap = (*env)->GetArrayLength(env, inArray);
outCap = (*env)->GetArrayLength(env, outArray);
// Check if any of the offset/length pairs is invalid; we do this by OR'ing
// things we don't want to be negative and seeing if the result is negative
s = inOff | inLen | (inOff + inLen) | (inCap - (inOff + inLen)) |
outOff | outLen | (outOff + outLen) | (outCap - (outOff + outLen));
if (s < 0) {
throwException(env, "java/lang/IndexOutOfBoundsException");
goto cleanup;
}
inData = (*env)->GetPrimitiveArrayCritical(env, inArray, 0);
outData = (*env)->GetPrimitiveArrayCritical(env, outArray, 0);
if (!inData || !outData) {
// Out of memory - JVM will throw OutOfMemoryError
goto cleanup;
}
ret = func(inData + inOff, inLen, outData + outOff, outLen);
cleanup:
if (inData)
(*env)->ReleasePrimitiveArrayCritical(env, inArray, inData, 0);
if (outData)
(*env)->ReleasePrimitiveArrayCritical(env, outArray, outData, 0);
return ret;
}
/*
* Class: spark_compress_lzf_LZF
* Method: compress
* Signature: ([B[B)I
*/
JNIEXPORT jint JNICALL Java_spark_compress_lzf_LZF_compress
(JNIEnv *env, jclass cls, jbyteArray inArray, jint inOff, jint inLen,
jbyteArray outArray, jint outOff, jint outLen)
{
return callCompressionFunction(lzf_compress, env, cls,
inArray, inOff, inLen, outArray, outOff, outLen);
}
/*
* Class: spark_compress_lzf_LZF
* Method: decompress
* Signature: ([B[B)I
*/
JNIEXPORT jint JNICALL Java_spark_compress_lzf_LZF_decompress
(JNIEnv *env, jclass cls, jbyteArray inArray, jint inOff, jint inLen,
jbyteArray outArray, jint outOff, jint outLen)
{
return callCompressionFunction(lzf_decompress, env, cls,
inArray, inOff, inLen, outArray, outOff, outLen);
}
@@ -14,7 +14,7 @@ import scala.collection.mutable.Map
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path, RawLocalFileSystem}
import spark.compress.lzf.{LZFInputStream, LZFOutputStream}
import com.ning.compress.lzf.{LZFInputStream, LZFOutputStream}
@serializable
trait BroadcastRecipe {
File added