diff --git a/core/src/main/scala/org/apache/spark/Aggregator.scala b/core/src/main/scala/org/apache/spark/Aggregator.scala
index 289aab9bd9e516f910f0db1d97049f570439bf07..7196e57d5d2e242665072a8cd14afbd49ca83735 100644
--- a/core/src/main/scala/org/apache/spark/Aggregator.scala
+++ b/core/src/main/scala/org/apache/spark/Aggregator.scala
@@ -18,7 +18,7 @@
 package org.apache.spark
 
 import org.apache.spark.annotation.DeveloperApi
-import org.apache.spark.util.collection.{AppendOnlyMap, ExternalAppendOnlyMap}
+import org.apache.spark.util.collection.ExternalAppendOnlyMap
 
 /**
  * :: DeveloperApi ::
@@ -34,59 +34,30 @@ case class Aggregator[K, V, C] (
     mergeValue: (C, V) => C,
     mergeCombiners: (C, C) => C) {
 
-  // When spilling is enabled sorting will happen externally, but not necessarily with an
-  // ExternalSorter.
-  private val isSpillEnabled = SparkEnv.get.conf.getBoolean("spark.shuffle.spill", true)
-
   @deprecated("use combineValuesByKey with TaskContext argument", "0.9.0")
   def combineValuesByKey(iter: Iterator[_ <: Product2[K, V]]): Iterator[(K, C)] =
     combineValuesByKey(iter, null)
 
-  def combineValuesByKey(iter: Iterator[_ <: Product2[K, V]],
-                         context: TaskContext): Iterator[(K, C)] = {
-    if (!isSpillEnabled) {
-      val combiners = new AppendOnlyMap[K, C]
-      var kv: Product2[K, V] = null
-      val update = (hadValue: Boolean, oldValue: C) => {
-        if (hadValue) mergeValue(oldValue, kv._2) else createCombiner(kv._2)
-      }
-      while (iter.hasNext) {
-        kv = iter.next()
-        combiners.changeValue(kv._1, update)
-      }
-      combiners.iterator
-    } else {
-      val combiners = new ExternalAppendOnlyMap[K, V, C](createCombiner, mergeValue, mergeCombiners)
-      combiners.insertAll(iter)
-      updateMetrics(context, combiners)
-      combiners.iterator
-    }
+  def combineValuesByKey(
+      iter: Iterator[_ <: Product2[K, V]],
+      context: TaskContext): Iterator[(K, C)] = {
+    val combiners = new ExternalAppendOnlyMap[K, V, C](createCombiner, mergeValue, mergeCombiners)
+    combiners.insertAll(iter)
+    updateMetrics(context, combiners)
+    combiners.iterator
   }
 
   @deprecated("use combineCombinersByKey with TaskContext argument", "0.9.0")
   def combineCombinersByKey(iter: Iterator[_ <: Product2[K, C]]) : Iterator[(K, C)] =
     combineCombinersByKey(iter, null)
 
-  def combineCombinersByKey(iter: Iterator[_ <: Product2[K, C]], context: TaskContext)
-    : Iterator[(K, C)] =
-  {
-    if (!isSpillEnabled) {
-      val combiners = new AppendOnlyMap[K, C]
-      var kc: Product2[K, C] = null
-      val update = (hadValue: Boolean, oldValue: C) => {
-        if (hadValue) mergeCombiners(oldValue, kc._2) else kc._2
-      }
-      while (iter.hasNext) {
-        kc = iter.next()
-        combiners.changeValue(kc._1, update)
-      }
-      combiners.iterator
-    } else {
-      val combiners = new ExternalAppendOnlyMap[K, C, C](identity, mergeCombiners, mergeCombiners)
-      combiners.insertAll(iter)
-      updateMetrics(context, combiners)
-      combiners.iterator
-    }
+  def combineCombinersByKey(
+      iter: Iterator[_ <: Product2[K, C]],
+      context: TaskContext): Iterator[(K, C)] = {
+    val combiners = new ExternalAppendOnlyMap[K, C, C](identity, mergeCombiners, mergeCombiners)
+    combiners.insertAll(iter)
+    updateMetrics(context, combiners)
+    combiners.iterator
   }
 
   /** Update task metrics after populating the external map. */
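
A minimal sketch, not taken from the patch, of the three functions an Aggregator is built from; with the in-memory path gone, combineValuesByKey always feeds these into an ExternalAppendOnlyMap, which spills once it exceeds its share of shuffle memory:

    import org.apache.spark.Aggregator

    // Sketch: per-key (sum, count). createCombiner seeds a combiner from the first
    // value, mergeValue folds further values in, mergeCombiners merges map-side results.
    val sumCount = Aggregator[String, Double, (Double, Long)](
      createCombiner = v => (v, 1L),
      mergeValue = (c, v) => (c._1 + v, c._2 + 1L),
      mergeCombiners = (c1, c2) => (c1._1 + c2._1, c1._2 + c2._2))
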
diff --git a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
index 7bad749d583274af12d5adf67bb91b930e6e7460..935c3babd8ea144a2ee9f434b319a3d76ea08803 100644
--- a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
@@ -26,7 +26,7 @@ import scala.reflect.ClassTag
 
 import org.apache.spark._
 import org.apache.spark.annotation.DeveloperApi
-import org.apache.spark.util.collection.{ExternalAppendOnlyMap, AppendOnlyMap, CompactBuffer}
+import org.apache.spark.util.collection.{CompactBuffer, ExternalAppendOnlyMap}
 import org.apache.spark.util.Utils
 import org.apache.spark.serializer.Serializer
 
@@ -128,8 +128,6 @@ class CoGroupedRDD[K: ClassTag](
   override val partitioner: Some[Partitioner] = Some(part)
 
   override def compute(s: Partition, context: TaskContext): Iterator[(K, Array[Iterable[_]])] = {
-    val sparkConf = SparkEnv.get.conf
-    val externalSorting = sparkConf.getBoolean("spark.shuffle.spill", true)
     val split = s.asInstanceOf[CoGroupPartition]
     val numRdds = dependencies.length
 
@@ -150,34 +148,16 @@ class CoGroupedRDD[K: ClassTag](
         rddIterators += ((it, depNum))
     }
 
-    if (!externalSorting) {
-      val map = new AppendOnlyMap[K, CoGroupCombiner]
-      val update: (Boolean, CoGroupCombiner) => CoGroupCombiner = (hadVal, oldVal) => {
-        if (hadVal) oldVal else Array.fill(numRdds)(new CoGroup)
-      }
-      val getCombiner: K => CoGroupCombiner = key => {
-        map.changeValue(key, update)
-      }
-      rddIterators.foreach { case (it, depNum) =>
-        while (it.hasNext) {
-          val kv = it.next()
-          getCombiner(kv._1)(depNum) += kv._2
-        }
-      }
-      new InterruptibleIterator(context,
-        map.iterator.asInstanceOf[Iterator[(K, Array[Iterable[_]])]])
-    } else {
-      val map = createExternalMap(numRdds)
-      for ((it, depNum) <- rddIterators) {
-        map.insertAll(it.map(pair => (pair._1, new CoGroupValue(pair._2, depNum))))
-      }
-      context.taskMetrics().incMemoryBytesSpilled(map.memoryBytesSpilled)
-      context.taskMetrics().incDiskBytesSpilled(map.diskBytesSpilled)
-      context.internalMetricsToAccumulators(
-        InternalAccumulator.PEAK_EXECUTION_MEMORY).add(map.peakMemoryUsedBytes)
-      new InterruptibleIterator(context,
-        map.iterator.asInstanceOf[Iterator[(K, Array[Iterable[_]])]])
+    val map = createExternalMap(numRdds)
+    for ((it, depNum) <- rddIterators) {
+      map.insertAll(it.map(pair => (pair._1, new CoGroupValue(pair._2, depNum))))
     }
+    context.taskMetrics().incMemoryBytesSpilled(map.memoryBytesSpilled)
+    context.taskMetrics().incDiskBytesSpilled(map.diskBytesSpilled)
+    context.internalMetricsToAccumulators(
+      InternalAccumulator.PEAK_EXECUTION_MEMORY).add(map.peakMemoryUsedBytes)
+    new InterruptibleIterator(context,
+      map.iterator.asInstanceOf[Iterator[(K, Array[Iterable[_]])]])
   }
 
   private def createExternalMap(numRdds: Int)
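
For reference, a plain cogroup now always builds its per-key groups through the spillable external map created above; a minimal usage sketch, assuming an existing SparkContext named sc:

    // Sketch: cogroup is backed by CoGroupedRDD, whose compute() now always
    // inserts into the spill-capable ExternalAppendOnlyMap.
    val left  = sc.parallelize(Seq(("a", 1), ("b", 2)))
    val right = sc.parallelize(Seq(("a", "x"), ("c", "y")))
    val grouped = left.cogroup(right)  // RDD[(String, (Iterable[Int], Iterable[String]))]
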
diff --git a/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleManager.scala b/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleManager.scala
index c089088f409dd5d73a2d30a6ecd5841a6f38d876..0b46634b8b4669eb1d0b1ea8b27b5c7be2a24b0f 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleManager.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleManager.scala
@@ -24,7 +24,13 @@ import org.apache.spark.shuffle._
  * A ShuffleManager using hashing, that creates one output file per reduce partition on each
  * mapper (possibly reusing these across waves of tasks).
  */
-private[spark] class HashShuffleManager(conf: SparkConf) extends ShuffleManager {
+private[spark] class HashShuffleManager(conf: SparkConf) extends ShuffleManager with Logging {
+
+  if (!conf.getBoolean("spark.shuffle.spill", true)) {
+    logWarning(
+      "spark.shuffle.spill was set to false, but this configuration is ignored as of Spark 1.6+." +
+        " Shuffle will continue to spill to disk when necessary.")
+  }
 
   private val fileShuffleBlockResolver = new FileShuffleBlockResolver(conf)
 
diff --git a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala
index d7fab351ca3b88d29deaca22873739a7242f5a89..476cc1f303da796a01814bfca286a5c8b2b73464 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala
@@ -19,11 +19,17 @@ package org.apache.spark.shuffle.sort
 
 import java.util.concurrent.ConcurrentHashMap
 
-import org.apache.spark.{SparkConf, TaskContext, ShuffleDependency}
+import org.apache.spark.{Logging, ShuffleDependency, SparkConf, TaskContext}
 import org.apache.spark.shuffle._
 import org.apache.spark.shuffle.hash.HashShuffleReader
 
-private[spark] class SortShuffleManager(conf: SparkConf) extends ShuffleManager {
+private[spark] class SortShuffleManager(conf: SparkConf) extends ShuffleManager with Logging {
+
+  if (!conf.getBoolean("spark.shuffle.spill", true)) {
+    logWarning(
+      "spark.shuffle.spill was set to false, but this configuration is ignored as of Spark 1.6+." +
+        " Shuffle will continue to spill to disk when necessary.")
+  }
 
   private val indexShuffleBlockResolver = new IndexShuffleBlockResolver(conf)
   private val shuffleMapNumber = new ConcurrentHashMap[Int, Int]()
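
From a user's point of view the old flag is now inert: both shuffle managers accept it, log the warning above, and keep spilling enabled. A minimal sketch (the local master is chosen only for illustration):

    import org.apache.spark.{SparkConf, SparkContext}

    // Sketch: the flag below no longer changes behaviour; it only triggers the
    // warning added above. Shuffles still spill to disk when memory runs low.
    val conf = new SparkConf()
      .setMaster("local[2]")
      .setAppName("spill-flag-demo")
      .set("spark.shuffle.spill", "false")
    val sc = new SparkContext(conf)
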
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
index 31230d5978b2aeb50f196e40bf4210211cc8406b..2a30f751ff03d49c8444c8e26304b027cb1bf2b2 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
@@ -116,8 +116,6 @@ private[spark] class ExternalSorter[K, V, C](
   private val ser = Serializer.getSerializer(serializer)
   private val serInstance = ser.newInstance()
 
-  private val spillingEnabled = conf.getBoolean("spark.shuffle.spill", true)
-
   // Use getSizeAsKb (not bytes) to maintain backwards compatibility if no units are provided
   private val fileBufferSize = conf.getSizeAsKb("spark.shuffle.file.buffer", "32k").toInt * 1024
 
@@ -229,10 +227,6 @@ private[spark] class ExternalSorter[K, V, C](
    * @param usingMap whether we're using a map or buffer as our current in-memory collection
    */
   private def maybeSpillCollection(usingMap: Boolean): Unit = {
-    if (!spillingEnabled) {
-      return
-    }
-
     var estimatedSize = 0L
     if (usingMap) {
       estimatedSize = map.estimateSize()
diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
index 1110ca6051a40b236c03369bb21f369bad8d81cc..1fd470cd3b01d60407c9381b88a6c3ad61975879 100644
--- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
@@ -147,7 +147,7 @@ class SparkSubmitSuite
       "--archives", "archive1.txt,archive2.txt",
       "--num-executors", "6",
       "--name", "beauty",
-      "--conf", "spark.shuffle.spill=false",
+      "--conf", "spark.ui.enabled=false",
       "thejar.jar",
       "arg1", "arg2")
     val appArgs = new SparkSubmitArguments(clArgs)
@@ -166,7 +166,7 @@ class SparkSubmitSuite
     mainClass should be ("org.apache.spark.deploy.yarn.Client")
     classpath should have length (0)
     sysProps("spark.app.name") should be ("beauty")
-    sysProps("spark.shuffle.spill") should be ("false")
+    sysProps("spark.ui.enabled") should be ("false")
     sysProps("SPARK_SUBMIT") should be ("true")
     sysProps.keys should not contain ("spark.jars")
   }
@@ -185,7 +185,7 @@ class SparkSubmitSuite
       "--archives", "archive1.txt,archive2.txt",
       "--num-executors", "6",
       "--name", "trill",
-      "--conf", "spark.shuffle.spill=false",
+      "--conf", "spark.ui.enabled=false",
       "thejar.jar",
       "arg1", "arg2")
     val appArgs = new SparkSubmitArguments(clArgs)
@@ -206,7 +206,7 @@ class SparkSubmitSuite
     sysProps("spark.yarn.dist.archives") should include regex (".*archive1.txt,.*archive2.txt")
     sysProps("spark.jars") should include regex (".*one.jar,.*two.jar,.*three.jar,.*thejar.jar")
     sysProps("SPARK_SUBMIT") should be ("true")
-    sysProps("spark.shuffle.spill") should be ("false")
+    sysProps("spark.ui.enabled") should be ("false")
   }
 
   test("handles standalone cluster mode") {
@@ -229,7 +229,7 @@ class SparkSubmitSuite
       "--supervise",
       "--driver-memory", "4g",
       "--driver-cores", "5",
-      "--conf", "spark.shuffle.spill=false",
+      "--conf", "spark.ui.enabled=false",
       "thejar.jar",
       "arg1", "arg2")
     val appArgs = new SparkSubmitArguments(clArgs)
@@ -253,9 +253,9 @@ class SparkSubmitSuite
     sysProps.keys should contain ("spark.driver.memory")
     sysProps.keys should contain ("spark.driver.cores")
     sysProps.keys should contain ("spark.driver.supervise")
-    sysProps.keys should contain ("spark.shuffle.spill")
+    sysProps.keys should contain ("spark.ui.enabled")
     sysProps.keys should contain ("spark.submit.deployMode")
-    sysProps("spark.shuffle.spill") should be ("false")
+    sysProps("spark.ui.enabled") should be ("false")
   }
 
   test("handles standalone client mode") {
@@ -266,7 +266,7 @@ class SparkSubmitSuite
       "--total-executor-cores", "5",
       "--class", "org.SomeClass",
       "--driver-memory", "4g",
-      "--conf", "spark.shuffle.spill=false",
+      "--conf", "spark.ui.enabled=false",
       "thejar.jar",
       "arg1", "arg2")
     val appArgs = new SparkSubmitArguments(clArgs)
@@ -277,7 +277,7 @@ class SparkSubmitSuite
     classpath(0) should endWith ("thejar.jar")
     sysProps("spark.executor.memory") should be ("5g")
     sysProps("spark.cores.max") should be ("5")
-    sysProps("spark.shuffle.spill") should be ("false")
+    sysProps("spark.ui.enabled") should be ("false")
   }
 
   test("handles mesos client mode") {
@@ -288,7 +288,7 @@ class SparkSubmitSuite
       "--total-executor-cores", "5",
       "--class", "org.SomeClass",
       "--driver-memory", "4g",
-      "--conf", "spark.shuffle.spill=false",
+      "--conf", "spark.ui.enabled=false",
       "thejar.jar",
       "arg1", "arg2")
     val appArgs = new SparkSubmitArguments(clArgs)
@@ -299,7 +299,7 @@ class SparkSubmitSuite
     classpath(0) should endWith ("thejar.jar")
     sysProps("spark.executor.memory") should be ("5g")
     sysProps("spark.cores.max") should be ("5")
-    sysProps("spark.shuffle.spill") should be ("false")
+    sysProps("spark.ui.enabled") should be ("false")
   }
 
   test("handles confs with flag equivalents") {
diff --git a/docs/configuration.md b/docs/configuration.md
index 3700051efb4488b705245b8cdce8722c0cb368e1..5ec097c78aa384b7312212e722a63afb1a3adcae 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -69,7 +69,7 @@ val sc = new SparkContext(new SparkConf())
 
 Then, you can supply configuration values at runtime:
 {% highlight bash %}
-./bin/spark-submit --name "My app" --master local[4] --conf spark.shuffle.spill=false
+./bin/spark-submit --name "My app" --master local[4] --conf spark.eventLog.enabled=false
   --conf "spark.executor.extraJavaOptions=-XX:+PrintGCDetails -XX:+PrintGCTimeStamps" myApp.jar
 {% endhighlight %}
 
@@ -449,8 +449,8 @@ Apart from these, the following properties are also available, and may be useful
   <td><code>spark.shuffle.memoryFraction</code></td>
   <td>0.2</td>
   <td>
-    Fraction of Java heap to use for aggregation and cogroups during shuffles, if
-    <code>spark.shuffle.spill</code> is true. At any given time, the collective size of
+    Fraction of Java heap to use for aggregation and cogroups during shuffles.
+    At any given time, the collective size of
     all in-memory maps used for shuffles is bounded by this limit, beyond which the contents will
     begin to spill to disk. If spills are often, consider increasing this value at the expense of
     <code>spark.storage.memoryFraction</code>.
@@ -483,14 +483,6 @@ Apart from these, the following properties are also available, and may be useful
     map-side aggregation and there are at most this many reduce partitions.
   </td>
 </tr>
-<tr>
-  <td><code>spark.shuffle.spill</code></td>
-  <td>true</td>
-  <td>
-    If set to "true", limits the amount of memory used during reduces by spilling data out to disk.
-    This spilling threshold is specified by <code>spark.shuffle.memoryFraction</code>.
-  </td>
-</tr>
 <tr>
   <td><code>spark.shuffle.spill.compress</code></td>
   <td>true</td>
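
With the on/off switch gone, spark.shuffle.memoryFraction is the knob that decides when in-memory shuffle maps start spilling. An illustrative sketch (the values are arbitrary):

    import org.apache.spark.SparkConf

    // Sketch: give shuffle aggregation more headroom before it spills,
    // at the expense of the storage fraction, as the docs above describe.
    val conf = new SparkConf()
      .set("spark.shuffle.memoryFraction", "0.3")
      .set("spark.storage.memoryFraction", "0.5")
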
diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md
index 82d4243cc6b2725e17fe510822ba969e6d146c4e..7ae9244c271e30da41e3843972836c11b70691a6 100644
--- a/docs/sql-programming-guide.md
+++ b/docs/sql-programming-guide.md
@@ -1936,13 +1936,6 @@ that these options will be deprecated in future release as more optimizations ar
       Configures the number of partitions to use when shuffling data for joins or aggregations.
     </td>
   </tr>
-  <tr>
-    <td><code>spark.sql.planner.externalSort</code></td>
-    <td>true</td>
-    <td>
-      When true, performs sorts spilling to disk as needed otherwise sort each partition in memory.
-    </td>
-  </tr>
 </table>
 
 # Distributed SQL Engine
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index ab5aab1e115f7f8dcf606e9a25800be7aac42bfd..73d7d9a5692a9f6d1db0fe5a5f658c6b4569b5dd 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -48,7 +48,7 @@ from pyspark.statcounter import StatCounter
 from pyspark.rddsampler import RDDSampler, RDDRangeSampler, RDDStratifiedSampler
 from pyspark.storagelevel import StorageLevel
 from pyspark.resultiterable import ResultIterable
-from pyspark.shuffle import Aggregator, InMemoryMerger, ExternalMerger, \
+from pyspark.shuffle import Aggregator, ExternalMerger, \
     get_used_memory, ExternalSorter, ExternalGroupBy
 from pyspark.traceback_utils import SCCallSiteSync
 
@@ -580,12 +580,11 @@ class RDD(object):
         if numPartitions is None:
             numPartitions = self._defaultReducePartitions()
 
-        spill = (self.ctx._conf.get("spark.shuffle.spill", 'True').lower() == "true")
         memory = _parse_memory(self.ctx._conf.get("spark.python.worker.memory", "512m"))
         serializer = self._jrdd_deserializer
 
         def sortPartition(iterator):
-            sort = ExternalSorter(memory * 0.9, serializer).sorted if spill else sorted
+            sort = ExternalSorter(memory * 0.9, serializer).sorted
             return iter(sort(iterator, key=lambda k_v: keyfunc(k_v[0]), reverse=(not ascending)))
 
         return self.partitionBy(numPartitions, partitionFunc).mapPartitions(sortPartition, True)
@@ -610,12 +609,11 @@ class RDD(object):
         if numPartitions is None:
             numPartitions = self._defaultReducePartitions()
 
-        spill = self._can_spill()
         memory = self._memory_limit()
         serializer = self._jrdd_deserializer
 
         def sortPartition(iterator):
-            sort = ExternalSorter(memory * 0.9, serializer).sorted if spill else sorted
+            sort = ExternalSorter(memory * 0.9, serializer).sorted
             return iter(sort(iterator, key=lambda kv: keyfunc(kv[0]), reverse=(not ascending)))
 
         if numPartitions == 1:
@@ -1770,13 +1768,11 @@ class RDD(object):
             numPartitions = self._defaultReducePartitions()
 
         serializer = self.ctx.serializer
-        spill = self._can_spill()
         memory = self._memory_limit()
         agg = Aggregator(createCombiner, mergeValue, mergeCombiners)
 
         def combineLocally(iterator):
-            merger = ExternalMerger(agg, memory * 0.9, serializer) \
-                if spill else InMemoryMerger(agg)
+            merger = ExternalMerger(agg, memory * 0.9, serializer)
             merger.mergeValues(iterator)
             return merger.items()
 
@@ -1784,8 +1780,7 @@ class RDD(object):
         shuffled = locally_combined.partitionBy(numPartitions)
 
         def _mergeCombiners(iterator):
-            merger = ExternalMerger(agg, memory, serializer) \
-                if spill else InMemoryMerger(agg)
+            merger = ExternalMerger(agg, memory, serializer)
             merger.mergeCombiners(iterator)
             return merger.items()
 
@@ -1824,9 +1819,6 @@ class RDD(object):
 
         return self.combineByKey(lambda v: func(createZero(), v), func, func, numPartitions)
 
-    def _can_spill(self):
-        return self.ctx._conf.get("spark.shuffle.spill", "True").lower() == "true"
-
     def _memory_limit(self):
         return _parse_memory(self.ctx._conf.get("spark.python.worker.memory", "512m"))
 
@@ -1857,14 +1849,12 @@ class RDD(object):
             a.extend(b)
             return a
 
-        spill = self._can_spill()
         memory = self._memory_limit()
         serializer = self._jrdd_deserializer
         agg = Aggregator(createCombiner, mergeValue, mergeCombiners)
 
         def combine(iterator):
-            merger = ExternalMerger(agg, memory * 0.9, serializer) \
-                if spill else InMemoryMerger(agg)
+            merger = ExternalMerger(agg, memory * 0.9, serializer)
             merger.mergeValues(iterator)
             return merger.items()
 
@@ -1872,8 +1862,7 @@ class RDD(object):
         shuffled = locally_combined.partitionBy(numPartitions)
 
         def groupByKey(it):
-            merger = ExternalGroupBy(agg, memory, serializer)\
-                if spill else InMemoryMerger(agg)
+            merger = ExternalGroupBy(agg, memory, serializer)
             merger.mergeCombiners(it)
             return merger.items()
 
diff --git a/python/pyspark/shuffle.py b/python/pyspark/shuffle.py
index b8118bdb7ca76b3b853e6e2e91b0a3a6301c1e3b..e974cda9fc3e167d5babfeb85ff4840abdb57e8b 100644
--- a/python/pyspark/shuffle.py
+++ b/python/pyspark/shuffle.py
@@ -131,36 +131,6 @@ class Merger(object):
         raise NotImplementedError
 
 
-class InMemoryMerger(Merger):
-
-    """
-    In memory merger based on in-memory dict.
-    """
-
-    def __init__(self, aggregator):
-        Merger.__init__(self, aggregator)
-        self.data = {}
-
-    def mergeValues(self, iterator):
-        """ Combine the items by creator and combiner """
-        # speed up attributes lookup
-        d, creator = self.data, self.agg.createCombiner
-        comb = self.agg.mergeValue
-        for k, v in iterator:
-            d[k] = comb(d[k], v) if k in d else creator(v)
-
-    def mergeCombiners(self, iterator):
-        """ Merge the combined items by mergeCombiner """
-        # speed up attributes lookup
-        d, comb = self.data, self.agg.mergeCombiners
-        for k, v in iterator:
-            d[k] = comb(d[k], v) if k in d else v
-
-    def items(self):
-        """ Return the merged items ad iterator """
-        return iter(self.data.items())
-
-
 def _compressed_serializer(self, serializer=None):
     # always use PickleSerializer to simplify implementation
     ser = PickleSerializer()
diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py
index 647504c32f156af5691ba6dc40d266c8b37eec1e..f11aaf001c8df3629d1f2920805fc0272ad6d600 100644
--- a/python/pyspark/tests.py
+++ b/python/pyspark/tests.py
@@ -62,7 +62,7 @@ from pyspark.serializers import read_int, BatchedSerializer, MarshalSerializer,
     CloudPickleSerializer, CompressedSerializer, UTF8Deserializer, NoOpSerializer, \
     PairDeserializer, CartesianDeserializer, AutoBatchedSerializer, AutoSerializer, \
     FlattenedValuesSerializer
-from pyspark.shuffle import Aggregator, InMemoryMerger, ExternalMerger, ExternalSorter
+from pyspark.shuffle import Aggregator, ExternalMerger, ExternalSorter
 from pyspark import shuffle
 from pyspark.profiler import BasicProfiler
 
@@ -95,17 +95,6 @@ class MergerTests(unittest.TestCase):
                               lambda x, y: x.append(y) or x,
                               lambda x, y: x.extend(y) or x)
 
-    def test_in_memory(self):
-        m = InMemoryMerger(self.agg)
-        m.mergeValues(self.data)
-        self.assertEqual(sum(sum(v) for k, v in m.items()),
-                         sum(xrange(self.N)))
-
-        m = InMemoryMerger(self.agg)
-        m.mergeCombiners(map(lambda x_y: (x_y[0], [x_y[1]]), self.data))
-        self.assertEqual(sum(sum(v) for k, v in m.items()),
-                         sum(xrange(self.N)))
-
     def test_small_dataset(self):
         m = ExternalMerger(self.agg, 1000)
         m.mergeValues(self.data)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
index 9de75f4c4d08414bd9641028a4525ed050dafaa1..b9fb90d964206306ee4ce05506030e6ff7ff0ae1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
@@ -330,11 +330,6 @@ private[spark] object SQLConf {
 
   // Options that control which operators can be chosen by the query planner.  These should be
   // considered hints and may be ignored by future versions of Spark SQL.
-  val EXTERNAL_SORT = booleanConf("spark.sql.planner.externalSort",
-    defaultValue = Some(true),
-    doc = "When true, performs sorts spilling to disk as needed otherwise sort each partition in" +
-      " memory.")
-
   val SORTMERGE_JOIN = booleanConf("spark.sql.planner.sortMergeJoin",
     defaultValue = Some(true),
     doc = "When true, use sort merge join (as opposed to hash join) by default for large joins.")
@@ -422,6 +417,7 @@ private[spark] object SQLConf {
 
   object Deprecated {
     val MAPRED_REDUCE_TASKS = "mapred.reduce.tasks"
+    val EXTERNAL_SORT = "spark.sql.planner.externalSort"
   }
 }
 
@@ -476,8 +472,6 @@ private[sql] class SQLConf extends Serializable with CatalystConf {
 
   private[spark] def metastorePartitionPruning: Boolean = getConf(HIVE_METASTORE_PARTITION_PRUNING)
 
-  private[spark] def externalSortEnabled: Boolean = getConf(EXTERNAL_SORT)
-
   private[spark] def sortMergeJoinEnabled: Boolean = getConf(SORTMERGE_JOIN)
 
   private[spark] def codegenEnabled: Boolean = getConf(CODEGEN_ENABLED, getConf(TUNGSTEN_ENABLED))
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index 5e40d77689045b2d17c1afee69b2a6fa87545f33..41b215c79296adc8e2229b71abdaf0da3a6b26ba 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -312,8 +312,6 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
       if (sqlContext.conf.unsafeEnabled && sqlContext.conf.codegenEnabled &&
         TungstenSort.supportsSchema(child.schema)) {
         execution.TungstenSort(sortExprs, global, child)
-      } else if (sqlContext.conf.externalSortEnabled) {
-        execution.ExternalSort(sortExprs, global, child)
       } else {
         execution.Sort(sortExprs, global, child)
       }
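
After this change an ORDER BY is planned as either TungstenSort or the spill-capable Sort; there is no in-memory-only operator left to fall back to. A rough sketch, assuming a SparkContext named sc and a SQLContext named sqlContext:

    // Sketch: explain() on a sorted DataFrame shows TungstenSort or Sort,
    // depending on codegen/unsafe support for the schema.
    import sqlContext.implicits._
    val df = sc.parallelize(Seq((3, "c"), (1, "a"), (2, "b"))).toDF("id", "name")
    df.sort("id").explain()
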
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
index 95209e6634519df6178f2f177129652550257dd9..af28e2dfa4186e3c2508cb08cba5d03ac8273487 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
@@ -105,6 +105,15 @@ case class SetCommand(kv: Option[(String, Option[String])]) extends RunnableComm
       }
       (keyValueOutput, runFunc)
 
+    case Some((SQLConf.Deprecated.EXTERNAL_SORT, Some(value))) =>
+      val runFunc = (sqlContext: SQLContext) => {
+        logWarning(
+          s"Property ${SQLConf.Deprecated.EXTERNAL_SORT} is deprecated and will be ignored. " +
+            s"External sort will continue to be used.")
+        Seq(Row(SQLConf.Deprecated.EXTERNAL_SORT, "true"))
+      }
+      (keyValueOutput, runFunc)
+
     // Configures a single property.
     case Some((key, Some(value))) =>
       val runFunc = (sqlContext: SQLContext) => {
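
With this case in place, setting the retired key degrades gracefully: the warning above is logged and a fixed key/value row is returned instead of an error. Roughly (a sketch, assuming a SQLContext named sqlContext):

    // Sketch: the retired key no longer changes planner behaviour; the command
    // answers with the always-on value and logs the deprecation warning.
    sqlContext.sql("SET spark.sql.planner.externalSort=false").collect()
    // expected result: a single Row("spark.sql.planner.externalSort", "true")
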
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/sort.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/sort.scala
index 40ef7c3b5353004edcae6e3e044f1531171e0f2f..27f26245a5ef0aa699a6be28d9bb1cc5ada4a226 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/sort.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/sort.scala
@@ -31,38 +31,12 @@ import org.apache.spark.{SparkEnv, InternalAccumulator, TaskContext}
 // This file defines various sort operators.
 ////////////////////////////////////////////////////////////////////////////////////////////////////
 
-
-/**
- * Performs a sort on-heap.
- * @param global when true performs a global sort of all partitions by shuffling the data first
- *               if necessary.
- */
-case class Sort(
-    sortOrder: Seq[SortOrder],
-    global: Boolean,
-    child: SparkPlan)
-  extends UnaryNode {
-  override def requiredChildDistribution: Seq[Distribution] =
-    if (global) OrderedDistribution(sortOrder) :: Nil else UnspecifiedDistribution :: Nil
-
-  protected override def doExecute(): RDD[InternalRow] = attachTree(this, "sort") {
-    child.execute().mapPartitions( { iterator =>
-      val ordering = newOrdering(sortOrder, child.output)
-      iterator.map(_.copy()).toArray.sorted(ordering).iterator
-    }, preservesPartitioning = true)
-  }
-
-  override def output: Seq[Attribute] = child.output
-
-  override def outputOrdering: Seq[SortOrder] = sortOrder
-}
-
 /**
  * Performs a sort, spilling to disk as needed.
  * @param global when true performs a global sort of all partitions by shuffling the data first
  *               if necessary.
  */
-case class ExternalSort(
+case class Sort(
     sortOrder: Seq[SortOrder],
     global: Boolean,
     child: SparkPlan)
@@ -93,7 +67,7 @@ case class ExternalSort(
 }
 
 /**
- * Optimized version of [[ExternalSort]] that operates on binary data (implemented as part of
+ * Optimized version of [[Sort]] that operates on binary data (implemented as part of
  * Project Tungsten).
  *
  * @param global when true performs a global sort of all partitions by shuffling the data first
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index f9981356f364f9190319c47d83180acdbf65695c..05b4127cbcaff194545007832a02c3b712e329de 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -581,28 +581,12 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
       mapData.collect().sortBy(_.data(1)).reverse.map(Row.fromTuple).toSeq)
   }
 
-  test("sorting") {
-    withSQLConf(SQLConf.EXTERNAL_SORT.key -> "false") {
-      sortTest()
-    }
-  }
-
   test("external sorting") {
-    withSQLConf(SQLConf.EXTERNAL_SORT.key -> "true") {
-      sortTest()
-    }
-  }
-
-  test("SPARK-6927 sorting with codegen on") {
-    withSQLConf(SQLConf.EXTERNAL_SORT.key -> "false",
-      SQLConf.CODEGEN_ENABLED.key -> "true") {
-      sortTest()
-    }
+    sortTest()
   }
 
   test("SPARK-6927 external sorting with codegen on") {
-    withSQLConf(SQLConf.EXTERNAL_SORT.key -> "true",
-      SQLConf.CODEGEN_ENABLED.key -> "true") {
+    withSQLConf(SQLConf.CODEGEN_ENABLED.key -> "true") {
       sortTest()
     }
   }
@@ -1731,10 +1715,8 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
   }
 
   test("external sorting updates peak execution memory") {
-    withSQLConf((SQLConf.EXTERNAL_SORT.key, "true")) {
-      AccumulatorSuite.verifyPeakExecutionMemorySet(sparkContext, "external sort") {
-        sortTest()
-      }
+    AccumulatorSuite.verifyPeakExecutionMemorySet(sparkContext, "external sort") {
+      sortTest()
     }
   }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/RowFormatConvertersSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/RowFormatConvertersSuite.scala
index 4492e37ad01ff9cfa188e44f623ffaa82bc14e2e..5dc37e5c3c2381bee74a14e8a6ead383cee4d5cd 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/RowFormatConvertersSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/RowFormatConvertersSuite.scala
@@ -32,7 +32,7 @@ class RowFormatConvertersSuite extends SparkPlanTest with SharedSQLContext {
     case c: ConvertToSafe => c
   }
 
-  private val outputsSafe = ExternalSort(Nil, false, PhysicalRDD(Seq.empty, null, "name"))
+  private val outputsSafe = Sort(Nil, false, PhysicalRDD(Seq.empty, null, "name"))
   assert(!outputsSafe.outputsUnsafeRows)
   private val outputsUnsafe = TungstenSort(Nil, false, PhysicalRDD(Seq.empty, null, "name"))
   assert(outputsUnsafe.outputsUnsafeRows)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SortSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SortSuite.scala
index 3073d492e613b737ae8df4ff4276dc541f546ddd..847c188a30333233b18fc936c38737d92c59e64f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SortSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SortSuite.scala
@@ -36,13 +36,13 @@ class SortSuite extends SparkPlanTest with SharedSQLContext {
 
     checkAnswer(
       input.toDF("a", "b", "c"),
-      ExternalSort('a.asc :: 'b.asc :: Nil, global = true, _: SparkPlan),
+      Sort('a.asc :: 'b.asc :: Nil, global = true, _: SparkPlan),
       input.sortBy(t => (t._1, t._2)).map(Row.fromTuple),
       sortAnswers = false)
 
     checkAnswer(
       input.toDF("a", "b", "c"),
-      ExternalSort('b.asc :: 'a.asc :: Nil, global = true, _: SparkPlan),
+      Sort('b.asc :: 'a.asc :: Nil, global = true, _: SparkPlan),
       input.sortBy(t => (t._2, t._1)).map(Row.fromTuple),
       sortAnswers = false)
   }