diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
index 66860a4c0923abe4bccf193afb76efb79c31bfc8..f79d4ff444dc0ce6207e4c30c49fa61bef488a91 100755
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
@@ -204,8 +204,8 @@ class SqlParser extends AbstractSparkSQLParser {
     )
 
   protected lazy val sortType: Parser[LogicalPlan => LogicalPlan] =
-    ( ORDER ~ BY  ~> ordering ^^ { case o => l: LogicalPlan => Sort(o, l) }
-    | SORT ~ BY  ~> ordering ^^ { case o => l: LogicalPlan => SortPartitions(o, l) }
+    ( ORDER ~ BY  ~> ordering ^^ { case o => l: LogicalPlan => Sort(o, true, l) }
+    | SORT ~ BY  ~> ordering ^^ { case o => l: LogicalPlan => Sort(o, false, l) }
     )
 
   protected lazy val ordering: Parser[Seq[SortOrder]] =
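With this change, `ORDER BY` and `SORT BY` lower to the same `Sort` node and differ only in the `global` flag. A minimal sketch (not part of the patch) of what the two grammar branches now build, for a hypothetical table `t`:

```scala
import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedRelation}
import org.apache.spark.sql.catalyst.expressions.{Ascending, SortOrder}
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Sort}

val ordering = Seq(SortOrder(UnresolvedAttribute("a"), Ascending))
val child: LogicalPlan = UnresolvedRelation(None, "t", None)

val orderByPlan = Sort(ordering, true, child)  // ORDER BY: one total ordering
val sortByPlan  = Sort(ordering, false, child) // SORT BY: ordered per partition
```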
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 1c4088b8438e12713fd9d576edeae32a58bc0272..72680f37a0b4de53a6c0da01e88da7564ef7ff19 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -246,7 +246,7 @@ class Analyzer(catalog: Catalog,
       case p: LogicalPlan if !p.childrenResolved => p
 
       // If the projection list contains Stars, expand it.
-      case p@Project(projectList, child) if containsStar(projectList) =>
+      case p @ Project(projectList, child) if containsStar(projectList) =>
         Project(
           projectList.flatMap {
             case s: Star => s.expand(child.output, resolver)
@@ -310,7 +310,8 @@ class Analyzer(catalog: Catalog,
    */
   object ResolveSortReferences extends Rule[LogicalPlan] {
     def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
-      case s @ Sort(ordering, p @ Project(projectList, child)) if !s.resolved && p.resolved =>
+      case s @ Sort(ordering, global, p @ Project(projectList, child))
+          if !s.resolved && p.resolved =>
         val unresolved = ordering.flatMap(_.collect { case UnresolvedAttribute(name) => name })
         val resolved = unresolved.flatMap(child.resolve(_, resolver))
         val requiredAttributes = AttributeSet(resolved.collect { case a: Attribute => a })
@@ -319,13 +320,14 @@ class Analyzer(catalog: Catalog,
         if (missingInProject.nonEmpty) {
           // Add missing attributes and then project them away after the sort.
           Project(projectList.map(_.toAttribute),
-            Sort(ordering,
+            Sort(ordering, global,
               Project(projectList ++ missingInProject, child)))
         } else {
           logDebug(s"Failed to find $missingInProject in ${p.output.mkString(", ")}")
           s // Nothing we can do here. Return original plan.
         }
-      case s @ Sort(ordering, a @ Aggregate(grouping, aggs, child)) if !s.resolved && a.resolved =>
+      case s @ Sort(ordering, global, a @ Aggregate(grouping, aggs, child))
+          if !s.resolved && a.resolved =>
         val unresolved = ordering.flatMap(_.collect { case UnresolvedAttribute(name) => name })
         // A small hack to create an object that will allow us to resolve any references that
         // refer to named expressions that are present in the grouping expressions.
@@ -340,8 +342,7 @@ class Analyzer(catalog: Catalog,
         if (missingInAggs.nonEmpty) {
           // Add missing grouping exprs and then project them away after the sort.
           Project(a.output,
-            Sort(ordering,
-              Aggregate(grouping, aggs ++ missingInAggs, child)))
+            Sort(ordering, global, Aggregate(grouping, aggs ++ missingInAggs, child)))
         } else {
           s // Nothing we can do here. Return original plan.
         }
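To make the rule's effect concrete, here is a hedged before/after sketch (using the Catalyst DSL; `t` is a stand-in relation) of how `SELECT a FROM t SORT BY b` is rewritten when the sort references a column the projection dropped:

```scala
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._
import org.apache.spark.sql.catalyst.plans.logical.LocalRelation

val t = LocalRelation('a.int, 'b.int)

// Before resolution: the sort refers to 'b, which the projection dropped.
val before = t.select('a).sortBy('b.asc)

// After ResolveSortReferences: 'b is pulled into the projection below the
// sort and projected away above it; the global flag rides along unchanged.
val after = t.select('a, 'b).sortBy('b.asc).select('a)
```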
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
index fb252cdf515348e2a43f451a1d3461e64a9542b9..a14e5b9ef14d04de3ab23bd430aabc6fba7e8329 100755
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
@@ -244,9 +244,9 @@ package object dsl {
         condition: Option[Expression] = None) =
       Join(logicalPlan, otherPlan, joinType, condition)
 
-    def orderBy(sortExprs: SortOrder*) = Sort(sortExprs, logicalPlan)
+    def orderBy(sortExprs: SortOrder*) = Sort(sortExprs, true, logicalPlan)
 
-    def sortBy(sortExprs: SortOrder*) = SortPartitions(sortExprs, logicalPlan)
+    def sortBy(sortExprs: SortOrder*) = Sort(sortExprs, false, logicalPlan)
 
     def groupBy(groupingExprs: Expression*)(aggregateExprs: Expression*) = {
       val aliasedExprs = aggregateExprs.map {
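A quick sanity check of the DSL change (a sketch, assuming the Catalyst DSL implicits are in scope; `t` is hypothetical):

```scala
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._
import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, Sort}

val t = LocalRelation('a.int, 'b.int)

assert(t.orderBy('a.asc) == Sort(Seq('a.asc), true, t))  // global sort
assert(t.sortBy('a.asc)  == Sort(Seq('a.asc), false, t)) // per-partition sort
```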
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
index a9282b98adfab5683a4171e956164f07cce0105b..0b9f01cbae9eac5504f9d79450bf90c3dda266b8 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
@@ -130,7 +130,16 @@ case class WriteToFile(
   override def output = child.output
 }
 
-case class Sort(order: Seq[SortOrder], child: LogicalPlan) extends UnaryNode {
+/**
+ * @param order  The ordering expressions.
+ * @param global True means the sort applies to the entire dataset;
+ *               false means the sort applies only within each partition.
+ * @param child  The child logical plan.
+ */
+case class Sort(
+    order: Seq[SortOrder],
+    global: Boolean,
+    child: LogicalPlan) extends UnaryNode {
   override def output = child.output
 }
 
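Since the node gained a field, every downstream `case Sort(...)` match changes arity, as the strategy and test updates below show. A small sketch of the two matching styles this patch uses:

```scala
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Sort}

// Hedged sketch: bind the flag when both variants are handled, or pin it
// with a literal when only one variant applies (e.g. TakeOrdered below).
def describe(plan: LogicalPlan): String = plan match {
  case Sort(order, true, _)  => s"global sort: ${order.mkString(", ")}"
  case Sort(order, false, _) => s"per-partition sort: ${order.mkString(", ")}"
  case _                     => "not a sort"
}
```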
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala
index 856b10f1a8fd8e2cfcd55a799f683fe042d88a07..80787b61ce1bfa7816099f0d00dfa1525401fd97 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala
@@ -214,7 +214,7 @@ class SchemaRDD(
    * @group Query
    */
   def orderBy(sortExprs: SortOrder*): SchemaRDD =
-    new SchemaRDD(sqlContext, Sort(sortExprs, logicalPlan))
+    new SchemaRDD(sqlContext, Sort(sortExprs, true, logicalPlan))
 
   /**
    * Sorts the results by the given expressions within partition.
@@ -227,7 +227,7 @@ class SchemaRDD(
    * @group Query
    */
   def sortBy(sortExprs: SortOrder*): SchemaRDD =
-    new SchemaRDD(sqlContext, SortPartitions(sortExprs, logicalPlan))
+    new SchemaRDD(sqlContext, Sort(sortExprs, false, logicalPlan))
 
   @deprecated("use limit with integer argument", "1.1.0")
   def limit(limitExpr: Expression): SchemaRDD =
@@ -238,7 +238,6 @@ class SchemaRDD(
    * {{{
    *   schemaRDD.limit(10)
    * }}}
-   * 
    * @group Query
    */
   def limit(limitNum: Int): SchemaRDD =
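At the API surface nothing changes; the two methods now differ only in the flag they pass. A hedged usage sketch (`people` is a hypothetical SchemaRDD with an `age` column, with the usual sqlContext implicits in scope):

```scala
people.orderBy('age.asc) // total ordering across the whole result (ORDER BY)
people.sortBy('age.asc)  // each partition sorted independently (SORT BY)
```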
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index 2954d4ce7d2d87007372584eb8c8416c534d7a5a..9151da69ed44cca69521c1e9e499af9643f95338 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -190,7 +190,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
 
   object TakeOrdered extends Strategy {
     def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
-      case logical.Limit(IntegerLiteral(limit), logical.Sort(order, child)) =>
+      case logical.Limit(IntegerLiteral(limit), logical.Sort(order, true, child)) =>
         execution.TakeOrdered(limit, order, planLater(child)) :: Nil
       case _ => Nil
     }
@@ -257,15 +257,14 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
         execution.Distinct(partial = false,
           execution.Distinct(partial = true, planLater(child))) :: Nil
 
-      case logical.Sort(sortExprs, child) if sqlContext.externalSortEnabled =>
-        execution.ExternalSort(sortExprs, global = true, planLater(child)):: Nil
-      case logical.Sort(sortExprs, child) =>
-        execution.Sort(sortExprs, global = true, planLater(child)):: Nil
-
       case logical.SortPartitions(sortExprs, child) =>
         // This sort only sorts tuples within a partition. Its requiredDistribution will be
         // an UnspecifiedDistribution.
         execution.Sort(sortExprs, global = false, planLater(child)) :: Nil
+      case logical.Sort(sortExprs, global, child) if sqlContext.externalSortEnabled =>
+        execution.ExternalSort(sortExprs, global, planLater(child)) :: Nil
+      case logical.Sort(sortExprs, global, child) =>
+        execution.Sort(sortExprs, global, planLater(child)) :: Nil
       case logical.Project(projectList, child) =>
         execution.Project(projectList, planLater(child)) :: Nil
       case logical.Filter(condition, child) =>
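Both Sort variants now reach the same two cases and simply forward the flag; the SortPartitions case is kept for plans still built elsewhere. For reference, roughly how the physical operator interprets the flag (paraphrased from the execution side, not part of this diff):

```scala
// Inside execution.Sort (sketch): a global sort requires an ordered,
// range-partitioned child distribution, which the planner satisfies by
// inserting an Exchange; a per-partition sort accepts any distribution.
override def requiredChildDistribution =
  if (global) OrderedDistribution(sortOrder) :: Nil
  else UnspecifiedDistribution :: Nil
```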
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DslQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DslQuerySuite.scala
index 691c4b38287bf3db6f98c0f374d1adf23abde54b..c0b9cf516312061c3f6b7a230c8cd0df8c3c7c99 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DslQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DslQuerySuite.scala
@@ -88,7 +88,7 @@ class DslQuerySuite extends QueryTest {
       Seq(Seq(6)))
   }
 
-  test("sorting") {
+  test("global sorting") {
     checkAnswer(
       testData2.orderBy('a.asc, 'b.asc),
       Seq((1,1), (1,2), (2,1), (2,2), (3,1), (3,2)))
@@ -122,22 +122,31 @@ class DslQuerySuite extends QueryTest {
       mapData.collect().sortBy(_.data(1)).reverse.toSeq)
   }
 
-  test("sorting #2") {
+  test("partition wide sorting") {
+    // 2 partitions totally, and
+    // Partition #1 with values:
+    //    (1, 1)
+    //    (1, 2)
+    //    (2, 1)
+    // Partition #2 with values:
+    //    (2, 2)
+    //    (3, 1)
+    //    (3, 2)
     checkAnswer(
       testData2.sortBy('a.asc, 'b.asc),
       Seq((1,1), (1,2), (2,1), (2,2), (3,1), (3,2)))
 
     checkAnswer(
       testData2.sortBy('a.asc, 'b.desc),
-      Seq((1,2), (1,1), (2,2), (2,1), (3,2), (3,1)))
+      Seq((1,2), (1,1), (2,1), (2,2), (3,2), (3,1)))
 
     checkAnswer(
       testData2.sortBy('a.desc, 'b.desc),
-      Seq((3,2), (3,1), (2,2), (2,1), (1,2), (1,1)))
+      Seq((2,1), (1,2), (1,1), (3,2), (3,1), (2,2)))
 
     checkAnswer(
       testData2.sortBy('a.desc, 'b.asc),
-      Seq((3,1), (3,2), (2,1), (2,2), (1,1), (1,2)))
+      Seq((2,1), (1,1), (1,2), (3,1), (3,2), (2,2)))
   }
 
   test("limit") {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/TestData.scala b/sql/core/src/test/scala/org/apache/spark/sql/TestData.scala
index bb553a0a1e50c57cde5bccde6a1189f1e0788270..497897c3c0d4d88c737ebaa60481f2fea8fbac98 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/TestData.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/TestData.scala
@@ -55,7 +55,7 @@ object TestData {
       TestData2(2, 1) ::
       TestData2(2, 2) ::
       TestData2(3, 1) ::
-      TestData2(3, 2) :: Nil).toSchemaRDD
+      TestData2(3, 2) :: Nil, 2).toSchemaRDD
   testData2.registerTempTable("testData2")
 
   case class DecimalData(a: BigDecimal, b: BigDecimal)
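The extra argument pins testData2 to exactly two partitions, which makes the partition-wise expectations in DslQuerySuite deterministic; under the default parallelism the split (and therefore the sortBy output order) could vary. A sketch of the call shape, with `rows` standing in for the TestData2 list:

```scala
// numSlices = 2: the six rows split into two contiguous slices of three,
// matching the partition layout described in DslQuerySuite above.
val testData2 = TestSQLContext.sparkContext.parallelize(rows, 2).toSchemaRDD
```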
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
index 3f3d9e7cd4fbebac6ef370687c36a106081b828b..8a9613cf96e541faf4e11307e9920aa19f4688e9 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
@@ -680,16 +680,16 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C
         val withSort =
           (orderByClause, sortByClause, distributeByClause, clusterByClause) match {
             case (Some(totalOrdering), None, None, None) =>
-              Sort(totalOrdering.getChildren.map(nodeToSortOrder), withHaving)
+              Sort(totalOrdering.getChildren.map(nodeToSortOrder), true, withHaving)
             case (None, Some(perPartitionOrdering), None, None) =>
-              SortPartitions(perPartitionOrdering.getChildren.map(nodeToSortOrder), withHaving)
+              Sort(perPartitionOrdering.getChildren.map(nodeToSortOrder), false, withHaving)
             case (None, None, Some(partitionExprs), None) =>
               Repartition(partitionExprs.getChildren.map(nodeToExpr), withHaving)
             case (None, Some(perPartitionOrdering), Some(partitionExprs), None) =>
-              SortPartitions(perPartitionOrdering.getChildren.map(nodeToSortOrder),
+              Sort(perPartitionOrdering.getChildren.map(nodeToSortOrder), false,
                 Repartition(partitionExprs.getChildren.map(nodeToExpr), withHaving))
             case (None, None, None, Some(clusterExprs)) =>
-              SortPartitions(clusterExprs.getChildren.map(nodeToExpr).map(SortOrder(_, Ascending)),
+              Sort(clusterExprs.getChildren.map(nodeToExpr).map(SortOrder(_, Ascending)), false,
                 Repartition(clusterExprs.getChildren.map(nodeToExpr), withHaving))
             case (None, None, None, None) => withHaving
             case _ => sys.error("Unsupported set of ordering / distribution clauses.")
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
index 8011f9b8773b3c5650c0aff08a291e7fba3eeb0d..4104df8f8e0223eeb2c9359481e4fc7a5c9ae0c3 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
@@ -132,7 +132,7 @@ abstract class HiveComparisonTest
 
     def isSorted(plan: LogicalPlan): Boolean = plan match {
       case _: Join | _: Aggregate | _: Generate | _: Sample | _: Distinct => false
-      case PhysicalOperation(_, _, Sort(_, _)) => true
+      case PhysicalOperation(_, _, Sort(_, true, _)) => true
       case _ => plan.children.iterator.exists(isSorted)
     }
 
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index f57f31af1556696a3f90a061b4d279d356fe70ea..5d0fb7237011f6c907125e35a44e3022216614d9 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -32,6 +32,13 @@ case class Nested3(f3: Int)
  * valid, but Hive currently cannot execute it.
  */
 class SQLQuerySuite extends QueryTest {
+  test("SPARK-4512 Fix attribute reference resolution error when using SORT BY") {
+    checkAnswer(
+      sql("SELECT * FROM (SELECT key + key AS a FROM src SORT BY value) t ORDER BY t.a"),
+      sql("SELECT key + key as a FROM src ORDER BY a").collect().toSeq
+    )
+  }
+
   test("CTAS with serde") {
     sql("CREATE TABLE ctas1 AS SELECT key k, value FROM src ORDER BY k, value").collect
     sql(
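Why this test exercises the fix: before this patch, SORT BY produced a SortPartitions node, but ResolveSortReferences (Analyzer.scala above) matched only Sort, so an attribute introduced under SORT BY in a subquery could stay unresolved. A hedged reconstruction of the pre-patch gap:

```scala
// The old rule matched only the two-argument Sort:
//   case s @ Sort(ordering, p @ Project(projectList, child)) if ... =>
// but "SELECT key + key AS a FROM src SORT BY value" produced
//   SortPartitions(ordering, Project(...))
// which no rule rewrote, so the outer reference t.a failed to resolve.
// Folding both clauses into Sort(order, global, child) closes the gap.
```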