Skip to content
Snippets Groups Projects
Commit 2b027e9a authored by Cheng Lian's avatar Cheng Lian
Browse files

[SPARK-12818] Polishes spark-sketch module

Fixes various minor code and Javadoc styling issues.

Author: Cheng Lian <lian@databricks.com>

Closes #10985 from liancheng/sketch-polishing.
parent 5f686cc8
No related branches found
No related tags found
No related merge requests found
......@@ -22,7 +22,7 @@ import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Arrays;
public final class BitArray {
final class BitArray {
private final long[] data;
private long bitCount;
......
......@@ -22,16 +22,10 @@ import java.io.InputStream;
import java.io.OutputStream;
/**
* A Bloom filter is a space-efficient probabilistic data structure, that is used to test whether
* an element is a member of a set. It returns false when the element is definitely not in the
* set, returns true when the element is probably in the set.
*
* Internally a Bloom filter is initialized with 2 information: how many space to use(number of
* bits) and how many hash values to calculate for each record. To get as lower false positive
* probability as possible, user should call {@link BloomFilter#create} to automatically pick a
* best combination of these 2 parameters.
*
* Currently the following data types are supported:
* A Bloom filter is a space-efficient probabilistic data structure that offers an approximate
* containment test with one-sided error: if it claims that an item is contained in it, this
* might be in error, but if it claims that an item is <i>not</i> contained in it, then this is
* definitely true. Currently supported data types include:
* <ul>
* <li>{@link Byte}</li>
* <li>{@link Short}</li>
......@@ -39,14 +33,17 @@ import java.io.OutputStream;
* <li>{@link Long}</li>
* <li>{@link String}</li>
* </ul>
* The false positive probability ({@code FPP}) of a Bloom filter is defined as the probability that
* {@linkplain #mightContain(Object)} will erroneously return {@code true} for an object that hasu
* not actually been put in the {@code BloomFilter}.
*
* The implementation is largely based on the {@code BloomFilter} class from guava.
* The implementation is largely based on the {@code BloomFilter} class from Guava.
*/
public abstract class BloomFilter {
public enum Version {
/**
* {@code BloomFilter} binary format version 1 (all values written in big-endian order):
* {@code BloomFilter} binary format version 1. All values written in big-endian order:
* <ul>
* <li>Version number, always 1 (32 bit)</li>
* <li>Number of hash functions (32 bit)</li>
......@@ -68,14 +65,13 @@ public abstract class BloomFilter {
}
/**
* Returns the false positive probability, i.e. the probability that
* {@linkplain #mightContain(Object)} will erroneously return {@code true} for an object that
* has not actually been put in the {@code BloomFilter}.
* Returns the probability that {@linkplain #mightContain(Object)} erroneously return {@code true}
* for an object that has not actually been put in the {@code BloomFilter}.
*
* <p>Ideally, this number should be close to the {@code fpp} parameter
* passed in to create this bloom filter, or smaller. If it is
* significantly higher, it is usually the case that too many elements (more than
* expected) have been put in the {@code BloomFilter}, degenerating it.
* Ideally, this number should be close to the {@code fpp} parameter passed in
* {@linkplain #create(long, double)}, or smaller. If it is significantly higher, it is usually
* the case that too many items (more than expected) have been put in the {@code BloomFilter},
* degenerating it.
*/
public abstract double expectedFpp();
......@@ -85,8 +81,8 @@ public abstract class BloomFilter {
public abstract long bitSize();
/**
* Puts an element into this {@code BloomFilter}. Ensures that subsequent invocations of
* {@link #mightContain(Object)} with the same element will always return {@code true}.
* Puts an item into this {@code BloomFilter}. Ensures that subsequent invocations of
* {@linkplain #mightContain(Object)} with the same item will always return {@code true}.
*
* @return true if the bloom filter's bits changed as a result of this operation. If the bits
* changed, this is <i>definitely</i> the first time {@code object} has been added to the
......@@ -98,19 +94,19 @@ public abstract class BloomFilter {
public abstract boolean put(Object item);
/**
* A specialized variant of {@link #put(Object)}, that can only be used to put utf-8 string.
* A specialized variant of {@link #put(Object)} that only supports {@code String} items.
*/
public abstract boolean putString(String str);
public abstract boolean putString(String item);
/**
* A specialized variant of {@link #put(Object)}, that can only be used to put long.
* A specialized variant of {@link #put(Object)} that only supports {@code long} items.
*/
public abstract boolean putLong(long l);
public abstract boolean putLong(long item);
/**
* A specialized variant of {@link #put(Object)}, that can only be used to put byte array.
* A specialized variant of {@link #put(Object)} that only supports byte array items.
*/
public abstract boolean putBinary(byte[] bytes);
public abstract boolean putBinary(byte[] item);
/**
* Determines whether a given bloom filter is compatible with this bloom filter. For two
......@@ -137,38 +133,36 @@ public abstract class BloomFilter {
public abstract boolean mightContain(Object item);
/**
* A specialized variant of {@link #mightContain(Object)}, that can only be used to test utf-8
* string.
* A specialized variant of {@link #mightContain(Object)} that only tests {@code String} items.
*/
public abstract boolean mightContainString(String str);
public abstract boolean mightContainString(String item);
/**
* A specialized variant of {@link #mightContain(Object)}, that can only be used to test long.
* A specialized variant of {@link #mightContain(Object)} that only tests {@code long} items.
*/
public abstract boolean mightContainLong(long l);
public abstract boolean mightContainLong(long item);
/**
* A specialized variant of {@link #mightContain(Object)}, that can only be used to test byte
* array.
* A specialized variant of {@link #mightContain(Object)} that only tests byte array items.
*/
public abstract boolean mightContainBinary(byte[] bytes);
public abstract boolean mightContainBinary(byte[] item);
/**
* Writes out this {@link BloomFilter} to an output stream in binary format.
* It is the caller's responsibility to close the stream.
* Writes out this {@link BloomFilter} to an output stream in binary format. It is the caller's
* responsibility to close the stream.
*/
public abstract void writeTo(OutputStream out) throws IOException;
/**
* Reads in a {@link BloomFilter} from an input stream.
* It is the caller's responsibility to close the stream.
* Reads in a {@link BloomFilter} from an input stream. It is the caller's responsibility to close
* the stream.
*/
public static BloomFilter readFrom(InputStream in) throws IOException {
return BloomFilterImpl.readFrom(in);
}
/**
* Computes the optimal k (number of hashes per element inserted in Bloom filter), given the
* Computes the optimal k (number of hashes per item inserted in Bloom filter), given the
* expected insertions and total number of bits in the Bloom filter.
*
* See http://en.wikipedia.org/wiki/File:Bloom_filter_fp_probability.svg for the formula.
......@@ -197,21 +191,31 @@ public abstract class BloomFilter {
static final double DEFAULT_FPP = 0.03;
/**
* Creates a {@link BloomFilter} with given {@code expectedNumItems} and the default {@code fpp}.
* Creates a {@link BloomFilter} with the expected number of insertions and a default expected
* false positive probability of 3%.
*
* Note that overflowing a {@code BloomFilter} with significantly more elements than specified,
* will result in its saturation, and a sharp deterioration of its false positive probability.
*/
public static BloomFilter create(long expectedNumItems) {
return create(expectedNumItems, DEFAULT_FPP);
}
/**
* Creates a {@link BloomFilter} with given {@code expectedNumItems} and {@code fpp}, it will pick
* an optimal {@code numBits} and {@code numHashFunctions} for the bloom filter.
* Creates a {@link BloomFilter} with the expected number of insertions and expected false
* positive probability.
*
* Note that overflowing a {@code BloomFilter} with significantly more elements than specified,
* will result in its saturation, and a sharp deterioration of its false positive probability.
*/
public static BloomFilter create(long expectedNumItems, double fpp) {
assert fpp > 0.0 : "False positive probability must be > 0.0";
assert fpp < 1.0 : "False positive probability must be < 1.0";
long numBits = optimalNumOfBits(expectedNumItems, fpp);
return create(expectedNumItems, numBits);
if (fpp <= 0D || fpp >= 1D) {
throw new IllegalArgumentException(
"False positive probability must be within range (0.0, 1.0)"
);
}
return create(expectedNumItems, optimalNumOfBits(expectedNumItems, fpp));
}
/**
......@@ -219,9 +223,14 @@ public abstract class BloomFilter {
* pick an optimal {@code numHashFunctions} which can minimize {@code fpp} for the bloom filter.
*/
public static BloomFilter create(long expectedNumItems, long numBits) {
assert expectedNumItems > 0 : "Expected insertions must be > 0";
assert numBits > 0 : "number of bits must be > 0";
int numHashFunctions = optimalNumOfHashFunctions(expectedNumItems, numBits);
return new BloomFilterImpl(numHashFunctions, numBits);
if (expectedNumItems <= 0) {
throw new IllegalArgumentException("Expected insertions must be positive");
}
if (numBits <= 0) {
throw new IllegalArgumentException("Number of bits must be positive");
}
return new BloomFilterImpl(optimalNumOfHashFunctions(expectedNumItems, numBits), numBits);
}
}
......@@ -19,9 +19,10 @@ package org.apache.spark.util.sketch;
import java.io.*;
public class BloomFilterImpl extends BloomFilter implements Serializable {
class BloomFilterImpl extends BloomFilter implements Serializable {
private int numHashFunctions;
private BitArray bits;
BloomFilterImpl(int numHashFunctions, long numBits) {
......@@ -77,14 +78,14 @@ public class BloomFilterImpl extends BloomFilter implements Serializable {
}
@Override
public boolean putString(String str) {
return putBinary(Utils.getBytesFromUTF8String(str));
public boolean putString(String item) {
return putBinary(Utils.getBytesFromUTF8String(item));
}
@Override
public boolean putBinary(byte[] bytes) {
int h1 = Murmur3_x86_32.hashUnsafeBytes(bytes, Platform.BYTE_ARRAY_OFFSET, bytes.length, 0);
int h2 = Murmur3_x86_32.hashUnsafeBytes(bytes, Platform.BYTE_ARRAY_OFFSET, bytes.length, h1);
public boolean putBinary(byte[] item) {
int h1 = Murmur3_x86_32.hashUnsafeBytes(item, Platform.BYTE_ARRAY_OFFSET, item.length, 0);
int h2 = Murmur3_x86_32.hashUnsafeBytes(item, Platform.BYTE_ARRAY_OFFSET, item.length, h1);
long bitSize = bits.bitSize();
boolean bitsChanged = false;
......@@ -100,14 +101,14 @@ public class BloomFilterImpl extends BloomFilter implements Serializable {
}
@Override
public boolean mightContainString(String str) {
return mightContainBinary(Utils.getBytesFromUTF8String(str));
public boolean mightContainString(String item) {
return mightContainBinary(Utils.getBytesFromUTF8String(item));
}
@Override
public boolean mightContainBinary(byte[] bytes) {
int h1 = Murmur3_x86_32.hashUnsafeBytes(bytes, Platform.BYTE_ARRAY_OFFSET, bytes.length, 0);
int h2 = Murmur3_x86_32.hashUnsafeBytes(bytes, Platform.BYTE_ARRAY_OFFSET, bytes.length, h1);
public boolean mightContainBinary(byte[] item) {
int h1 = Murmur3_x86_32.hashUnsafeBytes(item, Platform.BYTE_ARRAY_OFFSET, item.length, 0);
int h2 = Murmur3_x86_32.hashUnsafeBytes(item, Platform.BYTE_ARRAY_OFFSET, item.length, h1);
long bitSize = bits.bitSize();
for (int i = 1; i <= numHashFunctions; i++) {
......@@ -124,14 +125,14 @@ public class BloomFilterImpl extends BloomFilter implements Serializable {
}
@Override
public boolean putLong(long l) {
public boolean putLong(long item) {
// Here we first hash the input long element into 2 int hash values, h1 and h2, then produce n
// hash values by `h1 + i * h2` with 1 <= i <= numHashFunctions.
// Note that `CountMinSketch` use a different strategy, it hash the input long element with
// every i to produce n hash values.
// TODO: the strategy of `CountMinSketch` looks more advanced, should we follow it here?
int h1 = Murmur3_x86_32.hashLong(l, 0);
int h2 = Murmur3_x86_32.hashLong(l, h1);
int h1 = Murmur3_x86_32.hashLong(item, 0);
int h2 = Murmur3_x86_32.hashLong(item, h1);
long bitSize = bits.bitSize();
boolean bitsChanged = false;
......@@ -147,9 +148,9 @@ public class BloomFilterImpl extends BloomFilter implements Serializable {
}
@Override
public boolean mightContainLong(long l) {
int h1 = Murmur3_x86_32.hashLong(l, 0);
int h2 = Murmur3_x86_32.hashLong(l, h1);
public boolean mightContainLong(long item) {
int h1 = Murmur3_x86_32.hashLong(item, 0);
int h2 = Murmur3_x86_32.hashLong(item, h1);
long bitSize = bits.bitSize();
for (int i = 1; i <= numHashFunctions; i++) {
......@@ -197,7 +198,7 @@ public class BloomFilterImpl extends BloomFilter implements Serializable {
throw new IncompatibleMergeException("Cannot merge null bloom filter");
}
if (!(other instanceof BloomFilter)) {
if (!(other instanceof BloomFilterImpl)) {
throw new IncompatibleMergeException(
"Cannot merge bloom filter of class " + other.getClass().getName()
);
......@@ -211,7 +212,8 @@ public class BloomFilterImpl extends BloomFilter implements Serializable {
if (this.numHashFunctions != that.numHashFunctions) {
throw new IncompatibleMergeException(
"Cannot merge bloom filters with different number of hash functions");
"Cannot merge bloom filters with different number of hash functions"
);
}
this.bits.putAll(that.bits);
......
......@@ -22,7 +22,7 @@ import java.io.InputStream;
import java.io.OutputStream;
/**
* A Count-Min sketch is a probabilistic data structure used for summarizing streams of data in
* A Count-min sketch is a probabilistic data structure used for summarizing streams of data in
* sub-linear space. Currently, supported data types include:
* <ul>
* <li>{@link Byte}</li>
......@@ -31,8 +31,7 @@ import java.io.OutputStream;
* <li>{@link Long}</li>
* <li>{@link String}</li>
* </ul>
* Each {@link CountMinSketch} is initialized with a random seed, and a pair
* of parameters:
* A {@link CountMinSketch} is initialized with a random seed, and a pair of parameters:
* <ol>
* <li>relative error (or {@code eps}), and
* <li>confidence (or {@code delta})
......@@ -49,16 +48,13 @@ import java.io.OutputStream;
* <li>{@code w = ceil(-log(1 - confidence) / log(2))}</li>
* </ul>
*
* See http://www.eecs.harvard.edu/~michaelm/CS222/countmin.pdf for technical details,
* including proofs of the estimates and error bounds used in this implementation.
*
* This implementation is largely based on the {@code CountMinSketch} class from stream-lib.
*/
abstract public class CountMinSketch {
public enum Version {
/**
* {@code CountMinSketch} binary format version 1 (all values written in big-endian order):
* {@code CountMinSketch} binary format version 1. All values written in big-endian order:
* <ul>
* <li>Version number, always 1 (32 bit)</li>
* <li>Total count of added items (64 bit)</li>
......@@ -172,14 +168,14 @@ abstract public class CountMinSketch {
throws IncompatibleMergeException;
/**
* Writes out this {@link CountMinSketch} to an output stream in binary format.
* It is the caller's responsibility to close the stream.
* Writes out this {@link CountMinSketch} to an output stream in binary format. It is the caller's
* responsibility to close the stream.
*/
public abstract void writeTo(OutputStream out) throws IOException;
/**
* Reads in a {@link CountMinSketch} from an input stream.
* It is the caller's responsibility to close the stream.
* Reads in a {@link CountMinSketch} from an input stream. It is the caller's responsibility to
* close the stream.
*/
public static CountMinSketch readFrom(InputStream in) throws IOException {
return CountMinSketchImpl.readFrom(in);
......@@ -188,6 +184,10 @@ abstract public class CountMinSketch {
/**
* Creates a {@link CountMinSketch} with given {@code depth}, {@code width}, and random
* {@code seed}.
*
* @param depth depth of the Count-min Sketch, must be positive
* @param width width of the Count-min Sketch, must be positive
* @param seed random seed
*/
public static CountMinSketch create(int depth, int width, int seed) {
return new CountMinSketchImpl(depth, width, seed);
......@@ -196,6 +196,10 @@ abstract public class CountMinSketch {
/**
* Creates a {@link CountMinSketch} with given relative error ({@code eps}), {@code confidence},
* and random {@code seed}.
*
* @param eps relative error, must be positive
* @param confidence confidence, must be positive and less than 1.0
* @param seed random seed
*/
public static CountMinSketch create(double eps, double confidence, int seed) {
return new CountMinSketchImpl(eps, confidence, seed);
......
......@@ -42,6 +42,10 @@ class CountMinSketchImpl extends CountMinSketch implements Serializable {
private CountMinSketchImpl() {}
CountMinSketchImpl(int depth, int width, int seed) {
if (depth <= 0 || width <= 0) {
throw new IllegalArgumentException("Depth and width must be both positive");
}
this.depth = depth;
this.width = width;
this.eps = 2.0 / width;
......@@ -50,6 +54,14 @@ class CountMinSketchImpl extends CountMinSketch implements Serializable {
}
CountMinSketchImpl(double eps, double confidence, int seed) {
if (eps <= 0D) {
throw new IllegalArgumentException("Relative error must be positive");
}
if (confidence <= 0D || confidence >= 1D) {
throw new IllegalArgumentException("Confidence must be within range (0.0, 1.0)");
}
// 2/w = eps ; w = 2/eps
// 1/2^depth <= 1-confidence ; depth >= -log2 (1-confidence)
this.eps = eps;
......
......@@ -19,7 +19,7 @@ package org.apache.spark.util.sketch;
import java.io.UnsupportedEncodingException;
public class Utils {
class Utils {
public static byte[] getBytesFromUTF8String(String str) {
try {
return str.getBytes("utf-8");
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment