diff --git a/common/kvstore/pom.xml b/common/kvstore/pom.xml
index d00cf2788b9645be93bc72e00efe8ae396bd521e..cf93d41cd77cfef40b8e8ef644667aa5d008b93f 100644
--- a/common/kvstore/pom.xml
+++ b/common/kvstore/pom.xml
@@ -35,6 +35,11 @@
   </properties>
 
   <dependencies>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-tags_${scala.binary.version}</artifactId>
+    </dependency>
+
     <dependency>
       <groupId>com.google.guava</groupId>
       <artifactId>guava</artifactId>
diff --git a/common/kvstore/src/main/java/org/apache/spark/kvstore/ArrayWrappers.java b/common/kvstore/src/main/java/org/apache/spark/kvstore/ArrayWrappers.java
new file mode 100644
index 0000000000000000000000000000000000000000..5efa842e3aad762719b8aeb0d2ff145513fa913e
--- /dev/null
+++ b/common/kvstore/src/main/java/org/apache/spark/kvstore/ArrayWrappers.java
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.kvstore;
+
+import java.util.Arrays;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * A factory for array wrappers so that arrays can be used as keys in a map, sorted or not.
+ *
+ * The comparator implementation makes two assumptions:
+ * - All elements are instances of Comparable
+ * - When comparing two arrays, they both contain elements of the same type in corresponding
+ *   indices.
+ *
+ * Otherwise, ClassCastExceptions may occur. The equality method can compare any two arrays.
+ *
+ * This class is not efficient and is mostly meant to compare really small arrays, like those
+ * generally used as indices and keys in a KVStore.
+ */
+class ArrayWrappers {
+
+  @SuppressWarnings("unchecked")
+  public static Comparable<Object> forArray(Object a) {
+    Preconditions.checkArgument(a.getClass().isArray());
+    Comparable<?> ret;
+    if (a instanceof int[]) {
+      ret = new ComparableIntArray((int[]) a);
+    } else if (a instanceof long[]) {
+      ret = new ComparableLongArray((long[]) a);
+    } else if (a instanceof byte[]) {
+      ret = new ComparableByteArray((byte[]) a);
+    } else {
+      Preconditions.checkArgument(!a.getClass().getComponentType().isPrimitive());
+      ret = new ComparableObjectArray((Object[]) a);
+    }
+    return (Comparable<Object>) ret;
+  }
+
+  private static class ComparableIntArray implements Comparable<ComparableIntArray> {
+
+    private final int[] array;
+
+    ComparableIntArray(int[] array) {
+      this.array = array;
+    }
+
+    @Override
+    public boolean equals(Object other) {
+      if (!(other instanceof ComparableIntArray)) {
+        return false;
+      }
+      return Arrays.equals(array, ((ComparableIntArray) other).array);
+    }
+
+    @Override
+    public int hashCode() {
+      int code = 0;
+      for (int i = 0; i < array.length; i++) {
+        code = (code * 31) + array[i];
+      }
+      return code;
+    }
+
+    @Override
+    public int compareTo(ComparableIntArray other) {
+      int len = Math.min(array.length, other.array.length);
+      for (int i = 0; i < len; i++) {
+        int diff = array[i] - other.array[i];
+        if (diff != 0) {
+          return diff;
+        }
+      }
+
+      return array.length - other.array.length;
+    }
+  }
+
+  private static class ComparableLongArray implements Comparable<ComparableLongArray> {
+
+    private final long[] array;
+
+    ComparableLongArray(long[] array) {
+      this.array = array;
+    }
+
+    @Override
+    public boolean equals(Object other) {
+      if (!(other instanceof ComparableLongArray)) {
+        return false;
+      }
+      return Arrays.equals(array, ((ComparableLongArray) other).array);
+    }
+
+    @Override
+    public int hashCode() {
+      int code = 0;
+      for (int i = 0; i < array.length; i++) {
+        code = (code * 31) + (int) array[i];
+      }
+      return code;
+    }
+
+    @Override
+    public int compareTo(ComparableLongArray other) {
+      int len = Math.min(array.length, other.array.length);
+      for (int i = 0; i < len; i++) {
+        long diff = array[i] - other.array[i];
+        if (diff != 0) {
+          return diff > 0 ? 1 : -1;
+        }
+      }
+
+      return array.length - other.array.length;
+    }
+  }
+
+  private static class ComparableByteArray implements Comparable<ComparableByteArray> {
+
+    private final byte[] array;
+
+    ComparableByteArray(byte[] array) {
+      this.array = array;
+    }
+
+    @Override
+    public boolean equals(Object other) {
+      if (!(other instanceof ComparableByteArray)) {
+        return false;
+      }
+      return Arrays.equals(array, ((ComparableByteArray) other).array);
+    }
+
+    @Override
+    public int hashCode() {
+      int code = 0;
+      for (int i = 0; i < array.length; i++) {
+        code = (code * 31) + array[i];
+      }
+      return code;
+    }
+
+    @Override
+    public int compareTo(ComparableByteArray other) {
+      int len = Math.min(array.length, other.array.length);
+      for (int i = 0; i < len; i++) {
+        int diff = array[i] - other.array[i];
+        if (diff != 0) {
+          return diff;
+        }
+      }
+
+      return array.length - other.array.length;
+    }
+  }
+
+  private static class ComparableObjectArray implements Comparable<ComparableObjectArray> {
+
+    private final Object[] array;
+
+    ComparableObjectArray(Object[] array) {
+      this.array = array;
+    }
+
+    @Override
+    public boolean equals(Object other) {
+      if (!(other instanceof ComparableObjectArray)) {
+        return false;
+      }
+      return Arrays.equals(array, ((ComparableObjectArray) other).array);
+    }
+
+    @Override
+    public int hashCode() {
+      int code = 0;
+      for (int i = 0; i < array.length; i++) {
+        code = (code * 31) + array[i].hashCode();
+      }
+      return code;
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public int compareTo(ComparableObjectArray other) {
+      int len = Math.min(array.length, other.array.length);
+      for (int i = 0; i < len; i++) {
+        int diff = ((Comparable<Object>) array[i]).compareTo((Comparable<Object>) other.array[i]);
+        if (diff != 0) {
+          return diff;
+        }
+      }
+
+      return array.length - other.array.length;
+    }
+  }
+
+}
diff --git a/common/kvstore/src/main/java/org/apache/spark/kvstore/InMemoryStore.java b/common/kvstore/src/main/java/org/apache/spark/kvstore/InMemoryStore.java
new file mode 100644
index 0000000000000000000000000000000000000000..f3aeb822e72cd2c9369a5616e35b8d4f67ce977f
--- /dev/null
+++ b/common/kvstore/src/main/java/org/apache/spark/kvstore/InMemoryStore.java
@@ -0,0 +1,320 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.kvstore;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import com.google.common.base.Objects;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
+
+import org.apache.spark.annotation.Private;
+
+/**
+ * Implementation of KVStore that keeps data deserialized in memory. This store does not index
+ * data; instead, whenever iterating over an indexed field, the stored data is copied and sorted
+ * according to the index. This saves memory but makes iteration more expensive.
+ */
+@Private
+public class InMemoryStore implements KVStore {
+
+  private Object metadata;
+  private ConcurrentMap<Class<?>, InstanceList> data = new ConcurrentHashMap<>();
+
+  @Override
+  public <T> T getMetadata(Class<T> klass) {
+    return klass.cast(metadata);
+  }
+
+  @Override
+  public void setMetadata(Object value) {
+    this.metadata = value;
+  }
+
+  @Override
+  public long count(Class<?> type) {
+    InstanceList list = data.get(type);
+    return list != null ? list.size() : 0;
+  }
+
+  @Override
+  public long count(Class<?> type, String index, Object indexedValue) throws Exception {
+    InstanceList list = data.get(type);
+    int count = 0;
+    Object comparable = asKey(indexedValue);
+    KVTypeInfo.Accessor accessor = list.getIndexAccessor(index);
+    for (Object o : view(type)) {
+      if (Objects.equal(comparable, asKey(accessor.get(o)))) {
+        count++;
+      }
+    }
+    return count;
+  }
+
+  @Override
+  public <T> T read(Class<T> klass, Object naturalKey) {
+    InstanceList list = data.get(klass);
+    Object value = list != null ? list.get(naturalKey) : null;
+    if (value == null) {
+      throw new NoSuchElementException();
+    }
+    return klass.cast(value);
+  }
+
+  @Override
+  public void write(Object value) throws Exception {
+    InstanceList list = data.computeIfAbsent(value.getClass(), key -> {
+      try {
+        return new InstanceList(key);
+      } catch (Exception e) {
+        throw Throwables.propagate(e);
+      }
+    });
+    list.put(value);
+  }
+
+  @Override
+  public void delete(Class<?> type, Object naturalKey) {
+    InstanceList list = data.get(type);
+    if (list != null) {
+      list.delete(naturalKey);
+    }
+  }
+
+  @Override
+  public <T> KVStoreView<T> view(Class<T> type){
+    InstanceList list = data.get(type);
+    return list != null ? list.view(type)
+      : new InMemoryView<>(type, Collections.<T>emptyList(), null);
+  }
+
+  @Override
+  public void close() {
+    metadata = null;
+    data.clear();
+  }
+
+  @SuppressWarnings("unchecked")
+  private static Comparable<Object> asKey(Object in) {
+    if (in.getClass().isArray()) {
+      in = ArrayWrappers.forArray(in);
+    }
+    return (Comparable<Object>) in;
+  }
+
+  private static class InstanceList {
+
+    private final KVTypeInfo ti;
+    private final KVTypeInfo.Accessor naturalKey;
+    private final ConcurrentMap<Comparable<Object>, Object> data;
+
+    private int size;
+
+    private InstanceList(Class<?> type) throws Exception {
+      this.ti = new KVTypeInfo(type);
+      this.naturalKey = ti.getAccessor(KVIndex.NATURAL_INDEX_NAME);
+      this.data = new ConcurrentHashMap<>();
+      this.size = 0;
+    }
+
+    KVTypeInfo.Accessor getIndexAccessor(String indexName) {
+      return ti.getAccessor(indexName);
+    }
+
+    public Object get(Object key) {
+      return data.get(asKey(key));
+    }
+
+    public void put(Object value) throws Exception {
+      Preconditions.checkArgument(ti.type().equals(value.getClass()),
+        "Unexpected type: %s", value.getClass());
+      if (data.put(asKey(naturalKey.get(value)), value) == null) {
+        size++;
+      }
+    }
+
+    public void delete(Object key) {
+      if (data.remove(asKey(key)) != null) {
+        size--;
+      }
+    }
+
+    public int size() {
+      return size;
+    }
+
+    @SuppressWarnings("unchecked")
+    public <T> InMemoryView<T> view(Class<T> type) {
+      Preconditions.checkArgument(ti.type().equals(type), "Unexpected type: %s", type);
+      Collection<T> all = (Collection<T>) data.values();
+      return new InMemoryView(type, all, ti);
+    }
+
+  }
+
+  private static class InMemoryView<T> extends KVStoreView<T> {
+
+    private final Collection<T> elements;
+    private final KVTypeInfo ti;
+    private final KVTypeInfo.Accessor natural;
+
+    InMemoryView(Class<T> type, Collection<T> elements, KVTypeInfo ti) {
+      super(type);
+      this.elements = elements;
+      this.ti = ti;
+      this.natural = ti != null ? ti.getAccessor(KVIndex.NATURAL_INDEX_NAME) : null;
+    }
+
+    @Override
+    public Iterator<T> iterator() {
+      if (elements.isEmpty()) {
+        return new InMemoryIterator<>(elements.iterator());
+      }
+
+      try {
+        KVTypeInfo.Accessor getter = index != null ? ti.getAccessor(index) : null;
+        int modifier = ascending ? 1 : -1;
+
+        final List<T> sorted = copyElements();
+        Collections.sort(sorted, (e1, e2) -> modifier * compare(e1, e2, getter));
+        Stream<T> stream = sorted.stream();
+
+        if (first != null) {
+          stream = stream.filter(e -> modifier * compare(e, getter, first) >= 0);
+        }
+
+        if (last != null) {
+          stream = stream.filter(e -> modifier * compare(e, getter, last) <= 0);
+        }
+
+        if (skip > 0) {
+          stream = stream.skip(skip);
+        }
+
+        if (max < sorted.size()) {
+          stream = stream.limit((int) max);
+        }
+
+        return new InMemoryIterator<>(stream.iterator());
+      } catch (Exception e) {
+        throw Throwables.propagate(e);
+      }
+    }
+
+    /**
+     * Create a copy of the input elements, filtering the values for child indices if needed.
+     */
+    private List<T> copyElements() {
+      if (parent != null) {
+        KVTypeInfo.Accessor parentGetter = ti.getParentAccessor(index);
+        Preconditions.checkArgument(parentGetter != null, "Parent filter for non-child index.");
+
+        return elements.stream()
+          .filter(e -> compare(e, parentGetter, parent) == 0)
+          .collect(Collectors.toList());
+      } else {
+        return new ArrayList<>(elements);
+      }
+    }
+
+    private int compare(T e1, T e2, KVTypeInfo.Accessor getter) {
+      try {
+        int diff = compare(e1, getter, getter.get(e2));
+        if (diff == 0 && getter != natural) {
+          diff = compare(e1, natural, natural.get(e2));
+        }
+        return diff;
+      } catch (Exception e) {
+        throw Throwables.propagate(e);
+      }
+    }
+
+    private int compare(T e1, KVTypeInfo.Accessor getter, Object v2) {
+      try {
+        return asKey(getter.get(e1)).compareTo(asKey(v2));
+      } catch (Exception e) {
+        throw Throwables.propagate(e);
+      }
+    }
+
+  }
+
+  private static class InMemoryIterator<T> implements KVStoreIterator<T> {
+
+    private final Iterator<T> iter;
+
+    InMemoryIterator(Iterator<T> iter) {
+      this.iter = iter;
+    }
+
+    @Override
+    public boolean hasNext() {
+      return iter.hasNext();
+    }
+
+    @Override
+    public T next() {
+      return iter.next();
+    }
+
+    @Override
+    public void remove() {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public List<T> next(int max) {
+      List<T> list = new ArrayList<>(max);
+      while (hasNext() && list.size() < max) {
+        list.add(next());
+      }
+      return list;
+    }
+
+    @Override
+    public boolean skip(long n) {
+      long skipped = 0;
+      while (skipped < n) {
+        if (hasNext()) {
+          next();
+          skipped++;
+        } else {
+          return false;
+        }
+      }
+
+      return hasNext();
+    }
+
+    @Override
+    public void close() {
+      // no op.
+    }
+
+  }
+
+}
diff --git a/common/kvstore/src/main/java/org/apache/spark/kvstore/KVTypeInfo.java b/common/kvstore/src/main/java/org/apache/spark/kvstore/KVTypeInfo.java
index e1cc0ba3f5aa73e069ba318e98b8ec3476d11b87..e3e61e0a0e4529b2bcc97d8942fb3d19726d469a 100644
--- a/common/kvstore/src/main/java/org/apache/spark/kvstore/KVTypeInfo.java
+++ b/common/kvstore/src/main/java/org/apache/spark/kvstore/KVTypeInfo.java
@@ -89,7 +89,7 @@ public class KVTypeInfo {
       "Duplicate index %s for type %s.", idx.value(), type.getName());
   }
 
-  public Class<?> getType() {
+  public Class<?> type() {
     return type;
   }
 
diff --git a/common/kvstore/src/test/java/org/apache/spark/kvstore/ArrayKeyIndexType.java b/common/kvstore/src/test/java/org/apache/spark/kvstore/ArrayKeyIndexType.java
new file mode 100644
index 0000000000000000000000000000000000000000..d5938acc3e80eb10a18df5ccb26e700490674291
--- /dev/null
+++ b/common/kvstore/src/test/java/org/apache/spark/kvstore/ArrayKeyIndexType.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.kvstore;
+
+import java.util.Arrays;
+
+public class ArrayKeyIndexType {
+
+  @KVIndex
+  public int[] key;
+
+  @KVIndex("id")
+  public String[] id;
+
+  @Override
+  public boolean equals(Object o) {
+    if (o instanceof ArrayKeyIndexType) {
+      ArrayKeyIndexType other = (ArrayKeyIndexType) o;
+      return Arrays.equals(key, other.key) && Arrays.equals(id, other.id);
+    }
+    return false;
+  }
+
+  @Override
+  public int hashCode() {
+    return key.hashCode();
+  }
+
+}
diff --git a/common/kvstore/src/test/java/org/apache/spark/kvstore/ArrayWrappersSuite.java b/common/kvstore/src/test/java/org/apache/spark/kvstore/ArrayWrappersSuite.java
new file mode 100644
index 0000000000000000000000000000000000000000..f9b4774820ea008b4e13f7e1bc7d02c03a2759fe
--- /dev/null
+++ b/common/kvstore/src/test/java/org/apache/spark/kvstore/ArrayWrappersSuite.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.kvstore;
+
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+public class ArrayWrappersSuite {
+
+  @Test
+  public void testGenericArrayKey() {
+   byte[] b1 = new byte[] { 0x01, 0x02, 0x03 };
+   byte[] b2 = new byte[] { 0x01, 0x02 };
+   int[] i1 = new int[] { 1, 2, 3 };
+   int[] i2 = new int[] { 1, 2 };
+   String[] s1 = new String[] { "1", "2", "3" };
+   String[] s2 = new String[] { "1", "2" };
+
+   assertEquals(ArrayWrappers.forArray(b1), ArrayWrappers.forArray(b1));
+   assertNotEquals(ArrayWrappers.forArray(b1), ArrayWrappers.forArray(b2));
+   assertNotEquals(ArrayWrappers.forArray(b1), ArrayWrappers.forArray(i1));
+   assertNotEquals(ArrayWrappers.forArray(b1), ArrayWrappers.forArray(s1));
+
+   assertEquals(ArrayWrappers.forArray(i1), ArrayWrappers.forArray(i1));
+   assertNotEquals(ArrayWrappers.forArray(i1), ArrayWrappers.forArray(i2));
+   assertNotEquals(ArrayWrappers.forArray(i1), ArrayWrappers.forArray(b1));
+   assertNotEquals(ArrayWrappers.forArray(i1), ArrayWrappers.forArray(s1));
+
+   assertEquals(ArrayWrappers.forArray(s1), ArrayWrappers.forArray(s1));
+   assertNotEquals(ArrayWrappers.forArray(s1), ArrayWrappers.forArray(s2));
+   assertNotEquals(ArrayWrappers.forArray(s1), ArrayWrappers.forArray(b1));
+   assertNotEquals(ArrayWrappers.forArray(s1), ArrayWrappers.forArray(i1));
+
+   assertEquals(0, ArrayWrappers.forArray(b1).compareTo(ArrayWrappers.forArray(b1)));
+   assertTrue(ArrayWrappers.forArray(b1).compareTo(ArrayWrappers.forArray(b2)) > 0);
+
+   assertEquals(0, ArrayWrappers.forArray(i1).compareTo(ArrayWrappers.forArray(i1)));
+   assertTrue(ArrayWrappers.forArray(i1).compareTo(ArrayWrappers.forArray(i2)) > 0);
+
+   assertEquals(0, ArrayWrappers.forArray(s1).compareTo(ArrayWrappers.forArray(s1)));
+   assertTrue(ArrayWrappers.forArray(s1).compareTo(ArrayWrappers.forArray(s2)) > 0);
+  }
+
+}
diff --git a/common/kvstore/src/test/java/org/apache/spark/kvstore/InMemoryIteratorSuite.java b/common/kvstore/src/test/java/org/apache/spark/kvstore/InMemoryIteratorSuite.java
new file mode 100644
index 0000000000000000000000000000000000000000..57ee4f6dd7cb61a3fd9119a15db3a64151f9ae4f
--- /dev/null
+++ b/common/kvstore/src/test/java/org/apache/spark/kvstore/InMemoryIteratorSuite.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.kvstore;
+
+public class InMemoryIteratorSuite extends DBIteratorSuite {
+
+  @Override
+  protected KVStore createStore() {
+    return new InMemoryStore();
+  }
+
+}
diff --git a/common/kvstore/src/test/java/org/apache/spark/kvstore/InMemoryStoreSuite.java b/common/kvstore/src/test/java/org/apache/spark/kvstore/InMemoryStoreSuite.java
new file mode 100644
index 0000000000000000000000000000000000000000..6a7915f9385e096aa356c08fd29f8ef58c7241ce
--- /dev/null
+++ b/common/kvstore/src/test/java/org/apache/spark/kvstore/InMemoryStoreSuite.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.kvstore;
+
+import java.util.NoSuchElementException;
+
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+public class InMemoryStoreSuite {
+
+  @Test
+  public void testObjectWriteReadDelete() throws Exception {
+    KVStore store = new InMemoryStore();
+
+    CustomType1 t = new CustomType1();
+    t.key = "key";
+    t.id = "id";
+    t.name = "name";
+
+    try {
+      store.read(CustomType1.class, t.key);
+      fail("Expected exception for non-existant object.");
+    } catch (NoSuchElementException nsee) {
+      // Expected.
+    }
+
+    store.write(t);
+    assertEquals(t, store.read(t.getClass(), t.key));
+    assertEquals(1L, store.count(t.getClass()));
+
+    store.delete(t.getClass(), t.key);
+    try {
+      store.read(t.getClass(), t.key);
+      fail("Expected exception for deleted object.");
+    } catch (NoSuchElementException nsee) {
+      // Expected.
+    }
+  }
+
+  @Test
+  public void testMultipleObjectWriteReadDelete() throws Exception {
+    KVStore store = new InMemoryStore();
+
+    CustomType1 t1 = new CustomType1();
+    t1.key = "key1";
+    t1.id = "id";
+    t1.name = "name1";
+
+    CustomType1 t2 = new CustomType1();
+    t2.key = "key2";
+    t2.id = "id";
+    t2.name = "name2";
+
+    store.write(t1);
+    store.write(t2);
+
+    assertEquals(t1, store.read(t1.getClass(), t1.key));
+    assertEquals(t2, store.read(t2.getClass(), t2.key));
+    assertEquals(2L, store.count(t1.getClass()));
+
+    store.delete(t1.getClass(), t1.key);
+    assertEquals(t2, store.read(t2.getClass(), t2.key));
+    store.delete(t2.getClass(), t2.key);
+    try {
+      store.read(t2.getClass(), t2.key);
+      fail("Expected exception for deleted object.");
+    } catch (NoSuchElementException nsee) {
+      // Expected.
+    }
+  }
+
+  @Test
+  public void testMetadata() throws Exception {
+    KVStore store = new InMemoryStore();
+    assertNull(store.getMetadata(CustomType1.class));
+
+    CustomType1 t = new CustomType1();
+    t.id = "id";
+    t.name = "name";
+
+    store.setMetadata(t);
+    assertEquals(t, store.getMetadata(CustomType1.class));
+
+    store.setMetadata(null);
+    assertNull(store.getMetadata(CustomType1.class));
+  }
+
+  @Test
+  public void testUpdate() throws Exception {
+    KVStore store = new InMemoryStore();
+
+    CustomType1 t = new CustomType1();
+    t.key = "key";
+    t.id = "id";
+    t.name = "name";
+
+    store.write(t);
+
+    t.name = "anotherName";
+
+    store.write(t);
+    assertEquals(1, store.count(t.getClass()));
+    assertSame(t, store.read(t.getClass(), t.key));
+  }
+
+  @Test
+  public void testArrayIndices() throws Exception {
+    KVStore store = new InMemoryStore();
+
+    ArrayKeyIndexType o = new ArrayKeyIndexType();
+    o.key = new int[] { 1, 2 };
+    o.id = new String[] { "3", "4" };
+
+    store.write(o);
+    assertEquals(o, store.read(ArrayKeyIndexType.class, o.key));
+    assertEquals(o, store.view(ArrayKeyIndexType.class).index("id").first(o.id).iterator().next());
+  }
+
+  @Test
+  public void testBasicIteration() throws Exception {
+    KVStore store = new InMemoryStore();
+
+    CustomType1 t1 = new CustomType1();
+    t1.key = "1";
+    t1.id = "id1";
+    t1.name = "name1";
+    store.write(t1);
+
+    CustomType1 t2 = new CustomType1();
+    t2.key = "2";
+    t2.id = "id2";
+    t2.name = "name2";
+    store.write(t2);
+
+    assertEquals(t1.id, store.view(t1.getClass()).iterator().next().id);
+    assertEquals(t2.id, store.view(t1.getClass()).skip(1).iterator().next().id);
+    assertEquals(t2.id, store.view(t1.getClass()).skip(1).max(1).iterator().next().id);
+    assertEquals(t1.id,
+      store.view(t1.getClass()).first(t1.key).max(1).iterator().next().id);
+    assertEquals(t2.id,
+      store.view(t1.getClass()).first(t2.key).max(1).iterator().next().id);
+    assertFalse(store.view(t1.getClass()).first(t2.id).skip(1).iterator().hasNext());
+  }
+
+}
diff --git a/common/kvstore/src/test/java/org/apache/spark/kvstore/LevelDBSuite.java b/common/kvstore/src/test/java/org/apache/spark/kvstore/LevelDBSuite.java
index 42bff610457e7220f406d4e34aea1967b76fc993..86c85c1b7a03280b646b9a0b7043d3df68d04283 100644
--- a/common/kvstore/src/test/java/org/apache/spark/kvstore/LevelDBSuite.java
+++ b/common/kvstore/src/test/java/org/apache/spark/kvstore/LevelDBSuite.java
@@ -283,28 +283,4 @@ public class LevelDBSuite {
 
   }
 
-  public static class ArrayKeyIndexType {
-
-    @KVIndex
-    public int[] key;
-
-    @KVIndex("id")
-    public String[] id;
-
-    @Override
-    public boolean equals(Object o) {
-      if (o instanceof ArrayKeyIndexType) {
-        ArrayKeyIndexType other = (ArrayKeyIndexType) o;
-        return Arrays.equals(key, other.key) && Arrays.equals(id, other.id);
-      }
-      return false;
-    }
-
-    @Override
-    public int hashCode() {
-      return key.hashCode();
-    }
-
-  }
-
 }
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index b9db1df2d1919763bebe7d361283d83454eafbaf..371a171aa98e3b9d5d16e34e4e17f37e8c24059b 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -230,7 +230,8 @@ object SparkBuild extends PomBuild {
 
     javacOptions in Compile ++= Seq(
       "-encoding", "UTF-8",
-      "-source", javacJVMVersion.value
+      "-source", javacJVMVersion.value,
+      "-Xlint:unchecked"
     ),
     // This -target option cannot be set in the Compile configuration scope since `javadoc` doesn't
     // play nicely with it; see https://github.com/sbt/sbt/issues/355#issuecomment-3817629 for