diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/arithmetic.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/arithmetic.scala
index 5808e3f66de3c14a1a8272bed8a8ca718fd52e84..98464edf4d3908e29476102f61d848c2fd5b7dda 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/arithmetic.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/arithmetic.scala
@@ -320,7 +320,7 @@ case class MaxOf(left: Expression, right: Expression) extends BinaryArithmetic {
 
   override def nullable: Boolean = left.nullable && right.nullable
 
-  private lazy val ordering = TypeUtils.getOrdering(dataType)
+  private lazy val ordering = TypeUtils.getInterpretedOrdering(dataType)
 
   override def eval(input: InternalRow): Any = {
     val input1 = left.eval(input)
@@ -374,7 +374,7 @@ case class MinOf(left: Expression, right: Expression) extends BinaryArithmetic {
 
   override def nullable: Boolean = left.nullable && right.nullable
 
-  private lazy val ordering = TypeUtils.getOrdering(dataType)
+  private lazy val ordering = TypeUtils.getInterpretedOrdering(dataType)
 
   override def eval(input: InternalRow): Any = {
     val input1 = left.eval(input)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/conditionals.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/conditionals.scala
index 961b1d8616801cbb2c1fe4f5598374ff7fa84e93..d51f3d3cef5880621e3ac9a717a4d166da75d8ec 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/conditionals.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/conditionals.scala
@@ -319,7 +319,7 @@ case class Least(children: Seq[Expression]) extends Expression {
   override def nullable: Boolean = children.forall(_.nullable)
   override def foldable: Boolean = children.forall(_.foldable)
 
-  private lazy val ordering = TypeUtils.getOrdering(dataType)
+  private lazy val ordering = TypeUtils.getInterpretedOrdering(dataType)
 
   override def checkInputDataTypes(): TypeCheckResult = {
     if (children.length <= 1) {
@@ -374,7 +374,7 @@ case class Greatest(children: Seq[Expression]) extends Expression {
   override def nullable: Boolean = children.forall(_.nullable)
   override def foldable: Boolean = children.forall(_.foldable)
 
-  private lazy val ordering = TypeUtils.getOrdering(dataType)
+  private lazy val ordering = TypeUtils.getInterpretedOrdering(dataType)
 
   override def checkInputDataTypes(): TypeCheckResult = {
     if (children.length <= 1) {
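
The four call-site changes above (`MaxOf`, `MinOf`, `Least`, `Greatest`) are mechanical renames: each expression still resolves an `Ordering[Any]` from `TypeUtils` and delegates per-row comparisons to it. A minimal sketch of that evaluation pattern, with invented inputs (this is not code from the patch):

```scala
import org.apache.spark.sql.catalyst.util.TypeUtils
import org.apache.spark.sql.types.IntegerType

// The interpreted ordering works on boxed values, hence Ordering[Any].
val ordering = TypeUtils.getInterpretedOrdering(IntegerType)

// What Least does per row, in essence: keep the smallest non-null value.
val inputs: Seq[Any] = Seq(3, 1, 2)
val least = inputs.filter(_ != null).reduceLeft { (a, b) =>
  if (ordering.lt(b, a)) b else a
}
assert(least == 1)
```
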
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/RowOrdering.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ordering.scala
similarity index 85%
rename from sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/RowOrdering.scala
rename to sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ordering.scala
index 873f5324c573e30f3a443895a99fbb1c07d99f1c..6407c73bc97d9dc09f4d6550c073e03298543c98 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/RowOrdering.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ordering.scala
@@ -24,7 +24,7 @@ import org.apache.spark.sql.types._
 /**
  * An interpreted row ordering comparator.
  */
-class RowOrdering(ordering: Seq[SortOrder]) extends Ordering[InternalRow] {
+class InterpretedOrdering(ordering: Seq[SortOrder]) extends Ordering[InternalRow] {
 
   def this(ordering: Seq[SortOrder], inputSchema: Seq[Attribute]) =
     this(ordering.map(BindReferences.bindReference(_, inputSchema)))
@@ -49,9 +49,9 @@ class RowOrdering(ordering: Seq[SortOrder]) extends Ordering[InternalRow] {
           case dt: AtomicType if order.direction == Descending =>
             dt.ordering.asInstanceOf[Ordering[Any]].reverse.compare(left, right)
           case s: StructType if order.direction == Ascending =>
-            s.ordering.asInstanceOf[Ordering[Any]].compare(left, right)
+            s.interpretedOrdering.asInstanceOf[Ordering[Any]].compare(left, right)
           case s: StructType if order.direction == Descending =>
-            s.ordering.asInstanceOf[Ordering[Any]].reverse.compare(left, right)
+            s.interpretedOrdering.asInstanceOf[Ordering[Any]].reverse.compare(left, right)
           case other =>
             throw new IllegalArgumentException(s"Type $other does not support ordered operations")
         }
@@ -65,6 +65,18 @@ class RowOrdering(ordering: Seq[SortOrder]) extends Ordering[InternalRow] {
   }
 }
 
+object InterpretedOrdering {
+
+  /**
+   * Creates an [[InterpretedOrdering]] for the given schema, in natural ascending order.
+   */
+  def forSchema(dataTypes: Seq[DataType]): InterpretedOrdering = {
+    new InterpretedOrdering(dataTypes.zipWithIndex.map {
+      case (dt, index) => new SortOrder(BoundReference(index, dt, nullable = true), Ascending)
+    })
+  }
+}
+
 object RowOrdering {
 
   /**
@@ -81,13 +93,4 @@ object RowOrdering {
    * Returns true iff outputs from the expressions can be ordered.
    */
   def isOrderable(exprs: Seq[Expression]): Boolean = exprs.forall(e => isOrderable(e.dataType))
-
-  /**
-   * Creates a [[RowOrdering]] for the given schema, in natural ascending order.
-   */
-  def forSchema(dataTypes: Seq[DataType]): RowOrdering = {
-    new RowOrdering(dataTypes.zipWithIndex.map {
-      case (dt, index) => new SortOrder(BoundReference(index, dt, nullable = true), Ascending)
-    })
-  }
 }
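
With `forSchema` moved onto the new `InterpretedOrdering` companion object, constructing and using a row ordering looks like the sketch below (the schema and rows are invented for illustration):

```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.InterpretedOrdering
import org.apache.spark.sql.types.{IntegerType, StringType}
import org.apache.spark.unsafe.types.UTF8String

// Natural ascending ordering over a two-column (Int, String) schema.
val ordering = InterpretedOrdering.forSchema(Seq(IntegerType, StringType))

val row1 = InternalRow(1, UTF8String.fromString("a"))
val row2 = InternalRow(1, UTF8String.fromString("b"))

// A tie on the first column falls through to the second, matching the
// compare() loop in the class above.
assert(ordering.compare(row1, row2) < 0)
```
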
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
index 68c832d7194d4cb1d142725638ff3ef37a1c34af..fe7dffb815987004b3ee43e389023c046eba23ba 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
@@ -376,7 +376,7 @@ case class LessThan(left: Expression, right: Expression) extends BinaryCompariso
 
   override def symbol: String = "<"
 
-  private lazy val ordering = TypeUtils.getOrdering(left.dataType)
+  private lazy val ordering = TypeUtils.getInterpretedOrdering(left.dataType)
 
   protected override def nullSafeEval(input1: Any, input2: Any): Any = ordering.lt(input1, input2)
 }
@@ -388,7 +388,7 @@ case class LessThanOrEqual(left: Expression, right: Expression) extends BinaryCo
 
   override def symbol: String = "<="
 
-  private lazy val ordering = TypeUtils.getOrdering(left.dataType)
+  private lazy val ordering = TypeUtils.getInterpretedOrdering(left.dataType)
 
   protected override def nullSafeEval(input1: Any, input2: Any): Any = ordering.lteq(input1, input2)
 }
@@ -400,7 +400,7 @@ case class GreaterThan(left: Expression, right: Expression) extends BinaryCompar
 
   override def symbol: String = ">"
 
-  private lazy val ordering = TypeUtils.getOrdering(left.dataType)
+  private lazy val ordering = TypeUtils.getInterpretedOrdering(left.dataType)
 
   protected override def nullSafeEval(input1: Any, input2: Any): Any = ordering.gt(input1, input2)
 }
@@ -412,7 +412,7 @@ case class GreaterThanOrEqual(left: Expression, right: Expression) extends Binar
 
   override def symbol: String = ">="
 
-  private lazy val ordering = TypeUtils.getOrdering(left.dataType)
+  private lazy val ordering = TypeUtils.getInterpretedOrdering(left.dataType)
 
   protected override def nullSafeEval(input1: Any, input2: Any): Any = ordering.gteq(input1, input2)
 }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TypeUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TypeUtils.scala
index 0b41f92c6193c0e7123e13e2b6b4fd92060ebdde..bcf4d78fb937138db1ec7f5b776600de9f298f11 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TypeUtils.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TypeUtils.scala
@@ -54,10 +54,10 @@ object TypeUtils {
   def getNumeric(t: DataType): Numeric[Any] =
     t.asInstanceOf[NumericType].numeric.asInstanceOf[Numeric[Any]]
 
-  def getOrdering(t: DataType): Ordering[Any] = {
+  def getInterpretedOrdering(t: DataType): Ordering[Any] = {
     t match {
       case i: AtomicType => i.ordering.asInstanceOf[Ordering[Any]]
-      case s: StructType => s.ordering.asInstanceOf[Ordering[Any]]
+      case s: StructType => s.interpretedOrdering.asInstanceOf[Ordering[Any]]
     }
   }
 
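
`getInterpretedOrdering` keeps the old behavior for atomic types; only the struct branch changes its source. Note that the match is still non-exhaustive: requesting an ordering for an unorderable type (a `MapType`, say) throws `scala.MatchError`, which is why callers are expected to gate on `RowOrdering.isOrderable` first. A quick sketch of both supported branches (invented values):

```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.util.TypeUtils
import org.apache.spark.sql.types._

// Atomic branch: delegates to the type's own Ordering.
val intOrdering = TypeUtils.getInterpretedOrdering(IntegerType)
assert(intOrdering.lt(1, 2))

// Struct branch: delegates to the struct's field-by-field ordering.
val structOrdering = TypeUtils.getInterpretedOrdering(
  StructType(StructField("a", IntegerType) :: Nil))
assert(structOrdering.lt(InternalRow(1), InternalRow(2)))

// Unorderable branch: no case matches, so this would throw a MatchError.
// TypeUtils.getInterpretedOrdering(MapType(IntegerType, IntegerType))
```
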
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala
index 6928707f7bf6e8740a0219010421a88c99eeee21..9cbc207538d4ff0385583fe7308110d437317c5e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala
@@ -24,7 +24,7 @@ import org.json4s.JsonDSL._
 
 import org.apache.spark.SparkException
 import org.apache.spark.annotation.DeveloperApi
-import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Attribute, RowOrdering}
+import org.apache.spark.sql.catalyst.expressions.{InterpretedOrdering, AttributeReference, Attribute}
 
 
 /**
@@ -301,7 +301,7 @@ case class StructType(fields: Array[StructField]) extends DataType with Seq[Stru
     StructType(newFields)
   }
 
-  private[sql] val ordering = RowOrdering.forSchema(this.fields.map(_.dataType))
+  private[sql] val interpretedOrdering = InterpretedOrdering.forSchema(this.fields.map(_.dataType))
 }
 
 object StructType extends AbstractDataType {
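
Since struct values are themselves `InternalRow`s, the renamed `interpretedOrdering` compares them field by field. It stays `private[sql]`, so only code inside the `org.apache.spark.sql` package can reach it directly; a sketch of its behavior from such code (values invented):

```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.types._

// Accessible only from inside the org.apache.spark.sql package.
val struct = StructType(StructField("a", IntegerType) :: Nil)
val ordering = struct.interpretedOrdering

// Struct values are InternalRows; comparison proceeds field by field.
assert(ordering.compare(InternalRow(1), InternalRow(2)) < 0)
```
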
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGenerationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGenerationSuite.scala
index cc82f7c3f5a73802500c6df688201248a1ada3b3..e310aee2216669d95e00da230940788f6c3c6932 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGenerationSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGenerationSuite.scala
@@ -54,7 +54,7 @@ class CodeGenerationSuite extends SparkFunSuite with ExpressionEvalHelper {
-  // GenerateOrdering agrees with RowOrdering.
+  // GenerateOrdering agrees with InterpretedOrdering.
   (DataTypeTestUtils.atomicTypes ++ Set(NullType)).foreach { dataType =>
     test(s"GenerateOrdering with $dataType") {
-      val rowOrdering = RowOrdering.forSchema(Seq(dataType, dataType))
+      val rowOrdering = InterpretedOrdering.forSchema(Seq(dataType, dataType))
       val genOrdering = GenerateOrdering.generate(
         BoundReference(0, dataType, nullable = true).asc ::
           BoundReference(1, dataType, nullable = true).asc :: Nil)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
index 05b009d1935bb73353a8354c077ee5d45b4f9e00..6ea5eeedf1bbe316c9673f7a7fdaaba71a09fa8a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
@@ -156,7 +156,10 @@ case class Exchange(newPartitioning: Partitioning, child: SparkPlan) extends Una
           val mutablePair = new MutablePair[InternalRow, Null]()
           iter.map(row => mutablePair.update(row.copy(), null))
         }
-        implicit val ordering = new RowOrdering(sortingExpressions, child.output)
+        // We need to use an interpreted ordering here because generated orderings cannot be
+        // serialized and this ordering needs to be created on the driver in order to be passed into
+        // Spark core code.
+        implicit val ordering = new InterpretedOrdering(sortingExpressions, child.output)
         new RangePartitioner(numPartitions, rddForSampling, ascending = true)
       case SinglePartition =>
         new Partitioner {
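
The new comment states the motivation for the whole rename: `RangePartitioner` receives its `Ordering` implicitly on the driver and ships it to executors inside task closures, so the ordering must survive Java serialization. An `InterpretedOrdering` is an ordinary object graph (case-class `SortOrder` expressions wrapped in a serializable `scala.math.Ordering`), while a `GenerateOrdering` result is an instance of a Janino-compiled class that the receiving JVM cannot load. A sketch of the property being relied on (the round-trip below is illustrative, not part of the patch):

```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream}

import org.apache.spark.sql.catalyst.expressions.InterpretedOrdering
import org.apache.spark.sql.types.IntegerType

// Plain objects all the way down, so Java serialization succeeds...
val ordering = InterpretedOrdering.forSchema(Seq(IntegerType))
new ObjectOutputStream(new ByteArrayOutputStream()).writeObject(ordering)

// ...whereas a generated ordering is an instance of a class compiled at
// runtime on the driver, which executors could not deserialize: the class
// does not exist on their classpath.
```
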
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
index dbc0cefbe2e10c80efe2ade52f8d3e66e35373e1..2f29067f5646a45f80989ad1e87201ecf547b31d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
@@ -32,6 +32,7 @@ import org.apache.spark.sql.Row
 import org.apache.spark.sql.catalyst.expressions.codegen._
 import org.apache.spark.sql.catalyst.plans.QueryPlan
 import org.apache.spark.sql.catalyst.plans.physical._
+import org.apache.spark.sql.types.DataType
 
 object SparkPlan {
   protected[sql] val currentContext = new ThreadLocal[SQLContext]()
@@ -309,13 +310,23 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
             throw e
           } else {
             log.error("Failed to generate ordering, fallback to interpreted", e)
-            new RowOrdering(order, inputSchema)
+            new InterpretedOrdering(order, inputSchema)
           }
       }
     } else {
-      new RowOrdering(order, inputSchema)
+      new InterpretedOrdering(order, inputSchema)
     }
   }
+
+  /**
+   * Creates a row ordering for the given schema, in natural ascending order.
+   */
+  protected def newNaturalAscendingOrdering(dataTypes: Seq[DataType]): Ordering[InternalRow] = {
+    val order: Seq[SortOrder] = dataTypes.zipWithIndex.map {
+      case (dt, index) => new SortOrder(BoundReference(index, dt, nullable = true), Ascending)
+    }
+    newOrdering(order, Seq.empty)
+  }
 }
 
 private[sql] trait LeafNode extends SparkPlan {
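
`newNaturalAscendingOrdering` is the plan-side replacement for the removed `RowOrdering.forSchema`: it routes through `newOrdering`, so callers get a code-generated comparator when codegen is enabled and the `InterpretedOrdering` fallback otherwise. Passing `Seq.empty` as the input schema is safe because the `SortOrder`s are built over already-bound `BoundReference`s. A sketch of a call site (`keyTypes` is a made-up name; this fragment lives inside a `SparkPlan` subclass, since the method is `protected`):

```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.types.{DataType, IntegerType, LongType}

// Invoked from within a partition-processing closure, so a generated,
// non-serializable comparator is compiled on the executor where it is
// used and never has to be shipped over the wire.
val keyTypes: Seq[DataType] = Seq(IntegerType, LongType)
val keyOrdering: Ordering[InternalRow] = newNaturalAscendingOrdering(keyTypes)
```
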
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
index 477170297c2acb3bf400b5a17c01532158659108..f4677b4ee86bb626294e602a952beab0ba3a8191 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
@@ -212,7 +212,9 @@ case class TakeOrderedAndProject(
 
   override def outputPartitioning: Partitioning = SinglePartition
 
-  private val ord: RowOrdering = new RowOrdering(sortOrder, child.output)
+  // We need to use an interpreted ordering here because generated orderings cannot be serialized
+  // and this ordering needs to be created on the driver in order to be passed into Spark core code.
+  private val ord: InterpretedOrdering = new InterpretedOrdering(sortOrder, child.output)
 
   // TODO: remove @transient after figure out how to clean closure at InsertIntoHiveTable.
   @transient private val projection = projectList.map(new InterpretedProjection(_, child.output))
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala
index eb595490fbf28dd7989abe57fe06124a77dfc1c9..4ae23c186cf7bcb64b12acf01cf5589322e3a79c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala
@@ -48,9 +48,6 @@ case class SortMergeJoin(
   override def requiredChildDistribution: Seq[Distribution] =
     ClusteredDistribution(leftKeys) :: ClusteredDistribution(rightKeys) :: Nil
 
-  // this is to manually construct an ordering that can be used to compare keys from both sides
-  private val keyOrdering: RowOrdering = RowOrdering.forSchema(leftKeys.map(_.dataType))
-
   override def outputOrdering: Seq[SortOrder] = requiredOrders(leftKeys)
 
   override def requiredChildOrdering: Seq[Seq[SortOrder]] =
@@ -59,8 +56,10 @@ case class SortMergeJoin(
   @transient protected lazy val leftKeyGenerator = newProjection(leftKeys, left.output)
   @transient protected lazy val rightKeyGenerator = newProjection(rightKeys, right.output)
 
-  private def requiredOrders(keys: Seq[Expression]): Seq[SortOrder] =
+  private def requiredOrders(keys: Seq[Expression]): Seq[SortOrder] = {
+    // This must be ascending in order to agree with the `keyOrdering` defined in `doExecute()`.
     keys.map(SortOrder(_, Ascending))
+  }
 
   protected override def doExecute(): RDD[InternalRow] = {
     val leftResults = left.execute().map(_.copy())
@@ -68,6 +67,8 @@ case class SortMergeJoin(
 
     leftResults.zipPartitions(rightResults) { (leftIter, rightIter) =>
       new Iterator[InternalRow] {
+        // An ordering that can be used to compare keys from both sides.
+        private[this] val keyOrdering = newNaturalAscendingOrdering(leftKeys.map(_.dataType))
         // Mutable per row objects.
         private[this] val joinRow = new JoinedRow
         private[this] var leftElement: InternalRow = _
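
Moving `keyOrdering` off the operator and into the `zipPartitions` closure is what makes the codegen path usable here: the ordering is now constructed per partition on the executor, after the plan has been deserialized, so a generated comparator never crosses the wire. The pattern, reduced to its essentials (a sketch with the merge loop elided):

```scala
// Sketch: build non-serializable helpers inside the partition-processing
// closure rather than as fields of the (serialized) operator.
leftResults.zipPartitions(rightResults) { (leftIter, rightIter) =>
  // Created on the executor; may be an instance of a generated class.
  val keyOrdering = newNaturalAscendingOrdering(leftKeys.map(_.dataType))
  // ... merge leftIter and rightIter, comparing keys via keyOrdering ...
  Iterator.empty
}
```
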
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeKVExternalSorterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeKVExternalSorterSuite.scala
index 08156f0e39ce8ad0f8befb1cd592ac61d276fb45..a9515a03acf2c94c41848c2952b030399019ecb0 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeKVExternalSorterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeKVExternalSorterSuite.scala
@@ -22,7 +22,7 @@ import scala.util.Random
 import org.apache.spark._
 import org.apache.spark.sql.{RandomDataGenerator, Row}
 import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
-import org.apache.spark.sql.catalyst.expressions.{UnsafeRow, RowOrdering, UnsafeProjection}
+import org.apache.spark.sql.catalyst.expressions.{InterpretedOrdering, UnsafeRow, UnsafeProjection}
 import org.apache.spark.sql.test.TestSQLContext
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.memory.{ExecutorMemoryManager, MemoryAllocator, TaskMemoryManager}
@@ -144,8 +144,8 @@ class UnsafeKVExternalSorterSuite extends SparkFunSuite {
     }
     sorter.cleanupResources()
 
-    val keyOrdering = RowOrdering.forSchema(keySchema.map(_.dataType))
-    val valueOrdering = RowOrdering.forSchema(valueSchema.map(_.dataType))
+    val keyOrdering = InterpretedOrdering.forSchema(keySchema.map(_.dataType))
+    val valueOrdering = InterpretedOrdering.forSchema(valueSchema.map(_.dataType))
     val kvOrdering = new Ordering[(InternalRow, InternalRow)] {
       override def compare(x: (InternalRow, InternalRow), y: (InternalRow, InternalRow)): Int = {
         keyOrdering.compare(x._1, y._1) match {