diff --git a/LICENSE b/LICENSE
index 383f079df8c8bf08a36a222552bc4ac063ba2d1f..65e1f480d9b14740ad8a5eed0e1329dcbef85934 100644
--- a/LICENSE
+++ b/LICENSE
@@ -442,7 +442,7 @@ Written by Pavel Binko, Dino Ferrero Merlino, Wolfgang Hoschek, Tony Johnson, An
 
 
 ========================================================================
-Fo SnapTree:
+For SnapTree:
 ========================================================================
 
 SNAPTREE LICENSE
@@ -482,6 +482,24 @@ OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
 SUCH DAMAGE.
 
 
+========================================================================
+For Timsort (core/src/main/java/org/apache/spark/util/collection/Sorter.java):
+========================================================================
+Copyright (C) 2008 The Android Open Source Project
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+
 ========================================================================
 BSD-style licenses
 ========================================================================
diff --git a/core/src/main/java/org/apache/spark/util/collection/Sorter.java b/core/src/main/java/org/apache/spark/util/collection/Sorter.java
new file mode 100644
index 0000000000000000000000000000000000000000..64ad18c0e463aa63d65321fc40ee848c414ddd35
--- /dev/null
+++ b/core/src/main/java/org/apache/spark/util/collection/Sorter.java
@@ -0,0 +1,915 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.collection;
+
+import java.util.Comparator;
+
+/**
+ * A port of the Android Timsort class, which utilizes a "stable, adaptive, iterative mergesort."
+ * See the method comment on sort() for more details.
+ *
+ * This has been kept in Java with the original style in order to match very closely with the
+ * Anroid source code, and thus be easy to verify correctness.
+ *
+ * The purpose of the port is to generalize the interface to the sort to accept input data formats
+ * besides simple arrays where every element is sorted individually. For instance, the AppendOnlyMap
+ * uses this to sort an Array with alternating elements of the form [key, value, key, value].
+ * This generalization comes with minimal overhead -- see SortDataFormat for more information.
+ */
+class Sorter<K, Buffer> {
+
+  /**
+   * This is the minimum sized sequence that will be merged.  Shorter
+   * sequences will be lengthened by calling binarySort.  If the entire
+   * array is less than this length, no merges will be performed.
+   *
+   * This constant should be a power of two.  It was 64 in Tim Peter's C
+   * implementation, but 32 was empirically determined to work better in
+   * this implementation.  In the unlikely event that you set this constant
+   * to be a number that's not a power of two, you'll need to change the
+   * minRunLength computation.
+   *
+   * If you decrease this constant, you must change the stackLen
+   * computation in the TimSort constructor, or you risk an
+   * ArrayOutOfBounds exception.  See listsort.txt for a discussion
+   * of the minimum stack length required as a function of the length
+   * of the array being sorted and the minimum merge sequence length.
+   */
+  private static final int MIN_MERGE = 32;
+
+  private final SortDataFormat<K, Buffer> s;
+
+  public Sorter(SortDataFormat<K, Buffer> sortDataFormat) {
+    this.s = sortDataFormat;
+  }
+
+  /**
+   * A stable, adaptive, iterative mergesort that requires far fewer than
+   * n lg(n) comparisons when running on partially sorted arrays, while
+   * offering performance comparable to a traditional mergesort when run
+   * on random arrays.  Like all proper mergesorts, this sort is stable and
+   * runs O(n log n) time (worst case).  In the worst case, this sort requires
+   * temporary storage space for n/2 object references; in the best case,
+   * it requires only a small constant amount of space.
+   *
+   * This implementation was adapted from Tim Peters's list sort for
+   * Python, which is described in detail here:
+   *
+   *   http://svn.python.org/projects/python/trunk/Objects/listsort.txt
+   *
+   * Tim's C code may be found here:
+   *
+   *   http://svn.python.org/projects/python/trunk/Objects/listobject.c
+   *
+   * The underlying techniques are described in this paper (and may have
+   * even earlier origins):
+   *
+   *  "Optimistic Sorting and Information Theoretic Complexity"
+   *  Peter McIlroy
+   *  SODA (Fourth Annual ACM-SIAM Symposium on Discrete Algorithms),
+   *  pp 467-474, Austin, Texas, 25-27 January 1993.
+   *
+   * While the API to this class consists solely of static methods, it is
+   * (privately) instantiable; a TimSort instance holds the state of an ongoing
+   * sort, assuming the input array is large enough to warrant the full-blown
+   * TimSort. Small arrays are sorted in place, using a binary insertion sort.
+   *
+   * @author Josh Bloch
+   */
+  void sort(Buffer a, int lo, int hi, Comparator<? super K> c) {
+    assert c != null;
+
+    int nRemaining  = hi - lo;
+    if (nRemaining < 2)
+      return;  // Arrays of size 0 and 1 are always sorted
+
+    // If array is small, do a "mini-TimSort" with no merges
+    if (nRemaining < MIN_MERGE) {
+      int initRunLen = countRunAndMakeAscending(a, lo, hi, c);
+      binarySort(a, lo, hi, lo + initRunLen, c);
+      return;
+    }
+
+    /**
+     * March over the array once, left to right, finding natural runs,
+     * extending short natural runs to minRun elements, and merging runs
+     * to maintain stack invariant.
+     */
+    SortState sortState = new SortState(a, c, hi - lo);
+    int minRun = minRunLength(nRemaining);
+    do {
+      // Identify next run
+      int runLen = countRunAndMakeAscending(a, lo, hi, c);
+
+      // If run is short, extend to min(minRun, nRemaining)
+      if (runLen < minRun) {
+        int force = nRemaining <= minRun ? nRemaining : minRun;
+        binarySort(a, lo, lo + force, lo + runLen, c);
+        runLen = force;
+      }
+
+      // Push run onto pending-run stack, and maybe merge
+      sortState.pushRun(lo, runLen);
+      sortState.mergeCollapse();
+
+      // Advance to find next run
+      lo += runLen;
+      nRemaining -= runLen;
+    } while (nRemaining != 0);
+
+    // Merge all remaining runs to complete sort
+    assert lo == hi;
+    sortState.mergeForceCollapse();
+    assert sortState.stackSize == 1;
+  }
+
+  /**
+   * Sorts the specified portion of the specified array using a binary
+   * insertion sort.  This is the best method for sorting small numbers
+   * of elements.  It requires O(n log n) compares, but O(n^2) data
+   * movement (worst case).
+   *
+   * If the initial part of the specified range is already sorted,
+   * this method can take advantage of it: the method assumes that the
+   * elements from index {@code lo}, inclusive, to {@code start},
+   * exclusive are already sorted.
+   *
+   * @param a the array in which a range is to be sorted
+   * @param lo the index of the first element in the range to be sorted
+   * @param hi the index after the last element in the range to be sorted
+   * @param start the index of the first element in the range that is
+   *        not already known to be sorted ({@code lo <= start <= hi})
+   * @param c comparator to used for the sort
+   */
+  @SuppressWarnings("fallthrough")
+  private void binarySort(Buffer a, int lo, int hi, int start, Comparator<? super K> c) {
+    assert lo <= start && start <= hi;
+    if (start == lo)
+      start++;
+
+    Buffer pivotStore = s.allocate(1);
+    for ( ; start < hi; start++) {
+      s.copyElement(a, start, pivotStore, 0);
+      K pivot = s.getKey(pivotStore, 0);
+
+      // Set left (and right) to the index where a[start] (pivot) belongs
+      int left = lo;
+      int right = start;
+      assert left <= right;
+      /*
+       * Invariants:
+       *   pivot >= all in [lo, left).
+       *   pivot <  all in [right, start).
+       */
+      while (left < right) {
+        int mid = (left + right) >>> 1;
+        if (c.compare(pivot, s.getKey(a, mid)) < 0)
+          right = mid;
+        else
+          left = mid + 1;
+      }
+      assert left == right;
+
+      /*
+       * The invariants still hold: pivot >= all in [lo, left) and
+       * pivot < all in [left, start), so pivot belongs at left.  Note
+       * that if there are elements equal to pivot, left points to the
+       * first slot after them -- that's why this sort is stable.
+       * Slide elements over to make room for pivot.
+       */
+      int n = start - left;  // The number of elements to move
+      // Switch is just an optimization for arraycopy in default case
+      switch (n) {
+        case 2:  s.copyElement(a, left + 1, a, left + 2);
+        case 1:  s.copyElement(a, left, a, left + 1);
+          break;
+        default: s.copyRange(a, left, a, left + 1, n);
+      }
+      s.copyElement(pivotStore, 0, a, left);
+    }
+  }
+
+  /**
+   * Returns the length of the run beginning at the specified position in
+   * the specified array and reverses the run if it is descending (ensuring
+   * that the run will always be ascending when the method returns).
+   *
+   * A run is the longest ascending sequence with:
+   *
+   *    a[lo] <= a[lo + 1] <= a[lo + 2] <= ...
+   *
+   * or the longest descending sequence with:
+   *
+   *    a[lo] >  a[lo + 1] >  a[lo + 2] >  ...
+   *
+   * For its intended use in a stable mergesort, the strictness of the
+   * definition of "descending" is needed so that the call can safely
+   * reverse a descending sequence without violating stability.
+   *
+   * @param a the array in which a run is to be counted and possibly reversed
+   * @param lo index of the first element in the run
+   * @param hi index after the last element that may be contained in the run.
+  It is required that {@code lo < hi}.
+   * @param c the comparator to used for the sort
+   * @return  the length of the run beginning at the specified position in
+   *          the specified array
+   */
+  private int countRunAndMakeAscending(Buffer a, int lo, int hi, Comparator<? super K> c) {
+    assert lo < hi;
+    int runHi = lo + 1;
+    if (runHi == hi)
+      return 1;
+
+    // Find end of run, and reverse range if descending
+    if (c.compare(s.getKey(a, runHi++), s.getKey(a, lo)) < 0) { // Descending
+      while (runHi < hi && c.compare(s.getKey(a, runHi), s.getKey(a, runHi - 1)) < 0)
+        runHi++;
+      reverseRange(a, lo, runHi);
+    } else {                              // Ascending
+      while (runHi < hi && c.compare(s.getKey(a, runHi), s.getKey(a, runHi - 1)) >= 0)
+        runHi++;
+    }
+
+    return runHi - lo;
+  }
+
+  /**
+   * Reverse the specified range of the specified array.
+   *
+   * @param a the array in which a range is to be reversed
+   * @param lo the index of the first element in the range to be reversed
+   * @param hi the index after the last element in the range to be reversed
+   */
+  private void reverseRange(Buffer a, int lo, int hi) {
+    hi--;
+    while (lo < hi) {
+      s.swap(a, lo, hi);
+      lo++;
+      hi--;
+    }
+  }
+
+  /**
+   * Returns the minimum acceptable run length for an array of the specified
+   * length. Natural runs shorter than this will be extended with
+   * {@link #binarySort}.
+   *
+   * Roughly speaking, the computation is:
+   *
+   *  If n < MIN_MERGE, return n (it's too small to bother with fancy stuff).
+   *  Else if n is an exact power of 2, return MIN_MERGE/2.
+   *  Else return an int k, MIN_MERGE/2 <= k <= MIN_MERGE, such that n/k
+   *   is close to, but strictly less than, an exact power of 2.
+   *
+   * For the rationale, see listsort.txt.
+   *
+   * @param n the length of the array to be sorted
+   * @return the length of the minimum run to be merged
+   */
+  private int minRunLength(int n) {
+    assert n >= 0;
+    int r = 0;      // Becomes 1 if any 1 bits are shifted off
+    while (n >= MIN_MERGE) {
+      r |= (n & 1);
+      n >>= 1;
+    }
+    return n + r;
+  }
+
+  private class SortState {
+
+    /**
+     * The Buffer being sorted.
+     */
+    private final Buffer a;
+
+    /**
+     * Length of the sort Buffer.
+     */
+    private final int aLength;
+
+    /**
+     * The comparator for this sort.
+     */
+    private final Comparator<? super K> c;
+
+    /**
+     * When we get into galloping mode, we stay there until both runs win less
+     * often than MIN_GALLOP consecutive times.
+     */
+    private static final int  MIN_GALLOP = 7;
+
+    /**
+     * This controls when we get *into* galloping mode.  It is initialized
+     * to MIN_GALLOP.  The mergeLo and mergeHi methods nudge it higher for
+     * random data, and lower for highly structured data.
+     */
+    private int minGallop = MIN_GALLOP;
+
+    /**
+     * Maximum initial size of tmp array, which is used for merging.  The array
+     * can grow to accommodate demand.
+     *
+     * Unlike Tim's original C version, we do not allocate this much storage
+     * when sorting smaller arrays.  This change was required for performance.
+     */
+    private static final int INITIAL_TMP_STORAGE_LENGTH = 256;
+
+    /**
+     * Temp storage for merges.
+     */
+    private Buffer tmp; // Actual runtime type will be Object[], regardless of T
+
+    /**
+     * Length of the temp storage.
+     */
+    private int tmpLength = 0;
+
+    /**
+     * A stack of pending runs yet to be merged.  Run i starts at
+     * address base[i] and extends for len[i] elements.  It's always
+     * true (so long as the indices are in bounds) that:
+     *
+     *     runBase[i] + runLen[i] == runBase[i + 1]
+     *
+     * so we could cut the storage for this, but it's a minor amount,
+     * and keeping all the info explicit simplifies the code.
+     */
+    private int stackSize = 0;  // Number of pending runs on stack
+    private final int[] runBase;
+    private final int[] runLen;
+
+    /**
+     * Creates a TimSort instance to maintain the state of an ongoing sort.
+     *
+     * @param a the array to be sorted
+     * @param c the comparator to determine the order of the sort
+     */
+    private SortState(Buffer a, Comparator<? super K> c, int len) {
+      this.aLength = len;
+      this.a = a;
+      this.c = c;
+
+      // Allocate temp storage (which may be increased later if necessary)
+      tmpLength = len < 2 * INITIAL_TMP_STORAGE_LENGTH ? len >>> 1 : INITIAL_TMP_STORAGE_LENGTH;
+      tmp = s.allocate(tmpLength);
+
+      /*
+       * Allocate runs-to-be-merged stack (which cannot be expanded).  The
+       * stack length requirements are described in listsort.txt.  The C
+       * version always uses the same stack length (85), but this was
+       * measured to be too expensive when sorting "mid-sized" arrays (e.g.,
+       * 100 elements) in Java.  Therefore, we use smaller (but sufficiently
+       * large) stack lengths for smaller arrays.  The "magic numbers" in the
+       * computation below must be changed if MIN_MERGE is decreased.  See
+       * the MIN_MERGE declaration above for more information.
+       */
+      int stackLen = (len <    120  ?  5 :
+                      len <   1542  ? 10 :
+                      len < 119151  ? 19 : 40);
+      runBase = new int[stackLen];
+      runLen = new int[stackLen];
+    }
+
+    /**
+     * Pushes the specified run onto the pending-run stack.
+     *
+     * @param runBase index of the first element in the run
+     * @param runLen  the number of elements in the run
+     */
+    private void pushRun(int runBase, int runLen) {
+      this.runBase[stackSize] = runBase;
+      this.runLen[stackSize] = runLen;
+      stackSize++;
+    }
+
+    /**
+     * Examines the stack of runs waiting to be merged and merges adjacent runs
+     * until the stack invariants are reestablished:
+     *
+     *     1. runLen[i - 3] > runLen[i - 2] + runLen[i - 1]
+     *     2. runLen[i - 2] > runLen[i - 1]
+     *
+     * This method is called each time a new run is pushed onto the stack,
+     * so the invariants are guaranteed to hold for i < stackSize upon
+     * entry to the method.
+     */
+    private void mergeCollapse() {
+      while (stackSize > 1) {
+        int n = stackSize - 2;
+        if (n > 0 && runLen[n-1] <= runLen[n] + runLen[n+1]) {
+          if (runLen[n - 1] < runLen[n + 1])
+            n--;
+          mergeAt(n);
+        } else if (runLen[n] <= runLen[n + 1]) {
+          mergeAt(n);
+        } else {
+          break; // Invariant is established
+        }
+      }
+    }
+
+    /**
+     * Merges all runs on the stack until only one remains.  This method is
+     * called once, to complete the sort.
+     */
+    private void mergeForceCollapse() {
+      while (stackSize > 1) {
+        int n = stackSize - 2;
+        if (n > 0 && runLen[n - 1] < runLen[n + 1])
+          n--;
+        mergeAt(n);
+      }
+    }
+
+    /**
+     * Merges the two runs at stack indices i and i+1.  Run i must be
+     * the penultimate or antepenultimate run on the stack.  In other words,
+     * i must be equal to stackSize-2 or stackSize-3.
+     *
+     * @param i stack index of the first of the two runs to merge
+     */
+    private void mergeAt(int i) {
+      assert stackSize >= 2;
+      assert i >= 0;
+      assert i == stackSize - 2 || i == stackSize - 3;
+
+      int base1 = runBase[i];
+      int len1 = runLen[i];
+      int base2 = runBase[i + 1];
+      int len2 = runLen[i + 1];
+      assert len1 > 0 && len2 > 0;
+      assert base1 + len1 == base2;
+
+      /*
+       * Record the length of the combined runs; if i is the 3rd-last
+       * run now, also slide over the last run (which isn't involved
+       * in this merge).  The current run (i+1) goes away in any case.
+       */
+      runLen[i] = len1 + len2;
+      if (i == stackSize - 3) {
+        runBase[i + 1] = runBase[i + 2];
+        runLen[i + 1] = runLen[i + 2];
+      }
+      stackSize--;
+
+      /*
+       * Find where the first element of run2 goes in run1. Prior elements
+       * in run1 can be ignored (because they're already in place).
+       */
+      int k = gallopRight(s.getKey(a, base2), a, base1, len1, 0, c);
+      assert k >= 0;
+      base1 += k;
+      len1 -= k;
+      if (len1 == 0)
+        return;
+
+      /*
+       * Find where the last element of run1 goes in run2. Subsequent elements
+       * in run2 can be ignored (because they're already in place).
+       */
+      len2 = gallopLeft(s.getKey(a, base1 + len1 - 1), a, base2, len2, len2 - 1, c);
+      assert len2 >= 0;
+      if (len2 == 0)
+        return;
+
+      // Merge remaining runs, using tmp array with min(len1, len2) elements
+      if (len1 <= len2)
+        mergeLo(base1, len1, base2, len2);
+      else
+        mergeHi(base1, len1, base2, len2);
+    }
+
+    /**
+     * Locates the position at which to insert the specified key into the
+     * specified sorted range; if the range contains an element equal to key,
+     * returns the index of the leftmost equal element.
+     *
+     * @param key the key whose insertion point to search for
+     * @param a the array in which to search
+     * @param base the index of the first element in the range
+     * @param len the length of the range; must be > 0
+     * @param hint the index at which to begin the search, 0 <= hint < n.
+     *     The closer hint is to the result, the faster this method will run.
+     * @param c the comparator used to order the range, and to search
+     * @return the int k,  0 <= k <= n such that a[b + k - 1] < key <= a[b + k],
+     *    pretending that a[b - 1] is minus infinity and a[b + n] is infinity.
+     *    In other words, key belongs at index b + k; or in other words,
+     *    the first k elements of a should precede key, and the last n - k
+     *    should follow it.
+     */
+    private int gallopLeft(K key, Buffer a, int base, int len, int hint, Comparator<? super K> c) {
+      assert len > 0 && hint >= 0 && hint < len;
+      int lastOfs = 0;
+      int ofs = 1;
+      if (c.compare(key, s.getKey(a, base + hint)) > 0) {
+        // Gallop right until a[base+hint+lastOfs] < key <= a[base+hint+ofs]
+        int maxOfs = len - hint;
+        while (ofs < maxOfs && c.compare(key, s.getKey(a, base + hint + ofs)) > 0) {
+          lastOfs = ofs;
+          ofs = (ofs << 1) + 1;
+          if (ofs <= 0)   // int overflow
+            ofs = maxOfs;
+        }
+        if (ofs > maxOfs)
+          ofs = maxOfs;
+
+        // Make offsets relative to base
+        lastOfs += hint;
+        ofs += hint;
+      } else { // key <= a[base + hint]
+        // Gallop left until a[base+hint-ofs] < key <= a[base+hint-lastOfs]
+        final int maxOfs = hint + 1;
+        while (ofs < maxOfs && c.compare(key, s.getKey(a, base + hint - ofs)) <= 0) {
+          lastOfs = ofs;
+          ofs = (ofs << 1) + 1;
+          if (ofs <= 0)   // int overflow
+            ofs = maxOfs;
+        }
+        if (ofs > maxOfs)
+          ofs = maxOfs;
+
+        // Make offsets relative to base
+        int tmp = lastOfs;
+        lastOfs = hint - ofs;
+        ofs = hint - tmp;
+      }
+      assert -1 <= lastOfs && lastOfs < ofs && ofs <= len;
+
+      /*
+       * Now a[base+lastOfs] < key <= a[base+ofs], so key belongs somewhere
+       * to the right of lastOfs but no farther right than ofs.  Do a binary
+       * search, with invariant a[base + lastOfs - 1] < key <= a[base + ofs].
+       */
+      lastOfs++;
+      while (lastOfs < ofs) {
+        int m = lastOfs + ((ofs - lastOfs) >>> 1);
+
+        if (c.compare(key, s.getKey(a, base + m)) > 0)
+          lastOfs = m + 1;  // a[base + m] < key
+        else
+          ofs = m;          // key <= a[base + m]
+      }
+      assert lastOfs == ofs;    // so a[base + ofs - 1] < key <= a[base + ofs]
+      return ofs;
+    }
+
+    /**
+     * Like gallopLeft, except that if the range contains an element equal to
+     * key, gallopRight returns the index after the rightmost equal element.
+     *
+     * @param key the key whose insertion point to search for
+     * @param a the array in which to search
+     * @param base the index of the first element in the range
+     * @param len the length of the range; must be > 0
+     * @param hint the index at which to begin the search, 0 <= hint < n.
+     *     The closer hint is to the result, the faster this method will run.
+     * @param c the comparator used to order the range, and to search
+     * @return the int k,  0 <= k <= n such that a[b + k - 1] <= key < a[b + k]
+     */
+    private int gallopRight(K key, Buffer a, int base, int len, int hint, Comparator<? super K> c) {
+      assert len > 0 && hint >= 0 && hint < len;
+
+      int ofs = 1;
+      int lastOfs = 0;
+      if (c.compare(key, s.getKey(a, base + hint)) < 0) {
+        // Gallop left until a[b+hint - ofs] <= key < a[b+hint - lastOfs]
+        int maxOfs = hint + 1;
+        while (ofs < maxOfs && c.compare(key, s.getKey(a, base + hint - ofs)) < 0) {
+          lastOfs = ofs;
+          ofs = (ofs << 1) + 1;
+          if (ofs <= 0)   // int overflow
+            ofs = maxOfs;
+        }
+        if (ofs > maxOfs)
+          ofs = maxOfs;
+
+        // Make offsets relative to b
+        int tmp = lastOfs;
+        lastOfs = hint - ofs;
+        ofs = hint - tmp;
+      } else { // a[b + hint] <= key
+        // Gallop right until a[b+hint + lastOfs] <= key < a[b+hint + ofs]
+        int maxOfs = len - hint;
+        while (ofs < maxOfs && c.compare(key, s.getKey(a, base + hint + ofs)) >= 0) {
+          lastOfs = ofs;
+          ofs = (ofs << 1) + 1;
+          if (ofs <= 0)   // int overflow
+            ofs = maxOfs;
+        }
+        if (ofs > maxOfs)
+          ofs = maxOfs;
+
+        // Make offsets relative to b
+        lastOfs += hint;
+        ofs += hint;
+      }
+      assert -1 <= lastOfs && lastOfs < ofs && ofs <= len;
+
+      /*
+       * Now a[b + lastOfs] <= key < a[b + ofs], so key belongs somewhere to
+       * the right of lastOfs but no farther right than ofs.  Do a binary
+       * search, with invariant a[b + lastOfs - 1] <= key < a[b + ofs].
+       */
+      lastOfs++;
+      while (lastOfs < ofs) {
+        int m = lastOfs + ((ofs - lastOfs) >>> 1);
+
+        if (c.compare(key, s.getKey(a, base + m)) < 0)
+          ofs = m;          // key < a[b + m]
+        else
+          lastOfs = m + 1;  // a[b + m] <= key
+      }
+      assert lastOfs == ofs;    // so a[b + ofs - 1] <= key < a[b + ofs]
+      return ofs;
+    }
+
+    /**
+     * Merges two adjacent runs in place, in a stable fashion.  The first
+     * element of the first run must be greater than the first element of the
+     * second run (a[base1] > a[base2]), and the last element of the first run
+     * (a[base1 + len1-1]) must be greater than all elements of the second run.
+     *
+     * For performance, this method should be called only when len1 <= len2;
+     * its twin, mergeHi should be called if len1 >= len2.  (Either method
+     * may be called if len1 == len2.)
+     *
+     * @param base1 index of first element in first run to be merged
+     * @param len1  length of first run to be merged (must be > 0)
+     * @param base2 index of first element in second run to be merged
+     *        (must be aBase + aLen)
+     * @param len2  length of second run to be merged (must be > 0)
+     */
+    private void mergeLo(int base1, int len1, int base2, int len2) {
+      assert len1 > 0 && len2 > 0 && base1 + len1 == base2;
+
+      // Copy first run into temp array
+      Buffer a = this.a; // For performance
+      Buffer tmp = ensureCapacity(len1);
+      s.copyRange(a, base1, tmp, 0, len1);
+
+      int cursor1 = 0;       // Indexes into tmp array
+      int cursor2 = base2;   // Indexes int a
+      int dest = base1;      // Indexes int a
+
+      // Move first element of second run and deal with degenerate cases
+      s.copyElement(a, cursor2++, a, dest++);
+      if (--len2 == 0) {
+        s.copyRange(tmp, cursor1, a, dest, len1);
+        return;
+      }
+      if (len1 == 1) {
+        s.copyRange(a, cursor2, a, dest, len2);
+        s.copyElement(tmp, cursor1, a, dest + len2); // Last elt of run 1 to end of merge
+        return;
+      }
+
+      Comparator<? super K> c = this.c;  // Use local variable for performance
+      int minGallop = this.minGallop;    //  "    "       "     "      "
+      outer:
+      while (true) {
+        int count1 = 0; // Number of times in a row that first run won
+        int count2 = 0; // Number of times in a row that second run won
+
+        /*
+         * Do the straightforward thing until (if ever) one run starts
+         * winning consistently.
+         */
+        do {
+          assert len1 > 1 && len2 > 0;
+          if (c.compare(s.getKey(a, cursor2), s.getKey(tmp, cursor1)) < 0) {
+            s.copyElement(a, cursor2++, a, dest++);
+            count2++;
+            count1 = 0;
+            if (--len2 == 0)
+              break outer;
+          } else {
+            s.copyElement(tmp, cursor1++, a, dest++);
+            count1++;
+            count2 = 0;
+            if (--len1 == 1)
+              break outer;
+          }
+        } while ((count1 | count2) < minGallop);
+
+        /*
+         * One run is winning so consistently that galloping may be a
+         * huge win. So try that, and continue galloping until (if ever)
+         * neither run appears to be winning consistently anymore.
+         */
+        do {
+          assert len1 > 1 && len2 > 0;
+          count1 = gallopRight(s.getKey(a, cursor2), tmp, cursor1, len1, 0, c);
+          if (count1 != 0) {
+            s.copyRange(tmp, cursor1, a, dest, count1);
+            dest += count1;
+            cursor1 += count1;
+            len1 -= count1;
+            if (len1 <= 1) // len1 == 1 || len1 == 0
+              break outer;
+          }
+          s.copyElement(a, cursor2++, a, dest++);
+          if (--len2 == 0)
+            break outer;
+
+          count2 = gallopLeft(s.getKey(tmp, cursor1), a, cursor2, len2, 0, c);
+          if (count2 != 0) {
+            s.copyRange(a, cursor2, a, dest, count2);
+            dest += count2;
+            cursor2 += count2;
+            len2 -= count2;
+            if (len2 == 0)
+              break outer;
+          }
+          s.copyElement(tmp, cursor1++, a, dest++);
+          if (--len1 == 1)
+            break outer;
+          minGallop--;
+        } while (count1 >= MIN_GALLOP | count2 >= MIN_GALLOP);
+        if (minGallop < 0)
+          minGallop = 0;
+        minGallop += 2;  // Penalize for leaving gallop mode
+      }  // End of "outer" loop
+      this.minGallop = minGallop < 1 ? 1 : minGallop;  // Write back to field
+
+      if (len1 == 1) {
+        assert len2 > 0;
+        s.copyRange(a, cursor2, a, dest, len2);
+        s.copyElement(tmp, cursor1, a, dest + len2); //  Last elt of run 1 to end of merge
+      } else if (len1 == 0) {
+        throw new IllegalArgumentException(
+            "Comparison method violates its general contract!");
+      } else {
+        assert len2 == 0;
+        assert len1 > 1;
+        s.copyRange(tmp, cursor1, a, dest, len1);
+      }
+    }
+
+    /**
+     * Like mergeLo, except that this method should be called only if
+     * len1 >= len2; mergeLo should be called if len1 <= len2.  (Either method
+     * may be called if len1 == len2.)
+     *
+     * @param base1 index of first element in first run to be merged
+     * @param len1  length of first run to be merged (must be > 0)
+     * @param base2 index of first element in second run to be merged
+     *        (must be aBase + aLen)
+     * @param len2  length of second run to be merged (must be > 0)
+     */
+    private void mergeHi(int base1, int len1, int base2, int len2) {
+      assert len1 > 0 && len2 > 0 && base1 + len1 == base2;
+
+      // Copy second run into temp array
+      Buffer a = this.a; // For performance
+      Buffer tmp = ensureCapacity(len2);
+      s.copyRange(a, base2, tmp, 0, len2);
+
+      int cursor1 = base1 + len1 - 1;  // Indexes into a
+      int cursor2 = len2 - 1;          // Indexes into tmp array
+      int dest = base2 + len2 - 1;     // Indexes into a
+
+      // Move last element of first run and deal with degenerate cases
+      s.copyElement(a, cursor1--, a, dest--);
+      if (--len1 == 0) {
+        s.copyRange(tmp, 0, a, dest - (len2 - 1), len2);
+        return;
+      }
+      if (len2 == 1) {
+        dest -= len1;
+        cursor1 -= len1;
+        s.copyRange(a, cursor1 + 1, a, dest + 1, len1);
+        s.copyElement(tmp, cursor2, a, dest);
+        return;
+      }
+
+      Comparator<? super K> c = this.c;  // Use local variable for performance
+      int minGallop = this.minGallop;    //  "    "       "     "      "
+      outer:
+      while (true) {
+        int count1 = 0; // Number of times in a row that first run won
+        int count2 = 0; // Number of times in a row that second run won
+
+        /*
+         * Do the straightforward thing until (if ever) one run
+         * appears to win consistently.
+         */
+        do {
+          assert len1 > 0 && len2 > 1;
+          if (c.compare(s.getKey(tmp, cursor2), s.getKey(a, cursor1)) < 0) {
+            s.copyElement(a, cursor1--, a, dest--);
+            count1++;
+            count2 = 0;
+            if (--len1 == 0)
+              break outer;
+          } else {
+            s.copyElement(tmp, cursor2--, a, dest--);
+            count2++;
+            count1 = 0;
+            if (--len2 == 1)
+              break outer;
+          }
+        } while ((count1 | count2) < minGallop);
+
+        /*
+         * One run is winning so consistently that galloping may be a
+         * huge win. So try that, and continue galloping until (if ever)
+         * neither run appears to be winning consistently anymore.
+         */
+        do {
+          assert len1 > 0 && len2 > 1;
+          count1 = len1 - gallopRight(s.getKey(tmp, cursor2), a, base1, len1, len1 - 1, c);
+          if (count1 != 0) {
+            dest -= count1;
+            cursor1 -= count1;
+            len1 -= count1;
+            s.copyRange(a, cursor1 + 1, a, dest + 1, count1);
+            if (len1 == 0)
+              break outer;
+          }
+          s.copyElement(tmp, cursor2--, a, dest--);
+          if (--len2 == 1)
+            break outer;
+
+          count2 = len2 - gallopLeft(s.getKey(a, cursor1), tmp, 0, len2, len2 - 1, c);
+          if (count2 != 0) {
+            dest -= count2;
+            cursor2 -= count2;
+            len2 -= count2;
+            s.copyRange(tmp, cursor2 + 1, a, dest + 1, count2);
+            if (len2 <= 1)  // len2 == 1 || len2 == 0
+              break outer;
+          }
+          s.copyElement(a, cursor1--, a, dest--);
+          if (--len1 == 0)
+            break outer;
+          minGallop--;
+        } while (count1 >= MIN_GALLOP | count2 >= MIN_GALLOP);
+        if (minGallop < 0)
+          minGallop = 0;
+        minGallop += 2;  // Penalize for leaving gallop mode
+      }  // End of "outer" loop
+      this.minGallop = minGallop < 1 ? 1 : minGallop;  // Write back to field
+
+      if (len2 == 1) {
+        assert len1 > 0;
+        dest -= len1;
+        cursor1 -= len1;
+        s.copyRange(a, cursor1 + 1, a, dest + 1, len1);
+        s.copyElement(tmp, cursor2, a, dest); // Move first elt of run2 to front of merge
+      } else if (len2 == 0) {
+        throw new IllegalArgumentException(
+            "Comparison method violates its general contract!");
+      } else {
+        assert len1 == 0;
+        assert len2 > 0;
+        s.copyRange(tmp, 0, a, dest - (len2 - 1), len2);
+      }
+    }
+
+    /**
+     * Ensures that the external array tmp has at least the specified
+     * number of elements, increasing its size if necessary.  The size
+     * increases exponentially to ensure amortized linear time complexity.
+     *
+     * @param minCapacity the minimum required capacity of the tmp array
+     * @return tmp, whether or not it grew
+     */
+    private Buffer ensureCapacity(int minCapacity) {
+      if (tmpLength < minCapacity) {
+        // Compute smallest power of 2 > minCapacity
+        int newSize = minCapacity;
+        newSize |= newSize >> 1;
+        newSize |= newSize >> 2;
+        newSize |= newSize >> 4;
+        newSize |= newSize >> 8;
+        newSize |= newSize >> 16;
+        newSize++;
+
+        if (newSize < 0) // Not bloody likely!
+          newSize = minCapacity;
+        else
+          newSize = Math.min(newSize, aLength >>> 1);
+
+        tmp = s.allocate(newSize);
+        tmpLength = newSize;
+      }
+      return tmp;
+    }
+  }
+}
diff --git a/core/src/main/scala/org/apache/spark/util/collection/AppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/AppendOnlyMap.scala
index 1a6f1c2b55799433570d3c82d5883a97d489596e..290282c9c2e2811f0c1d21d7cee878ff802b540c 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/AppendOnlyMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/AppendOnlyMap.scala
@@ -254,26 +254,21 @@ class AppendOnlyMap[K, V](initialCapacity: Int = 64)
    * Return an iterator of the map in sorted order. This provides a way to sort the map without
    * using additional memory, at the expense of destroying the validity of the map.
    */
-  def destructiveSortedIterator(cmp: Comparator[(K, V)]): Iterator[(K, V)] = {
+  def destructiveSortedIterator(keyComparator: Comparator[K]): Iterator[(K, V)] = {
     destroyed = true
     // Pack KV pairs into the front of the underlying array
     var keyIndex, newIndex = 0
     while (keyIndex < capacity) {
       if (data(2 * keyIndex) != null) {
-        data(newIndex) = (data(2 * keyIndex), data(2 * keyIndex + 1))
+        data(2 * newIndex) = data(2 * keyIndex)
+        data(2 * newIndex + 1) = data(2 * keyIndex + 1)
         newIndex += 1
       }
       keyIndex += 1
     }
     assert(curSize == newIndex + (if (haveNullValue) 1 else 0))
 
-    // Sort by the given ordering
-    val rawOrdering = new Comparator[AnyRef] {
-      def compare(x: AnyRef, y: AnyRef): Int = {
-        cmp.compare(x.asInstanceOf[(K, V)], y.asInstanceOf[(K, V)])
-      }
-    }
-    Arrays.sort(data, 0, newIndex, rawOrdering)
+    new Sorter(new KVArraySortDataFormat[K, AnyRef]).sort(data, 0, newIndex, keyComparator)
 
     new Iterator[(K, V)] {
       var i = 0
@@ -284,7 +279,7 @@ class AppendOnlyMap[K, V](initialCapacity: Int = 64)
           nullValueReady = false
           (null.asInstanceOf[K], nullValue)
         } else {
-          val item = data(i).asInstanceOf[(K, V)]
+          val item = (data(2 * i).asInstanceOf[K], data(2 * i + 1).asInstanceOf[V])
           i += 1
           item
         }
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
index 765254bf4c36e4e566a67cb5cb67a465bf082764..71ab2a3e3bef4a19a22de5249162f3dfa680a091 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
@@ -30,6 +30,7 @@ import org.apache.spark.{Logging, SparkEnv}
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.serializer.Serializer
 import org.apache.spark.storage.{BlockId, BlockManager}
+import org.apache.spark.util.collection.ExternalAppendOnlyMap.HashComparator
 
 /**
  * :: DeveloperApi ::
@@ -66,8 +67,6 @@ class ExternalAppendOnlyMap[K, V, C](
     blockManager: BlockManager = SparkEnv.get.blockManager)
   extends Iterable[(K, C)] with Serializable with Logging {
 
-  import ExternalAppendOnlyMap._
-
   private var currentMap = new SizeTrackingAppendOnlyMap[K, C]
   private val spilledMaps = new ArrayBuffer[DiskMapIterator]
   private val sparkConf = SparkEnv.get.conf
@@ -105,7 +104,7 @@ class ExternalAppendOnlyMap[K, V, C](
   private var _diskBytesSpilled = 0L
 
   private val fileBufferSize = sparkConf.getInt("spark.shuffle.file.buffer.kb", 100) * 1024
-  private val comparator = new KCComparator[K, C]
+  private val keyComparator = new HashComparator[K]
   private val ser = serializer.newInstance()
 
   /**
@@ -173,7 +172,7 @@ class ExternalAppendOnlyMap[K, V, C](
     }
 
     try {
-      val it = currentMap.destructiveSortedIterator(comparator)
+      val it = currentMap.destructiveSortedIterator(keyComparator)
       while (it.hasNext) {
         val kv = it.next()
         writer.write(kv)
@@ -231,7 +230,7 @@ class ExternalAppendOnlyMap[K, V, C](
 
     // Input streams are derived both from the in-memory map and spilled maps on disk
     // The in-memory map is sorted in place, while the spilled maps are already in sorted order
-    private val sortedMap = currentMap.destructiveSortedIterator(comparator)
+    private val sortedMap = currentMap.destructiveSortedIterator(keyComparator)
     private val inputStreams = (Seq(sortedMap) ++ spilledMaps).map(it => it.buffered)
 
     inputStreams.foreach { it =>
@@ -252,7 +251,7 @@ class ExternalAppendOnlyMap[K, V, C](
       if (it.hasNext) {
         var kc = it.next()
         kcPairs += kc
-        val minHash = getKeyHashCode(kc)
+        val minHash = hashKey(kc)
         while (it.hasNext && it.head._1.hashCode() == minHash) {
           kc = it.next()
           kcPairs += kc
@@ -298,7 +297,7 @@ class ExternalAppendOnlyMap[K, V, C](
       val minPair = minPairs.remove(0)
       val minKey = minPair._1
       var minCombiner = minPair._2
-      assert(getKeyHashCode(minPair) == minHash)
+      assert(hashKey(minPair) == minHash)
 
       // For all other streams that may have this key (i.e. have the same minimum key hash),
       // merge in the corresponding value (if any) from that stream
@@ -339,7 +338,7 @@ class ExternalAppendOnlyMap[K, V, C](
       // Invalid if there are no more pairs in this stream
       def minKeyHash: Int = {
         assert(pairs.length > 0)
-        getKeyHashCode(pairs.head)
+        hashKey(pairs.head)
       }
 
       override def compareTo(other: StreamBuffer): Int = {
@@ -423,25 +422,27 @@ class ExternalAppendOnlyMap[K, V, C](
       file.delete()
     }
   }
+
+  /** Convenience function to hash the given (K, C) pair by the key. */
+  private def hashKey(kc: (K, C)): Int = ExternalAppendOnlyMap.hash(kc._1)
 }
 
 private[spark] object ExternalAppendOnlyMap {
 
   /**
-   * Return the key hash code of the given (key, combiner) pair.
-   * If the key is null, return a special hash code.
+   * Return the hash code of the given object. If the object is null, return a special hash code.
    */
-  private def getKeyHashCode[K, C](kc: (K, C)): Int = {
-    if (kc._1 == null) 0 else kc._1.hashCode()
+  private def hash[T](obj: T): Int = {
+    if (obj == null) 0 else obj.hashCode()
   }
 
   /**
-   * A comparator for (key, combiner) pairs based on their key hash codes.
+   * A comparator which sorts arbitrary keys based on their hash codes.
    */
-  private class KCComparator[K, C] extends Comparator[(K, C)] {
-    def compare(kc1: (K, C), kc2: (K, C)): Int = {
-      val hash1 = getKeyHashCode(kc1)
-      val hash2 = getKeyHashCode(kc2)
+  private class HashComparator[K] extends Comparator[K] {
+    def compare(key1: K, key2: K): Int = {
+      val hash1 = hash(key1)
+      val hash2 = hash(key2)
       if (hash1 < hash2) -1 else if (hash1 == hash2) 0 else 1
     }
   }
diff --git a/core/src/main/scala/org/apache/spark/util/collection/SortDataFormat.scala b/core/src/main/scala/org/apache/spark/util/collection/SortDataFormat.scala
new file mode 100644
index 0000000000000000000000000000000000000000..ac1528969f0be48df371760b93423d6690ba8444
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/util/collection/SortDataFormat.scala
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.collection
+
+import scala.reflect.ClassTag
+
+/**
+ * Abstraction for sorting an arbitrary input buffer of data. This interface requires determining
+ * the sort key for a given element index, as well as swapping elements and moving data from one
+ * buffer to another.
+ *
+ * Example format: an array of numbers, where each element is also the key.
+ * See [[KVArraySortDataFormat]] for a more exciting format.
+ *
+ * This trait extends Any to ensure it is universal (and thus compiled to a Java interface).
+ *
+ * @tparam K Type of the sort key of each element
+ * @tparam Buffer Internal data structure used by a particular format (e.g., Array[Int]).
+ */
+// TODO: Making Buffer a real trait would be a better abstraction, but adds some complexity.
+private[spark] trait SortDataFormat[K, Buffer] extends Any {
+  /** Return the sort key for the element at the given index. */
+  protected def getKey(data: Buffer, pos: Int): K
+
+  /** Swap two elements. */
+  protected def swap(data: Buffer, pos0: Int, pos1: Int): Unit
+
+  /** Copy a single element from src(srcPos) to dst(dstPos). */
+  protected def copyElement(src: Buffer, srcPos: Int, dst: Buffer, dstPos: Int): Unit
+
+  /**
+   * Copy a range of elements starting at src(srcPos) to dst, starting at dstPos.
+   * Overlapping ranges are allowed.
+   */
+  protected def copyRange(src: Buffer, srcPos: Int, dst: Buffer, dstPos: Int, length: Int): Unit
+
+  /**
+   * Allocates a Buffer that can hold up to 'length' elements.
+   * All elements of the buffer should be considered invalid until data is explicitly copied in.
+   */
+  protected def allocate(length: Int): Buffer
+}
+
+/**
+ * Supports sorting an array of key-value pairs where the elements of the array alternate between
+ * keys and values, as used in [[AppendOnlyMap]].
+ *
+ * @tparam K Type of the sort key of each element
+ * @tparam T Type of the Array we're sorting. Typically this must extend AnyRef, to support cases
+ *           when the keys and values are not the same type.
+ */
+private[spark]
+class KVArraySortDataFormat[K, T <: AnyRef : ClassTag] extends SortDataFormat[K, Array[T]] {
+
+  override protected def getKey(data: Array[T], pos: Int): K = data(2 * pos).asInstanceOf[K]
+
+  override protected def swap(data: Array[T], pos0: Int, pos1: Int) {
+    val tmpKey = data(2 * pos0)
+    val tmpVal = data(2 * pos0 + 1)
+    data(2 * pos0)     = data(2 * pos1)
+    data(2 * pos0 + 1) = data(2 * pos1 + 1)
+    data(2 * pos1)     = tmpKey
+    data(2 * pos1 + 1) = tmpVal
+  }
+
+  override protected def copyElement(src: Array[T], srcPos: Int, dst: Array[T], dstPos: Int) {
+    dst(2 * dstPos) = src(2 * srcPos)
+    dst(2 * dstPos + 1) = src(2 * srcPos + 1)
+  }
+
+  override protected def copyRange(src: Array[T], srcPos: Int,
+                                   dst: Array[T], dstPos: Int, length: Int) {
+    System.arraycopy(src, 2 * srcPos, dst, 2 * dstPos, 2 * length)
+  }
+
+  override protected def allocate(length: Int): Array[T] = {
+    new Array[T](2 * length)
+  }
+}
diff --git a/core/src/test/scala/org/apache/spark/util/collection/AppendOnlyMapSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/AppendOnlyMapSuite.scala
index 52c7288e18b6948a2d2afc71203f2d49f7bdf6be..cb99d14b27af49d142e251bfd4437aa6c6c225b1 100644
--- a/core/src/test/scala/org/apache/spark/util/collection/AppendOnlyMapSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/collection/AppendOnlyMapSuite.scala
@@ -170,10 +170,10 @@ class AppendOnlyMapSuite extends FunSuite {
       case e: IllegalStateException => fail()
     }
 
-    val it = map.destructiveSortedIterator(new Comparator[(String, String)] {
-      def compare(kv1: (String, String), kv2: (String, String)): Int = {
-        val x = if (kv1 != null && kv1._1 != null) kv1._1.toInt else Int.MinValue
-        val y = if (kv2 != null && kv2._1 != null) kv2._1.toInt else Int.MinValue
+    val it = map.destructiveSortedIterator(new Comparator[String] {
+      def compare(key1: String, key2: String): Int = {
+        val x = if (key1 != null) key1.toInt else Int.MinValue
+        val y = if (key2 != null) key2.toInt else Int.MinValue
         x.compareTo(y)
       }
     })
diff --git a/core/src/test/scala/org/apache/spark/util/collection/SorterSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/SorterSuite.scala
new file mode 100644
index 0000000000000000000000000000000000000000..6fe1079c2719a45b0a39edd86f9bbf302ce9aa7d
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/util/collection/SorterSuite.scala
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.collection
+
+import java.lang.{Float => JFloat}
+import java.util.{Arrays, Comparator}
+
+import org.scalatest.FunSuite
+
+import org.apache.spark.util.random.XORShiftRandom
+
+class SorterSuite extends FunSuite {
+
+  test("equivalent to Arrays.sort") {
+    val rand = new XORShiftRandom(123)
+    val data0 = Array.tabulate[Int](10000) { i => rand.nextInt() }
+    val data1 = data0.clone()
+
+    Arrays.sort(data0)
+    new Sorter(new IntArraySortDataFormat).sort(data1, 0, data1.length, Ordering.Int)
+
+    data0.zip(data1).foreach { case (x, y) => assert(x === y) }
+  }
+
+  test("KVArraySorter") {
+    val rand = new XORShiftRandom(456)
+
+    // Construct an array of keys (to Java sort) and an array where the keys and values
+    // alternate. Keys are random doubles, values are ordinals from 0 to length.
+    val keys = Array.tabulate[Double](5000) { i => rand.nextDouble() }
+    val keyValueArray = Array.tabulate[Number](10000) { i =>
+      if (i % 2 == 0) keys(i / 2) else new Integer(i / 2)
+    }
+
+    // Map from generated keys to values, to verify correctness later
+    val kvMap =
+      keyValueArray.grouped(2).map { case Array(k, v) => k.doubleValue() -> v.intValue() }.toMap
+
+    Arrays.sort(keys)
+    new Sorter(new KVArraySortDataFormat[Double, Number])
+      .sort(keyValueArray, 0, keys.length, Ordering.Double)
+
+    keys.zipWithIndex.foreach { case (k, i) =>
+      assert(k === keyValueArray(2 * i))
+      assert(kvMap(k) === keyValueArray(2 * i + 1))
+    }
+  }
+
+  /**
+   * This provides a simple benchmark for comparing the Sorter with Java internal sorting.
+   * Ideally these would be executed one at a time, each in their own JVM, so their listing
+   * here is mainly to have the code.
+   *
+   * The goal of this code is to sort an array of key-value pairs, where the array physically
+   * has the keys and values alternating. The basic Java sorts work only on the keys, so the
+   * real Java solution is to make Tuple2s to store the keys and values and sort an array of
+   * those, while the Sorter approach can work directly on the input data format.
+   *
+   * Note that the Java implementation varies tremendously between Java 6 and Java 7, when
+   * the Java sort changed from merge sort to Timsort.
+   */
+  ignore("Sorter benchmark") {
+
+    /** Runs an experiment several times. */
+    def runExperiment(name: String)(f: => Unit): Unit = {
+      val firstTry = org.apache.spark.util.Utils.timeIt(1)(f)
+      System.gc()
+
+      var i = 0
+      var next10: Long = 0
+      while (i < 10) {
+        val time = org.apache.spark.util.Utils.timeIt(1)(f)
+        next10 += time
+        println(s"$name: Took $time ms")
+        i += 1
+      }
+
+      println(s"$name: ($firstTry ms first try, ${next10 / 10} ms average)")
+    }
+
+    val numElements = 25000000 // 25 mil
+    val rand = new XORShiftRandom(123)
+
+    val keys = Array.tabulate[JFloat](numElements) { i =>
+      new JFloat(rand.nextFloat())
+    }
+
+    // Test our key-value pairs where each element is a Tuple2[Float, Integer)
+    val kvTupleArray = Array.tabulate[AnyRef](numElements) { i =>
+      (keys(i / 2): Float, i / 2: Int)
+    }
+    runExperiment("Tuple-sort using Arrays.sort()") {
+      Arrays.sort(kvTupleArray, new Comparator[AnyRef] {
+        override def compare(x: AnyRef, y: AnyRef): Int =
+          Ordering.Float.compare(x.asInstanceOf[(Float, _)]._1, y.asInstanceOf[(Float, _)]._1)
+      })
+    }
+
+    // Test our Sorter where each element alternates between Float and Integer, non-primitive
+    val keyValueArray = Array.tabulate[AnyRef](numElements * 2) { i =>
+      if (i % 2 == 0) keys(i / 2) else new Integer(i / 2)
+    }
+    val sorter = new Sorter(new KVArraySortDataFormat[JFloat, AnyRef])
+    runExperiment("KV-sort using Sorter") {
+      sorter.sort(keyValueArray, 0, keys.length, new Comparator[JFloat] {
+        override def compare(x: JFloat, y: JFloat): Int = Ordering.Float.compare(x, y)
+      })
+    }
+
+    // Test non-primitive sort on float array
+    runExperiment("Java Arrays.sort()") {
+      Arrays.sort(keys, new Comparator[JFloat] {
+        override def compare(x: JFloat, y: JFloat): Int = Ordering.Float.compare(x, y)
+      })
+    }
+
+    // Test primitive sort on float array
+    val primitiveKeys = Array.tabulate[Float](numElements) { i => rand.nextFloat() }
+    runExperiment("Java Arrays.sort() on primitive keys") {
+      Arrays.sort(primitiveKeys)
+    }
+  }
+}
+
+
+/** Format to sort a simple Array[Int]. Could be easily generified and specialized. */
+class IntArraySortDataFormat extends SortDataFormat[Int, Array[Int]] {
+  override protected def getKey(data: Array[Int], pos: Int): Int = {
+    data(pos)
+  }
+
+  override protected def swap(data: Array[Int], pos0: Int, pos1: Int): Unit = {
+    val tmp = data(pos0)
+    data(pos0) = data(pos1)
+    data(pos1) = tmp
+  }
+
+  override protected def copyElement(src: Array[Int], srcPos: Int, dst: Array[Int], dstPos: Int) {
+    dst(dstPos) = src(srcPos)
+  }
+
+  /** Copy a range of elements starting at src(srcPos) to dest, starting at destPos. */
+  override protected def copyRange(src: Array[Int], srcPos: Int,
+                                   dst: Array[Int], dstPos: Int, length: Int) {
+    System.arraycopy(src, srcPos, dst, dstPos, length)
+  }
+
+  /** Allocates a new structure that can hold up to 'length' elements. */
+  override protected def allocate(length: Int): Array[Int] = {
+    new Array[Int](length)
+  }
+}