diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/PrefixComparators.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/PrefixComparators.java
index c44630fbbc2f0a7622cb799420df39eaed80ca5a..116c84943e85591afb5db04da2b418cdf47d7334 100644
--- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/PrefixComparators.java
+++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/PrefixComparators.java
@@ -29,12 +29,23 @@ public class PrefixComparators {
 
   public static final PrefixComparator STRING = new UnsignedPrefixComparator();
   public static final PrefixComparator STRING_DESC = new UnsignedPrefixComparatorDesc();
+  public static final PrefixComparator STRING_NULLS_LAST = new UnsignedPrefixComparatorNullsLast();
+  public static final PrefixComparator STRING_DESC_NULLS_FIRST = new UnsignedPrefixComparatorDescNullsFirst();
+
   public static final PrefixComparator BINARY = new UnsignedPrefixComparator();
   public static final PrefixComparator BINARY_DESC = new UnsignedPrefixComparatorDesc();
+  public static final PrefixComparator BINARY_NULLS_LAST = new UnsignedPrefixComparatorNullsLast();
+  public static final PrefixComparator BINARY_DESC_NULLS_FIRST = new UnsignedPrefixComparatorDescNullsFirst();
+
   public static final PrefixComparator LONG = new SignedPrefixComparator();
   public static final PrefixComparator LONG_DESC = new SignedPrefixComparatorDesc();
+  public static final PrefixComparator LONG_NULLS_LAST = new SignedPrefixComparatorNullsLast();
+  public static final PrefixComparator LONG_DESC_NULLS_FIRST = new SignedPrefixComparatorDescNullsFirst();
+
   public static final PrefixComparator DOUBLE = new UnsignedPrefixComparator();
   public static final PrefixComparator DOUBLE_DESC = new UnsignedPrefixComparatorDesc();
+  public static final PrefixComparator DOUBLE_NULLS_LAST = new UnsignedPrefixComparatorNullsLast();
+  public static final PrefixComparator DOUBLE_DESC_NULLS_FIRST = new UnsignedPrefixComparatorDescNullsFirst();
 
   public static final class StringPrefixComparator {
     public static long computePrefix(UTF8String value) {
@@ -74,6 +85,9 @@ public class PrefixComparators {
 
     /** @return Whether the sort should take into account the sign bit. */
     public abstract boolean sortSigned();
+
+    /** @return Whether the sort should put nulls first or last. */
+    public abstract boolean nullsFirst();
   }
 
   //
@@ -83,16 +97,34 @@ public class PrefixComparators {
   public static final class UnsignedPrefixComparator extends RadixSortSupport {
     @Override public boolean sortDescending() { return false; }
     @Override public boolean sortSigned() { return false; }
-    @Override
+    @Override public boolean nullsFirst() { return true; }
+    public int compare(long aPrefix, long bPrefix) {
+      return UnsignedLongs.compare(aPrefix, bPrefix);
+    }
+  }
+
+  public static final class UnsignedPrefixComparatorNullsLast extends RadixSortSupport {
+    @Override public boolean sortDescending() { return false; }
+    @Override public boolean sortSigned() { return false; }
+    @Override public boolean nullsFirst() { return false; }
     public int compare(long aPrefix, long bPrefix) {
       return UnsignedLongs.compare(aPrefix, bPrefix);
     }
   }
 
+  public static final class UnsignedPrefixComparatorDescNullsFirst extends RadixSortSupport {
+    @Override public boolean sortDescending() { return true; }
+    @Override public boolean sortSigned() { return false; }
+    @Override public boolean nullsFirst() { return true; }
+    public int compare(long bPrefix, long aPrefix) {
+      return UnsignedLongs.compare(aPrefix, bPrefix);
+    }
+  }
+
   public static final class UnsignedPrefixComparatorDesc extends RadixSortSupport {
     @Override public boolean sortDescending() { return true; }
     @Override public boolean sortSigned() { return false; }
-    @Override
+    @Override public boolean nullsFirst() { return false; }
     public int compare(long bPrefix, long aPrefix) {
       return UnsignedLongs.compare(aPrefix, bPrefix);
     }
@@ -101,16 +133,34 @@ public class PrefixComparators {
   public static final class SignedPrefixComparator extends RadixSortSupport {
     @Override public boolean sortDescending() { return false; }
     @Override public boolean sortSigned() { return true; }
-    @Override
+    @Override public boolean nullsFirst() { return true; }
+    public int compare(long a, long b) {
+      return (a < b) ? -1 : (a > b) ? 1 : 0;
+    }
+  }
+
+  public static final class SignedPrefixComparatorNullsLast extends RadixSortSupport {
+    @Override public boolean sortDescending() { return false; }
+    @Override public boolean sortSigned() { return true; }
+    @Override public boolean nullsFirst() { return false; }
     public int compare(long a, long b) {
       return (a < b) ? -1 : (a > b) ? 1 : 0;
     }
   }
 
+  public static final class SignedPrefixComparatorDescNullsFirst extends RadixSortSupport {
+    @Override public boolean sortDescending() { return true; }
+    @Override public boolean sortSigned() { return true; }
+    @Override public boolean nullsFirst() { return true; }
+    public int compare(long b, long a) {
+      return (a < b) ? -1 : (a > b) ? 1 : 0;
+    }
+  }
+
   public static final class SignedPrefixComparatorDesc extends RadixSortSupport {
     @Override public boolean sortDescending() { return true; }
     @Override public boolean sortSigned() { return true; }
-    @Override
+    @Override public boolean nullsFirst() { return false; }
     public int compare(long b, long a) {
       return (a < b) ? -1 : (a > b) ? 1 : 0;
     }
diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java
index 30d0f3006a04e82ebecde5fdb151fa1c52b0b781..be382955c0d4219a79b59be328d73ec882b83e18 100644
--- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java
+++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java
@@ -333,17 +333,18 @@ public final class UnsafeInMemorySorter {
     if (nullBoundaryPos > 0) {
       assert radixSortSupport != null : "Nulls are only stored separately with radix sort";
       LinkedList<UnsafeSorterIterator> queue = new LinkedList<>();
-      if (radixSortSupport.sortDescending()) {
-        // Nulls are smaller than non-nulls
-        queue.add(new SortedIterator((pos - nullBoundaryPos) / 2, offset));
+
+      // The null order is either LAST or FIRST, regardless of sorting direction (ASC|DESC)
+      if (radixSortSupport.nullsFirst()) {
         queue.add(new SortedIterator(nullBoundaryPos / 2, 0));
+        queue.add(new SortedIterator((pos - nullBoundaryPos) / 2, offset));
       } else {
-        queue.add(new SortedIterator(nullBoundaryPos / 2, 0));
         queue.add(new SortedIterator((pos - nullBoundaryPos) / 2, offset));
+        queue.add(new SortedIterator(nullBoundaryPos / 2, 0));
       }
       return new UnsafeExternalSorter.ChainedIterator(queue);
     } else {
       return new SortedIterator(pos / 2, offset);
     }
   }
-}
+}
\ No newline at end of file
diff --git a/core/src/test/scala/org/apache/spark/util/collection/unsafe/sort/RadixSortSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/unsafe/sort/RadixSortSuite.scala
index 2c138064101924ae69af4dd8f1c0f59706e814d3..366ffda7788d372f1383c65e51b2ddbbfb485030 100644
--- a/core/src/test/scala/org/apache/spark/util/collection/unsafe/sort/RadixSortSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/collection/unsafe/sort/RadixSortSuite.scala
@@ -40,23 +40,38 @@ class RadixSortSuite extends SparkFunSuite with Logging {
   case class RadixSortType(
     name: String,
     referenceComparator: PrefixComparator,
-    startByteIdx: Int, endByteIdx: Int, descending: Boolean, signed: Boolean)
+    startByteIdx: Int, endByteIdx: Int, descending: Boolean, signed: Boolean, nullsFirst: Boolean)
 
   val SORT_TYPES_TO_TEST = Seq(
-    RadixSortType("unsigned binary data asc", PrefixComparators.BINARY, 0, 7, false, false),
-    RadixSortType("unsigned binary data desc", PrefixComparators.BINARY_DESC, 0, 7, true, false),
-    RadixSortType("twos complement asc", PrefixComparators.LONG, 0, 7, false, true),
-    RadixSortType("twos complement desc", PrefixComparators.LONG_DESC, 0, 7, true, true),
+    RadixSortType("unsigned binary data asc nulls first",
+      PrefixComparators.BINARY, 0, 7, false, false, true),
+    RadixSortType("unsigned binary data asc nulls last",
+      PrefixComparators.BINARY_NULLS_LAST, 0, 7, false, false, false),
+    RadixSortType("unsigned binary data desc nulls last",
+      PrefixComparators.BINARY_DESC_NULLS_FIRST, 0, 7, true, false, false),
+    RadixSortType("unsigned binary data desc nulls first",
+      PrefixComparators.BINARY_DESC, 0, 7, true, false, true),
+
+    RadixSortType("twos complement asc nulls first",
+      PrefixComparators.LONG, 0, 7, false, true, true),
+    RadixSortType("twos complement asc nulls last",
+      PrefixComparators.LONG_NULLS_LAST, 0, 7, false, true, false),
+    RadixSortType("twos complement desc nulls last",
+      PrefixComparators.LONG_DESC, 0, 7, true, true, false),
+    RadixSortType("twos complement desc nulls first",
+      PrefixComparators.LONG_DESC_NULLS_FIRST, 0, 7, true, true, true),
+
     RadixSortType(
       "binary data partial",
       new PrefixComparators.RadixSortSupport {
         override def sortDescending = false
         override def sortSigned = false
+        override def nullsFirst = true
         override def compare(a: Long, b: Long): Int = {
           return PrefixComparators.BINARY.compare(a & 0xffffff0000L, b & 0xffffff0000L)
         }
       },
-      2, 4, false, false))
+      2, 4, false, false, true))
 
   private def generateTestData(size: Int, rand: => Long): (Array[JLong], LongArray) = {
     val ref = Array.tabulate[Long](size) { i => rand }
diff --git a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
index 9a643465a9994429ea69c47138d40431ea6fa161..b475abdce2da9d0210a38516f62749d6bf65a59b 100644
--- a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
+++ b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
@@ -324,7 +324,7 @@ queryPrimary
     ;
 
 sortItem
-    : expression ordering=(ASC | DESC)?
+    : expression ordering=(ASC | DESC)? (NULLS nullOrder=(LAST | FIRST))?
     ;
 
 querySpecification
@@ -641,7 +641,8 @@ number
 nonReserved
     : SHOW | TABLES | COLUMNS | COLUMN | PARTITIONS | FUNCTIONS | DATABASES
     | ADD
-    | OVER | PARTITION | RANGE | ROWS | PRECEDING | FOLLOWING | CURRENT | ROW | MAP | ARRAY | STRUCT
+    | OVER | PARTITION | RANGE | ROWS | PRECEDING | FOLLOWING | CURRENT | ROW | LAST | FIRST
+    | MAP | ARRAY | STRUCT
     | LATERAL | WINDOW | REDUCE | TRANSFORM | USING | SERDE | SERDEPROPERTIES | RECORDREADER
     | DELIMITED | FIELDS | TERMINATED | COLLECTION | ITEMS | KEYS | ESCAPED | LINES | SEPARATED
     | EXTENDED | REFRESH | CLEAR | CACHE | UNCACHE | LAZY | TEMPORARY | OPTIONS
@@ -729,6 +730,8 @@ UNBOUNDED: 'UNBOUNDED';
 PRECEDING: 'PRECEDING';
 FOLLOWING: 'FOLLOWING';
 CURRENT: 'CURRENT';
+FIRST: 'FIRST';
+LAST: 'LAST';
 ROW: 'ROW';
 WITH: 'WITH';
 VALUES: 'VALUES';
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 18f814d6cdfd4d8de97d822e743beca3df639a6c..92bf8e0536fc49d7ad33b9e60d5a00749bac9ee0 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -714,9 +714,9 @@ class Analyzer(
       case s @ Sort(orders, global, child)
         if orders.exists(_.child.isInstanceOf[UnresolvedOrdinal]) =>
         val newOrders = orders map {
-          case s @ SortOrder(UnresolvedOrdinal(index), direction) =>
+          case s @ SortOrder(UnresolvedOrdinal(index), direction, nullOrdering) =>
             if (index > 0 && index <= child.output.size) {
-              SortOrder(child.output(index - 1), direction)
+              SortOrder(child.output(index - 1), direction, nullOrdering)
             } else {
               s.failAnalysis(
                 s"ORDER BY position $index is not in select list " +
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/SubstituteUnresolvedOrdinals.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/SubstituteUnresolvedOrdinals.scala
index 6d8dc8628229a4ff7b97cf5151915ac83e1f01a6..af0a565f73ae9302f1fe86461f16f4d861194829 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/SubstituteUnresolvedOrdinals.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/SubstituteUnresolvedOrdinals.scala
@@ -36,7 +36,7 @@ class SubstituteUnresolvedOrdinals(conf: CatalystConf) extends Rule[LogicalPlan]
   def apply(plan: LogicalPlan): LogicalPlan = plan transform {
     case s: Sort if conf.orderByOrdinal && s.order.exists(o => isIntLiteral(o.child)) =>
       val newOrders = s.order.map {
-        case order @ SortOrder(ordinal @ Literal(index: Int, IntegerType), _) =>
+        case order @ SortOrder(ordinal @ Literal(index: Int, IntegerType), _, _) =>
           val newOrdinal = withOrigin(ordinal.origin)(UnresolvedOrdinal(index))
           withOrigin(order.origin)(order.copy(child = newOrdinal))
         case other => other
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
index 8549187a66369c370585c85b21614448e7e9f313..66e52ca68af194ba492a32b2193ad5a74922d5f7 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
@@ -109,8 +109,9 @@ package object dsl {
     def cast(to: DataType): Expression = Cast(expr, to)
 
     def asc: SortOrder = SortOrder(expr, Ascending)
+    def asc_nullsLast: SortOrder = SortOrder(expr, Ascending, NullsLast)
     def desc: SortOrder = SortOrder(expr, Descending)
-
+    def desc_nullsFirst: SortOrder = SortOrder(expr, Descending, NullsFirst)
     def as(alias: String): NamedExpression = Alias(expr, alias)()
     def as(alias: Symbol): NamedExpression = Alias(expr, alias.name)()
   }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SortOrder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SortOrder.scala
index de779ed3702d370121c44e44911250cc2bf5808c..d015125baccafcc526525193c3180375f9655205 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SortOrder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SortOrder.scala
@@ -21,26 +21,43 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
 import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}
 import org.apache.spark.sql.types._
-import org.apache.spark.util.collection.unsafe.sort.PrefixComparators.BinaryPrefixComparator
-import org.apache.spark.util.collection.unsafe.sort.PrefixComparators.DoublePrefixComparator
+import org.apache.spark.util.collection.unsafe.sort.PrefixComparators._
 
 abstract sealed class SortDirection {
   def sql: String
+  def defaultNullOrdering: NullOrdering
+}
+
+abstract sealed class NullOrdering {
+  def sql: String
 }
 
 case object Ascending extends SortDirection {
   override def sql: String = "ASC"
+  override def defaultNullOrdering: NullOrdering = NullsFirst
 }
 
 case object Descending extends SortDirection {
   override def sql: String = "DESC"
+  override def defaultNullOrdering: NullOrdering = NullsLast
+}
+
+case object NullsFirst extends NullOrdering{
+  override def sql: String = "NULLS FIRST"
+}
+
+case object NullsLast extends NullOrdering{
+  override def sql: String = "NULLS LAST"
 }
 
 /**
  * An expression that can be used to sort a tuple.  This class extends expression primarily so that
  * transformations over expression will descend into its child.
  */
-case class SortOrder(child: Expression, direction: SortDirection)
+case class SortOrder(
+  child: Expression,
+  direction: SortDirection,
+  nullOrdering: NullOrdering)
   extends UnaryExpression with Unevaluable {
 
   /** Sort order is not foldable because we don't have an eval for it. */
@@ -57,12 +74,18 @@ case class SortOrder(child: Expression, direction: SortDirection)
   override def dataType: DataType = child.dataType
   override def nullable: Boolean = child.nullable
 
-  override def toString: String = s"$child ${direction.sql}"
-  override def sql: String = child.sql + " " + direction.sql
+  override def toString: String = s"$child ${direction.sql} ${nullOrdering.sql}"
+  override def sql: String = child.sql + " " + direction.sql + " " + nullOrdering.sql
 
   def isAscending: Boolean = direction == Ascending
 }
 
+object SortOrder {
+  def apply(child: Expression, direction: SortDirection): SortOrder = {
+    new SortOrder(child, direction, direction.defaultNullOrdering)
+  }
+}
+
 /**
  * An expression to generate a 64-bit long prefix used in sorting. If the sort must operate over
  * null keys as well, this.nullValue can be used in place of emitted null prefixes in the sort.
@@ -71,14 +94,35 @@ case class SortPrefix(child: SortOrder) extends UnaryExpression {
 
   val nullValue = child.child.dataType match {
     case BooleanType | DateType | TimestampType | _: IntegralType =>
-      Long.MinValue
+      if (nullAsSmallest) {
+        Long.MinValue
+      } else {
+        Long.MaxValue
+      }
     case dt: DecimalType if dt.precision - dt.scale <= Decimal.MAX_LONG_DIGITS =>
-      Long.MinValue
+      if (nullAsSmallest) {
+        Long.MinValue
+      } else {
+        Long.MaxValue
+      }
     case _: DecimalType =>
-      DoublePrefixComparator.computePrefix(Double.NegativeInfinity)
-    case _ => 0L
+      if (nullAsSmallest) {
+        DoublePrefixComparator.computePrefix(Double.NegativeInfinity)
+      } else {
+        DoublePrefixComparator.computePrefix(Double.NaN)
+      }
+    case _ =>
+      if (nullAsSmallest) {
+        0L
+      } else {
+        -1L
+      }
   }
 
+  private def nullAsSmallest: Boolean = (child.isAscending && child.nullOrdering == NullsFirst) ||
+      (!child.isAscending && child.nullOrdering == NullsLast)
+
+
   override def eval(input: InternalRow): Any = throw new UnsupportedOperationException
 
   override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
@@ -86,6 +130,7 @@ case class SortPrefix(child: SortOrder) extends UnaryExpression {
     val input = childCode.value
     val BinaryPrefixCmp = classOf[BinaryPrefixComparator].getName
     val DoublePrefixCmp = classOf[DoublePrefixComparator].getName
+    val StringPrefixCmp = classOf[StringPrefixComparator].getName
     val prefixCode = child.child.dataType match {
       case BooleanType =>
         s"$input ? 1L : 0L"
@@ -95,7 +140,7 @@ case class SortPrefix(child: SortOrder) extends UnaryExpression {
         s"(long) $input"
       case FloatType | DoubleType =>
         s"$DoublePrefixCmp.computePrefix((double)$input)"
-      case StringType => s"$input.getPrefix()"
+      case StringType => s"$StringPrefixCmp.computePrefix($input)"
       case BinaryType => s"$BinaryPrefixCmp.computePrefix($input)"
       case dt: DecimalType if dt.precision - dt.scale <= Decimal.MAX_LONG_DIGITS =>
         if (dt.precision <= Decimal.MAX_LONG_DIGITS) {
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateOrdering.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateOrdering.scala
index f4d35d232e691f1fa1467248ee73f9ae02efaad7..e7df95e1142ca97d27e224541ab7fb72478a355d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateOrdering.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateOrdering.scala
@@ -63,7 +63,7 @@ object GenerateOrdering extends CodeGenerator[Seq[SortOrder], Ordering[InternalR
    */
   def genComparisons(ctx: CodegenContext, schema: StructType): String = {
     val ordering = schema.fields.map(_.dataType).zipWithIndex.map {
-      case(dt, index) => new SortOrder(BoundReference(index, dt, nullable = true), Ascending)
+      case(dt, index) => SortOrder(BoundReference(index, dt, nullable = true), Ascending)
     }
     genComparisons(ctx, ordering)
   }
@@ -74,7 +74,7 @@ object GenerateOrdering extends CodeGenerator[Seq[SortOrder], Ordering[InternalR
   def genComparisons(ctx: CodegenContext, ordering: Seq[SortOrder]): String = {
     val comparisons = ordering.map { order =>
       val eval = order.child.genCode(ctx)
-      val asc = order.direction == Ascending
+      val asc = order.isAscending
       val isNullA = ctx.freshName("isNullA")
       val primitiveA = ctx.freshName("primitiveA")
       val isNullB = ctx.freshName("isNullB")
@@ -99,9 +99,17 @@ object GenerateOrdering extends CodeGenerator[Seq[SortOrder], Ordering[InternalR
           if ($isNullA && $isNullB) {
             // Nothing
           } else if ($isNullA) {
-            return ${if (order.direction == Ascending) "-1" else "1"};
+            return ${
+        order.nullOrdering match {
+          case NullsFirst => "-1"
+          case NullsLast => "1"
+        }};
           } else if ($isNullB) {
-            return ${if (order.direction == Ascending) "1" else "-1"};
+            return ${
+        order.nullOrdering match {
+          case NullsFirst => "1"
+          case NullsLast => "-1"
+        }};
           } else {
             int comp = ${ctx.genComp(order.child.dataType, primitiveA, primitiveB)};
             if (comp != 0) {
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ordering.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ordering.scala
index 6112259fed61980d51ef1ff47eab4e668b3cd1ba..79d2052c38a27f9724de7402bec05bb1a87e3b12 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ordering.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ordering.scala
@@ -39,9 +39,9 @@ class InterpretedOrdering(ordering: Seq[SortOrder]) extends Ordering[InternalRow
       if (left == null && right == null) {
         // Both null, continue looking.
       } else if (left == null) {
-        return if (order.direction == Ascending) -1 else 1
+        return if (order.nullOrdering == NullsFirst) -1 else 1
       } else if (right == null) {
-        return if (order.direction == Ascending) 1 else -1
+        return if (order.nullOrdering == NullsFirst) 1 else -1
       } else {
         val comparison = order.dataType match {
           case dt: AtomicType if order.direction == Ascending =>
@@ -76,7 +76,7 @@ object InterpretedOrdering {
    */
   def forSchema(dataTypes: Seq[DataType]): InterpretedOrdering = {
     new InterpretedOrdering(dataTypes.zipWithIndex.map {
-      case (dt, index) => new SortOrder(BoundReference(index, dt, nullable = true), Ascending)
+      case (dt, index) => SortOrder(BoundReference(index, dt, nullable = true), Ascending)
     })
   }
 }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index bbbb14df88f8cc24c6d9204e205acca6de2f7789..69d68fa6f92efa54f19e6068ad5446d4e8cfa9c8 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -1206,11 +1206,19 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with Logging {
    * Create a [[SortOrder]] expression.
    */
   override def visitSortItem(ctx: SortItemContext): SortOrder = withOrigin(ctx) {
-    if (ctx.DESC != null) {
-      SortOrder(expression(ctx.expression), Descending)
+    val direction = if (ctx.DESC != null) {
+      Descending
     } else {
-      SortOrder(expression(ctx.expression), Ascending)
+      Ascending
     }
+    val nullOrdering = if (ctx.FIRST != null) {
+      NullsFirst
+    } else if (ctx.LAST != null) {
+      NullsLast
+    } else {
+      direction.defaultNullOrdering
+    }
+    SortOrder(expression(ctx.expression), direction, nullOrdering)
   }
 
   /**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SortPrefixUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SortPrefixUtils.scala
index 940467e74d597f9cc4c09a4c056d091d18b6fed6..c6665d273fd271b9d3530e9446d8ca1d0f76043d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SortPrefixUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SortPrefixUtils.scala
@@ -40,22 +40,70 @@ object SortPrefixUtils {
 
   def getPrefixComparator(sortOrder: SortOrder): PrefixComparator = {
     sortOrder.dataType match {
-      case StringType =>
-        if (sortOrder.isAscending) PrefixComparators.STRING else PrefixComparators.STRING_DESC
-      case BinaryType =>
-        if (sortOrder.isAscending) PrefixComparators.BINARY else PrefixComparators.BINARY_DESC
+      case StringType => stringPrefixComparator(sortOrder)
+      case BinaryType => binaryPrefixComparator(sortOrder)
       case BooleanType | ByteType | ShortType | IntegerType | LongType | DateType | TimestampType =>
-        if (sortOrder.isAscending) PrefixComparators.LONG else PrefixComparators.LONG_DESC
+        longPrefixComparator(sortOrder)
       case dt: DecimalType if dt.precision - dt.scale <= Decimal.MAX_LONG_DIGITS =>
-        if (sortOrder.isAscending) PrefixComparators.LONG else PrefixComparators.LONG_DESC
-      case FloatType | DoubleType =>
-        if (sortOrder.isAscending) PrefixComparators.DOUBLE else PrefixComparators.DOUBLE_DESC
-      case dt: DecimalType =>
-        if (sortOrder.isAscending) PrefixComparators.DOUBLE else PrefixComparators.DOUBLE_DESC
+        longPrefixComparator(sortOrder)
+      case FloatType | DoubleType => doublePrefixComparator(sortOrder)
+      case dt: DecimalType => doublePrefixComparator(sortOrder)
       case _ => NoOpPrefixComparator
     }
   }
 
+  private def stringPrefixComparator(sortOrder: SortOrder): PrefixComparator = {
+    sortOrder.direction match {
+      case Ascending if (sortOrder.nullOrdering == NullsLast) =>
+        PrefixComparators.STRING_NULLS_LAST
+      case Ascending =>
+        PrefixComparators.STRING
+      case Descending if (sortOrder.nullOrdering == NullsFirst) =>
+        PrefixComparators.STRING_DESC_NULLS_FIRST
+      case Descending =>
+        PrefixComparators.STRING_DESC
+    }
+  }
+
+  private def binaryPrefixComparator(sortOrder: SortOrder): PrefixComparator = {
+    sortOrder.direction match {
+      case Ascending if (sortOrder.nullOrdering == NullsLast) =>
+        PrefixComparators.BINARY_NULLS_LAST
+      case Ascending =>
+        PrefixComparators.BINARY
+      case Descending if (sortOrder.nullOrdering == NullsFirst) =>
+        PrefixComparators.BINARY_DESC_NULLS_FIRST
+      case Descending =>
+        PrefixComparators.BINARY_DESC
+    }
+  }
+
+  private def longPrefixComparator(sortOrder: SortOrder): PrefixComparator = {
+    sortOrder.direction match {
+      case Ascending if (sortOrder.nullOrdering == NullsLast) =>
+        PrefixComparators.LONG_NULLS_LAST
+      case Ascending =>
+        PrefixComparators.LONG
+      case Descending if (sortOrder.nullOrdering == NullsFirst) =>
+        PrefixComparators.LONG_DESC_NULLS_FIRST
+      case Descending =>
+        PrefixComparators.LONG_DESC
+    }
+  }
+
+  private def doublePrefixComparator(sortOrder: SortOrder): PrefixComparator = {
+    sortOrder.direction match {
+      case Ascending if (sortOrder.nullOrdering == NullsLast) =>
+        PrefixComparators.DOUBLE_NULLS_LAST
+      case Ascending =>
+        PrefixComparators.DOUBLE
+      case Descending if (sortOrder.nullOrdering == NullsFirst) =>
+        PrefixComparators.DOUBLE_DESC_NULLS_FIRST
+      case Descending =>
+        PrefixComparators.DOUBLE_DESC
+    }
+  }
+
   /**
    * Creates the prefix comparator for the first field in the given schema, in ascending order.
    */
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
index 6a2d97c9b179704c24c36c1269a9ad63a56eedbb..6aeefa6eddafe7a0a40b21b42d8ae672bf239a85 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
@@ -368,7 +368,7 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
    */
   protected def newNaturalAscendingOrdering(dataTypes: Seq[DataType]): Ordering[InternalRow] = {
     val order: Seq[SortOrder] = dataTypes.zipWithIndex.map {
-      case (dt, index) => new SortOrder(BoundReference(index, dt, nullable = true), Ascending)
+      case (dt, index) => SortOrder(BoundReference(index, dt, nullable = true), Ascending)
     }
     newOrdering(order, Seq.empty)
   }
diff --git a/sql/core/src/test/resources/sql-tests/inputs/orderby-nulls-ordering.sql b/sql/core/src/test/resources/sql-tests/inputs/orderby-nulls-ordering.sql
new file mode 100644
index 0000000000000000000000000000000000000000..f7637b444b9fe0535143b20bcdfa627d05d71867
--- /dev/null
+++ b/sql/core/src/test/resources/sql-tests/inputs/orderby-nulls-ordering.sql
@@ -0,0 +1,83 @@
+-- Q1. testing window functions with order by
+create table spark_10747(col1 int, col2 int, col3 int) using parquet;
+
+-- Q2. insert to tables
+INSERT INTO spark_10747 VALUES (6, 12, 10), (6, 11, 4), (6, 9, 10), (6, 15, 8),
+(6, 15, 8), (6, 7, 4), (6, 7, 8), (6, 13, null), (6, 10, null);
+
+-- Q3. windowing with order by DESC NULLS LAST
+select col1, col2, col3, sum(col2)
+    over (partition by col1
+       order by col3 desc nulls last, col2
+       rows between 2 preceding and 2 following ) as sum_col2
+from spark_10747 where col1 = 6 order by sum_col2;
+
+-- Q4. windowing with order by DESC NULLS FIRST
+select col1, col2, col3, sum(col2)
+    over (partition by col1
+       order by col3 desc nulls first, col2
+       rows between 2 preceding and 2 following ) as sum_col2
+from spark_10747 where col1 = 6 order by sum_col2;
+
+-- Q5. windowing with order by ASC NULLS LAST
+select col1, col2, col3, sum(col2)
+    over (partition by col1
+       order by col3 asc nulls last, col2
+       rows between 2 preceding and 2 following ) as sum_col2
+from spark_10747 where col1 = 6 order by sum_col2;
+
+-- Q6. windowing with order by ASC NULLS FIRST
+select col1, col2, col3, sum(col2)
+    over (partition by col1
+       order by col3 asc nulls first, col2
+       rows between 2 preceding and 2 following ) as sum_col2
+from spark_10747 where col1 = 6 order by sum_col2;
+
+-- Q7. Regular query with ORDER BY ASC NULLS FIRST
+SELECT COL1, COL2, COL3 FROM spark_10747 ORDER BY COL3 ASC NULLS FIRST, COL2;
+
+-- Q8. Regular query with ORDER BY ASC NULLS LAST
+SELECT COL1, COL2, COL3 FROM spark_10747 ORDER BY COL3 NULLS LAST, COL2;
+
+-- Q9. Regular query with ORDER BY DESC NULLS FIRST
+SELECT COL1, COL2, COL3 FROM spark_10747 ORDER BY COL3 DESC NULLS FIRST, COL2;
+
+-- Q10. Regular query with ORDER BY DESC NULLS LAST
+SELECT COL1, COL2, COL3 FROM spark_10747 ORDER BY COL3 DESC NULLS LAST, COL2;
+
+-- drop the test table
+drop table spark_10747;
+
+-- Q11. mix datatype for ORDER BY NULLS FIRST|LAST
+create table spark_10747_mix(
+col1 string,
+col2 int,
+col3 double,
+col4 decimal(10,2),
+col5 decimal(20,1))
+using parquet;
+
+-- Q12. Insert to the table
+INSERT INTO spark_10747_mix VALUES
+('b', 2, 1.0, 1.00, 10.0),
+('d', 3, 2.0, 3.00, 0.0),
+('c', 3, 2.0, 2.00, 15.1),
+('d', 3, 0.0, 3.00, 1.0),
+(null, 3, 0.0, 3.00, 1.0),
+('d', 3, null, 4.00, 1.0),
+('a', 1, 1.0, 1.00, null),
+('c', 3, 2.0, 2.00, null);
+
+-- Q13. Regular query with 2 NULLS LAST columns
+select * from spark_10747_mix order by col1 nulls last, col5 nulls last;
+
+-- Q14. Regular query with 2 NULLS FIRST columns
+select * from spark_10747_mix order by col1 desc nulls first, col5 desc nulls first;
+
+-- Q15. Regular query with mixed NULLS FIRST|LAST
+select * from spark_10747_mix order by col5 desc nulls first, col3 desc nulls last;
+
+-- drop the test table
+drop table spark_10747_mix;
+
+
diff --git a/sql/core/src/test/resources/sql-tests/results/orderby-nulls-ordering.sql.out b/sql/core/src/test/resources/sql-tests/results/orderby-nulls-ordering.sql.out
new file mode 100644
index 0000000000000000000000000000000000000000..c1b63dfb8caef7e90d8363d4b98633fb44334591
--- /dev/null
+++ b/sql/core/src/test/resources/sql-tests/results/orderby-nulls-ordering.sql.out
@@ -0,0 +1,254 @@
+-- Automatically generated by SQLQueryTestSuite
+-- Number of queries: 17
+
+
+-- !query 0
+create table spark_10747(col1 int, col2 int, col3 int) using parquet
+-- !query 0 schema
+struct<>
+-- !query 0 output
+
+
+
+-- !query 1
+INSERT INTO spark_10747 VALUES (6, 12, 10), (6, 11, 4), (6, 9, 10), (6, 15, 8),
+(6, 15, 8), (6, 7, 4), (6, 7, 8), (6, 13, null), (6, 10, null)
+-- !query 1 schema
+struct<>
+-- !query 1 output
+
+
+
+-- !query 2
+select col1, col2, col3, sum(col2)
+    over (partition by col1
+       order by col3 desc nulls last, col2
+       rows between 2 preceding and 2 following ) as sum_col2
+from spark_10747 where col1 = 6 order by sum_col2
+-- !query 2 schema
+struct<col1:int,col2:int,col3:int,sum_col2:bigint>
+-- !query 2 output
+6	9	10	28
+6	13	NULL	34
+6	10	NULL	41
+6	12	10	43
+6	15	8	55
+6	15	8	56
+6	11	4	56
+6	7	8	58
+6	7	4	58
+
+
+-- !query 3
+select col1, col2, col3, sum(col2)
+    over (partition by col1
+       order by col3 desc nulls first, col2
+       rows between 2 preceding and 2 following ) as sum_col2
+from spark_10747 where col1 = 6 order by sum_col2
+-- !query 3 schema
+struct<col1:int,col2:int,col3:int,sum_col2:bigint>
+-- !query 3 output
+6	10	NULL	32
+6	11	4	33
+6	13	NULL	44
+6	7	4	48
+6	9	10	51
+6	15	8	55
+6	12	10	56
+6	15	8	56
+6	7	8	58
+
+
+-- !query 4
+select col1, col2, col3, sum(col2)
+    over (partition by col1
+       order by col3 asc nulls last, col2
+       rows between 2 preceding and 2 following ) as sum_col2
+from spark_10747 where col1 = 6 order by sum_col2
+-- !query 4 schema
+struct<col1:int,col2:int,col3:int,sum_col2:bigint>
+-- !query 4 output
+6	7	4	25
+6	13	NULL	35
+6	11	4	40
+6	10	NULL	44
+6	7	8	55
+6	15	8	57
+6	15	8	58
+6	12	10	59
+6	9	10	61
+
+
+-- !query 5
+select col1, col2, col3, sum(col2)
+    over (partition by col1
+       order by col3 asc nulls first, col2
+       rows between 2 preceding and 2 following ) as sum_col2
+from spark_10747 where col1 = 6 order by sum_col2
+-- !query 5 schema
+struct<col1:int,col2:int,col3:int,sum_col2:bigint>
+-- !query 5 output
+6	10	NULL	30
+6	12	10	36
+6	13	NULL	41
+6	7	4	48
+6	9	10	51
+6	11	4	53
+6	7	8	55
+6	15	8	57
+6	15	8	58
+
+
+-- !query 6
+SELECT COL1, COL2, COL3 FROM spark_10747 ORDER BY COL3 ASC NULLS FIRST, COL2
+-- !query 6 schema
+struct<COL1:int,COL2:int,COL3:int>
+-- !query 6 output
+6	10	NULL
+6	13	NULL
+6	7	4
+6	11	4
+6	7	8
+6	15	8
+6	15	8
+6	9	10
+6	12	10
+
+
+-- !query 7
+SELECT COL1, COL2, COL3 FROM spark_10747 ORDER BY COL3 NULLS LAST, COL2
+-- !query 7 schema
+struct<COL1:int,COL2:int,COL3:int>
+-- !query 7 output
+6	7	4
+6	11	4
+6	7	8
+6	15	8
+6	15	8
+6	9	10
+6	12	10
+6	10	NULL
+6	13	NULL
+
+
+-- !query 8
+SELECT COL1, COL2, COL3 FROM spark_10747 ORDER BY COL3 DESC NULLS FIRST, COL2
+-- !query 8 schema
+struct<COL1:int,COL2:int,COL3:int>
+-- !query 8 output
+6	10	NULL
+6	13	NULL
+6	9	10
+6	12	10
+6	7	8
+6	15	8
+6	15	8
+6	7	4
+6	11	4
+
+
+-- !query 9
+SELECT COL1, COL2, COL3 FROM spark_10747 ORDER BY COL3 DESC NULLS LAST, COL2
+-- !query 9 schema
+struct<COL1:int,COL2:int,COL3:int>
+-- !query 9 output
+6	9	10
+6	12	10
+6	7	8
+6	15	8
+6	15	8
+6	7	4
+6	11	4
+6	10	NULL
+6	13	NULL
+
+
+-- !query 10
+drop table spark_10747
+-- !query 10 schema
+struct<>
+-- !query 10 output
+
+
+
+-- !query 11
+create table spark_10747_mix(
+col1 string,
+col2 int,
+col3 double,
+col4 decimal(10,2),
+col5 decimal(20,1))
+using parquet
+-- !query 11 schema
+struct<>
+-- !query 11 output
+
+
+
+-- !query 12
+INSERT INTO spark_10747_mix VALUES
+('b', 2, 1.0, 1.00, 10.0),
+('d', 3, 2.0, 3.00, 0.0),
+('c', 3, 2.0, 2.00, 15.1),
+('d', 3, 0.0, 3.00, 1.0),
+(null, 3, 0.0, 3.00, 1.0),
+('d', 3, null, 4.00, 1.0),
+('a', 1, 1.0, 1.00, null),
+('c', 3, 2.0, 2.00, null)
+-- !query 12 schema
+struct<>
+-- !query 12 output
+
+
+
+-- !query 13
+select * from spark_10747_mix order by col1 nulls last, col5 nulls last
+-- !query 13 schema
+struct<col1:string,col2:int,col3:double,col4:decimal(10,2),col5:decimal(20,1)>
+-- !query 13 output
+a	1	1.0	1	NULL
+b	2	1.0	1	10
+c	3	2.0	2	15.1
+c	3	2.0	2	NULL
+d	3	2.0	3	0
+d	3	0.0	3	1
+d	3	NULL	4	1
+NULL	3	0.0	3	1
+
+
+-- !query 14
+select * from spark_10747_mix order by col1 desc nulls first, col5 desc nulls first
+-- !query 14 schema
+struct<col1:string,col2:int,col3:double,col4:decimal(10,2),col5:decimal(20,1)>
+-- !query 14 output
+NULL	3	0.0	3	1
+d	3	0.0	3	1
+d	3	NULL	4	1
+d	3	2.0	3	0
+c	3	2.0	2	NULL
+c	3	2.0	2	15.1
+b	2	1.0	1	10
+a	1	1.0	1	NULL
+
+
+-- !query 15
+select * from spark_10747_mix order by col5 desc nulls first, col3 desc nulls last
+-- !query 15 schema
+struct<col1:string,col2:int,col3:double,col4:decimal(10,2),col5:decimal(20,1)>
+-- !query 15 output
+c	3	2.0	2	NULL
+a	1	1.0	1	NULL
+c	3	2.0	2	15.1
+b	2	1.0	1	10
+d	3	0.0	3	1
+NULL	3	0.0	3	1
+d	3	NULL	4	1
+d	3	2.0	3	0
+
+
+-- !query 16
+drop table spark_10747_mix
+-- !query 16 schema
+struct<>
+-- !query 16 output
+
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SortSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SortSuite.scala
index ba3fa3732d0df3a1912f33a8291d41ad5ba84008..a7bbe34f4eedb8aa7309638a976aad654aa00ac5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SortSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SortSuite.scala
@@ -101,7 +101,8 @@ class SortSuite extends SparkPlanTest with SharedSQLContext {
   for (
     dataType <- DataTypeTestUtils.atomicTypes ++ Set(NullType);
     nullable <- Seq(true, false);
-    sortOrder <- Seq('a.asc :: Nil, 'a.desc :: Nil);
+    sortOrder <-
+      Seq('a.asc :: Nil, 'a.asc_nullsLast :: Nil, 'a.desc :: Nil, 'a.desc_nullsFirst :: Nil);
     randomDataGenerator <- RandomDataGenerator.forType(dataType, nullable)
   ) {
     test(s"sorting on $dataType with nullable=$nullable, sortOrder=$sortOrder") {
diff --git a/sql/hive/src/test/resources/sqlgen/agg2.sql b/sql/hive/src/test/resources/sqlgen/agg2.sql
index 65d71714fe850f26b04a2ed5fe74298a248a0db0..adbfdb7e79d64678a3ef08bb339ae8e34c55076a 100644
--- a/sql/hive/src/test/resources/sqlgen/agg2.sql
+++ b/sql/hive/src/test/resources/sqlgen/agg2.sql
@@ -1,4 +1,4 @@
 -- This file is automatically generated by LogicalPlanToSQLSuite.
 SELECT COUNT(value) FROM parquet_t1 GROUP BY key ORDER BY MAX(key)
 --------------------------------------------------------------------------------
-SELECT `gen_attr_0` AS `count(value)` FROM (SELECT `gen_attr_0` FROM (SELECT count(`gen_attr_3`) AS `gen_attr_0`, max(`gen_attr_2`) AS `gen_attr_1` FROM (SELECT `key` AS `gen_attr_2`, `value` AS `gen_attr_3` FROM `default`.`parquet_t1`) AS gen_subquery_0 GROUP BY `gen_attr_2` ORDER BY `gen_attr_1` ASC) AS gen_subquery_1) AS gen_subquery_2
+SELECT `gen_attr_0` AS `count(value)` FROM (SELECT `gen_attr_0` FROM (SELECT count(`gen_attr_3`) AS `gen_attr_0`, max(`gen_attr_2`) AS `gen_attr_1` FROM (SELECT `key` AS `gen_attr_2`, `value` AS `gen_attr_3` FROM `default`.`parquet_t1`) AS gen_subquery_0 GROUP BY `gen_attr_2` ORDER BY `gen_attr_1` ASC NULLS FIRST) AS gen_subquery_1) AS gen_subquery_2
diff --git a/sql/hive/src/test/resources/sqlgen/agg3.sql b/sql/hive/src/test/resources/sqlgen/agg3.sql
index 14b19392cdce3dc5b45b87126ab018a57690c7b3..207542d226e23d521dfa2748c6db736b73acddd8 100644
--- a/sql/hive/src/test/resources/sqlgen/agg3.sql
+++ b/sql/hive/src/test/resources/sqlgen/agg3.sql
@@ -1,4 +1,4 @@
 -- This file is automatically generated by LogicalPlanToSQLSuite.
 SELECT COUNT(value) FROM parquet_t1 GROUP BY key ORDER BY key, MAX(key)
 --------------------------------------------------------------------------------
-SELECT `gen_attr_0` AS `count(value)` FROM (SELECT `gen_attr_0` FROM (SELECT count(`gen_attr_4`) AS `gen_attr_0`, `gen_attr_3` AS `gen_attr_1`, max(`gen_attr_3`) AS `gen_attr_2` FROM (SELECT `key` AS `gen_attr_3`, `value` AS `gen_attr_4` FROM `default`.`parquet_t1`) AS gen_subquery_0 GROUP BY `gen_attr_3` ORDER BY `gen_attr_1` ASC, `gen_attr_2` ASC) AS gen_subquery_1) AS gen_subquery_2
+SELECT `gen_attr_0` AS `count(value)` FROM (SELECT `gen_attr_0` FROM (SELECT count(`gen_attr_4`) AS `gen_attr_0`, `gen_attr_3` AS `gen_attr_1`, max(`gen_attr_3`) AS `gen_attr_2` FROM (SELECT `key` AS `gen_attr_3`, `value` AS `gen_attr_4` FROM `default`.`parquet_t1`) AS gen_subquery_0 GROUP BY `gen_attr_3` ORDER BY `gen_attr_1` ASC NULLS FIRST, `gen_attr_2` ASC NULLS FIRST) AS gen_subquery_1) AS gen_subquery_2
diff --git a/sql/hive/src/test/resources/sqlgen/broadcast_join_subquery.sql b/sql/hive/src/test/resources/sqlgen/broadcast_join_subquery.sql
index ec881a216e0b038b5134e36d98f79149602b9370..3de4f8a05996597cadc614b037b1a0ddcfb03279 100644
--- a/sql/hive/src/test/resources/sqlgen/broadcast_join_subquery.sql
+++ b/sql/hive/src/test/resources/sqlgen/broadcast_join_subquery.sql
@@ -5,4 +5,4 @@ FROM (SELECT x.key as key1, x.value as value1, y.key as key2, y.value as value2
 JOIN srcpart z ON (subq.key1 = z.key and z.ds='2008-04-08' and z.hr=11)
 ORDER BY subq.key1, z.value
 --------------------------------------------------------------------------------
-SELECT `gen_attr_0` AS `key1`, `gen_attr_1` AS `value` FROM (SELECT `gen_attr_0`, `gen_attr_1` FROM (SELECT `gen_attr_5` AS `gen_attr_0`, `gen_attr_7` AS `gen_attr_6`, `gen_attr_9` AS `gen_attr_8`, `gen_attr_11` AS `gen_attr_10` FROM (SELECT `key` AS `gen_attr_5`, `value` AS `gen_attr_7` FROM `default`.`src1`) AS gen_subquery_0 INNER JOIN (SELECT `key` AS `gen_attr_9`, `value` AS `gen_attr_11` FROM `default`.`src`) AS gen_subquery_1 ON (`gen_attr_5` = `gen_attr_9`)) AS subq INNER JOIN (SELECT `key` AS `gen_attr_2`, `value` AS `gen_attr_1`, `ds` AS `gen_attr_3`, `hr` AS `gen_attr_4` FROM `default`.`srcpart`) AS gen_subquery_2 ON (((`gen_attr_0` = `gen_attr_2`) AND (`gen_attr_3` = '2008-04-08')) AND (CAST(`gen_attr_4` AS DOUBLE) = CAST(11 AS DOUBLE))) ORDER BY `gen_attr_0` ASC, `gen_attr_1` ASC) AS gen_subquery_3
+SELECT `gen_attr_0` AS `key1`, `gen_attr_1` AS `value` FROM (SELECT `gen_attr_0`, `gen_attr_1` FROM (SELECT `gen_attr_5` AS `gen_attr_0`, `gen_attr_7` AS `gen_attr_6`, `gen_attr_9` AS `gen_attr_8`, `gen_attr_11` AS `gen_attr_10` FROM (SELECT `key` AS `gen_attr_5`, `value` AS `gen_attr_7` FROM `default`.`src1`) AS gen_subquery_0 INNER JOIN (SELECT `key` AS `gen_attr_9`, `value` AS `gen_attr_11` FROM `default`.`src`) AS gen_subquery_1 ON (`gen_attr_5` = `gen_attr_9`)) AS subq INNER JOIN (SELECT `key` AS `gen_attr_2`, `value` AS `gen_attr_1`, `ds` AS `gen_attr_3`, `hr` AS `gen_attr_4` FROM `default`.`srcpart`) AS gen_subquery_2 ON (((`gen_attr_0` = `gen_attr_2`) AND (`gen_attr_3` = '2008-04-08')) AND (CAST(`gen_attr_4` AS DOUBLE) = CAST(11 AS DOUBLE))) ORDER BY `gen_attr_0` ASC NULLS FIRST, `gen_attr_1` ASC NULLS FIRST) AS gen_subquery_3
diff --git a/sql/hive/src/test/resources/sqlgen/generate_with_other_1.sql b/sql/hive/src/test/resources/sqlgen/generate_with_other_1.sql
index 805197a4ea11be5db2fcf4dbabc72623a45e4aa2..ab444d0c7093653a8a2fd729578fc04374610cd1 100644
--- a/sql/hive/src/test/resources/sqlgen/generate_with_other_1.sql
+++ b/sql/hive/src/test/resources/sqlgen/generate_with_other_1.sql
@@ -5,4 +5,4 @@ WHERE id > 2
 ORDER BY val, id
 LIMIT 5
 --------------------------------------------------------------------------------
-SELECT `gen_attr_0` AS `val`, `gen_attr_1` AS `id` FROM (SELECT `gen_attr_0`, `gen_attr_1` FROM (SELECT gen_subquery_0.`gen_attr_2`, gen_subquery_0.`gen_attr_3`, gen_subquery_0.`gen_attr_4`, gen_subquery_0.`gen_attr_1` FROM (SELECT `arr` AS `gen_attr_2`, `arr2` AS `gen_attr_3`, `json` AS `gen_attr_4`, `id` AS `gen_attr_1` FROM `default`.`parquet_t3`) AS gen_subquery_0 WHERE (`gen_attr_1` > CAST(2 AS BIGINT))) AS gen_subquery_1 LATERAL VIEW explode(`gen_attr_2`) gen_subquery_2 AS `gen_attr_0` ORDER BY `gen_attr_0` ASC, `gen_attr_1` ASC LIMIT 5) AS parquet_t3
+SELECT `gen_attr_0` AS `val`, `gen_attr_1` AS `id` FROM (SELECT `gen_attr_0`, `gen_attr_1` FROM (SELECT gen_subquery_0.`gen_attr_2`, gen_subquery_0.`gen_attr_3`, gen_subquery_0.`gen_attr_4`, gen_subquery_0.`gen_attr_1` FROM (SELECT `arr` AS `gen_attr_2`, `arr2` AS `gen_attr_3`, `json` AS `gen_attr_4`, `id` AS `gen_attr_1` FROM `default`.`parquet_t3`) AS gen_subquery_0 WHERE (`gen_attr_1` > CAST(2 AS BIGINT))) AS gen_subquery_1 LATERAL VIEW explode(`gen_attr_2`) gen_subquery_2 AS `gen_attr_0` ORDER BY `gen_attr_0` ASC NULLS FIRST, `gen_attr_1` ASC NULLS FIRST LIMIT 5) AS parquet_t3
diff --git a/sql/hive/src/test/resources/sqlgen/generate_with_other_2.sql b/sql/hive/src/test/resources/sqlgen/generate_with_other_2.sql
index ef9a596197b8b49a9cd01e22626f0f79cb6a0b01..42a2369f34d1ce7efc992dc429d62ecd7e4b23a2 100644
--- a/sql/hive/src/test/resources/sqlgen/generate_with_other_2.sql
+++ b/sql/hive/src/test/resources/sqlgen/generate_with_other_2.sql
@@ -7,4 +7,4 @@ WHERE val > 2
 ORDER BY val, id
 LIMIT 5
 --------------------------------------------------------------------------------
-SELECT `gen_attr_0` AS `val`, `gen_attr_1` AS `id` FROM (SELECT `gen_attr_0`, `gen_attr_1` FROM (SELECT `arr` AS `gen_attr_4`, `arr2` AS `gen_attr_3`, `json` AS `gen_attr_5`, `id` AS `gen_attr_1` FROM `default`.`parquet_t3`) AS gen_subquery_0 LATERAL VIEW explode(`gen_attr_3`) gen_subquery_2 AS `gen_attr_2` LATERAL VIEW explode(`gen_attr_2`) gen_subquery_3 AS `gen_attr_0` WHERE (`gen_attr_0` > CAST(2 AS BIGINT)) ORDER BY `gen_attr_0` ASC, `gen_attr_1` ASC LIMIT 5) AS gen_subquery_1
+SELECT `gen_attr_0` AS `val`, `gen_attr_1` AS `id` FROM (SELECT `gen_attr_0`, `gen_attr_1` FROM (SELECT `arr` AS `gen_attr_4`, `arr2` AS `gen_attr_3`, `json` AS `gen_attr_5`, `id` AS `gen_attr_1` FROM `default`.`parquet_t3`) AS gen_subquery_0 LATERAL VIEW explode(`gen_attr_3`) gen_subquery_2 AS `gen_attr_2` LATERAL VIEW explode(`gen_attr_2`) gen_subquery_3 AS `gen_attr_0` WHERE (`gen_attr_0` > CAST(2 AS BIGINT)) ORDER BY `gen_attr_0` ASC NULLS FIRST, `gen_attr_1` ASC NULLS FIRST LIMIT 5) AS gen_subquery_1
diff --git a/sql/hive/src/test/resources/sqlgen/grouping_sets_2_1.sql b/sql/hive/src/test/resources/sqlgen/grouping_sets_2_1.sql
index b2c426c660d806d63e9f87d2413fc79bcca9db15..245b52341658fd39dedec4623abff234a1593abb 100644
--- a/sql/hive/src/test/resources/sqlgen/grouping_sets_2_1.sql
+++ b/sql/hive/src/test/resources/sqlgen/grouping_sets_2_1.sql
@@ -1,4 +1,4 @@
 -- This file is automatically generated by LogicalPlanToSQLSuite.
 SELECT a, b, sum(c) FROM parquet_t2 GROUP BY a, b GROUPING SETS (a, b) ORDER BY a, b
 --------------------------------------------------------------------------------
-SELECT `gen_attr_0` AS `a`, `gen_attr_1` AS `b`, `gen_attr_3` AS `sum(c)` FROM (SELECT `gen_attr_5` AS `gen_attr_0`, `gen_attr_6` AS `gen_attr_1`, sum(`gen_attr_4`) AS `gen_attr_3` FROM (SELECT `a` AS `gen_attr_5`, `b` AS `gen_attr_6`, `c` AS `gen_attr_4`, `d` AS `gen_attr_7` FROM `default`.`parquet_t2`) AS gen_subquery_0 GROUP BY `gen_attr_5`, `gen_attr_6` GROUPING SETS((`gen_attr_5`), (`gen_attr_6`)) ORDER BY `gen_attr_0` ASC, `gen_attr_1` ASC) AS gen_subquery_1
+SELECT `gen_attr_0` AS `a`, `gen_attr_1` AS `b`, `gen_attr_3` AS `sum(c)` FROM (SELECT `gen_attr_5` AS `gen_attr_0`, `gen_attr_6` AS `gen_attr_1`, sum(`gen_attr_4`) AS `gen_attr_3` FROM (SELECT `a` AS `gen_attr_5`, `b` AS `gen_attr_6`, `c` AS `gen_attr_4`, `d` AS `gen_attr_7` FROM `default`.`parquet_t2`) AS gen_subquery_0 GROUP BY `gen_attr_5`, `gen_attr_6` GROUPING SETS((`gen_attr_5`), (`gen_attr_6`)) ORDER BY `gen_attr_0` ASC NULLS FIRST, `gen_attr_1` ASC NULLS FIRST) AS gen_subquery_1
diff --git a/sql/hive/src/test/resources/sqlgen/grouping_sets_2_2.sql b/sql/hive/src/test/resources/sqlgen/grouping_sets_2_2.sql
index 96ee8e85951e8b8d19796a5b615d59d2afd4b830..1505dea11ec68848ad3682ef11205201c4887006 100644
--- a/sql/hive/src/test/resources/sqlgen/grouping_sets_2_2.sql
+++ b/sql/hive/src/test/resources/sqlgen/grouping_sets_2_2.sql
@@ -1,4 +1,4 @@
 -- This file is automatically generated by LogicalPlanToSQLSuite.
 SELECT a, b, sum(c) FROM parquet_t2 GROUP BY a, b GROUPING SETS (a) ORDER BY a, b
 --------------------------------------------------------------------------------
-SELECT `gen_attr_0` AS `a`, `gen_attr_1` AS `b`, `gen_attr_3` AS `sum(c)` FROM (SELECT `gen_attr_5` AS `gen_attr_0`, `gen_attr_6` AS `gen_attr_1`, sum(`gen_attr_4`) AS `gen_attr_3` FROM (SELECT `a` AS `gen_attr_5`, `b` AS `gen_attr_6`, `c` AS `gen_attr_4`, `d` AS `gen_attr_7` FROM `default`.`parquet_t2`) AS gen_subquery_0 GROUP BY `gen_attr_5`, `gen_attr_6` GROUPING SETS((`gen_attr_5`)) ORDER BY `gen_attr_0` ASC, `gen_attr_1` ASC) AS gen_subquery_1
+SELECT `gen_attr_0` AS `a`, `gen_attr_1` AS `b`, `gen_attr_3` AS `sum(c)` FROM (SELECT `gen_attr_5` AS `gen_attr_0`, `gen_attr_6` AS `gen_attr_1`, sum(`gen_attr_4`) AS `gen_attr_3` FROM (SELECT `a` AS `gen_attr_5`, `b` AS `gen_attr_6`, `c` AS `gen_attr_4`, `d` AS `gen_attr_7` FROM `default`.`parquet_t2`) AS gen_subquery_0 GROUP BY `gen_attr_5`, `gen_attr_6` GROUPING SETS((`gen_attr_5`)) ORDER BY `gen_attr_0` ASC NULLS FIRST, `gen_attr_1` ASC NULLS FIRST) AS gen_subquery_1
diff --git a/sql/hive/src/test/resources/sqlgen/grouping_sets_2_3.sql b/sql/hive/src/test/resources/sqlgen/grouping_sets_2_3.sql
index 9b8b230c879c230dfd2930e32a7f7d883d75b05f..281add6aabb642a9792c9f2c86321a7042ed33d0 100644
--- a/sql/hive/src/test/resources/sqlgen/grouping_sets_2_3.sql
+++ b/sql/hive/src/test/resources/sqlgen/grouping_sets_2_3.sql
@@ -1,4 +1,4 @@
 -- This file is automatically generated by LogicalPlanToSQLSuite.
 SELECT a, b, sum(c) FROM parquet_t2 GROUP BY a, b GROUPING SETS (b) ORDER BY a, b
 --------------------------------------------------------------------------------
-SELECT `gen_attr_0` AS `a`, `gen_attr_1` AS `b`, `gen_attr_3` AS `sum(c)` FROM (SELECT `gen_attr_5` AS `gen_attr_0`, `gen_attr_6` AS `gen_attr_1`, sum(`gen_attr_4`) AS `gen_attr_3` FROM (SELECT `a` AS `gen_attr_5`, `b` AS `gen_attr_6`, `c` AS `gen_attr_4`, `d` AS `gen_attr_7` FROM `default`.`parquet_t2`) AS gen_subquery_0 GROUP BY `gen_attr_5`, `gen_attr_6` GROUPING SETS((`gen_attr_6`)) ORDER BY `gen_attr_0` ASC, `gen_attr_1` ASC) AS gen_subquery_1
+SELECT `gen_attr_0` AS `a`, `gen_attr_1` AS `b`, `gen_attr_3` AS `sum(c)` FROM (SELECT `gen_attr_5` AS `gen_attr_0`, `gen_attr_6` AS `gen_attr_1`, sum(`gen_attr_4`) AS `gen_attr_3` FROM (SELECT `a` AS `gen_attr_5`, `b` AS `gen_attr_6`, `c` AS `gen_attr_4`, `d` AS `gen_attr_7` FROM `default`.`parquet_t2`) AS gen_subquery_0 GROUP BY `gen_attr_5`, `gen_attr_6` GROUPING SETS((`gen_attr_6`)) ORDER BY `gen_attr_0` ASC NULLS FIRST, `gen_attr_1` ASC NULLS FIRST) AS gen_subquery_1
diff --git a/sql/hive/src/test/resources/sqlgen/grouping_sets_2_4.sql b/sql/hive/src/test/resources/sqlgen/grouping_sets_2_4.sql
index c35db74a5c5b59940bf5d2577a47354039976324..f8d64742b11e35bf1d4b7f83f81b909940c6bb09 100644
--- a/sql/hive/src/test/resources/sqlgen/grouping_sets_2_4.sql
+++ b/sql/hive/src/test/resources/sqlgen/grouping_sets_2_4.sql
@@ -1,4 +1,4 @@
 -- This file is automatically generated by LogicalPlanToSQLSuite.
 SELECT a, b, sum(c) FROM parquet_t2 GROUP BY a, b GROUPING SETS (()) ORDER BY a, b
 --------------------------------------------------------------------------------
-SELECT `gen_attr_0` AS `a`, `gen_attr_1` AS `b`, `gen_attr_3` AS `sum(c)` FROM (SELECT `gen_attr_5` AS `gen_attr_0`, `gen_attr_6` AS `gen_attr_1`, sum(`gen_attr_4`) AS `gen_attr_3` FROM (SELECT `a` AS `gen_attr_5`, `b` AS `gen_attr_6`, `c` AS `gen_attr_4`, `d` AS `gen_attr_7` FROM `default`.`parquet_t2`) AS gen_subquery_0 GROUP BY `gen_attr_5`, `gen_attr_6` GROUPING SETS(()) ORDER BY `gen_attr_0` ASC, `gen_attr_1` ASC) AS gen_subquery_1
+SELECT `gen_attr_0` AS `a`, `gen_attr_1` AS `b`, `gen_attr_3` AS `sum(c)` FROM (SELECT `gen_attr_5` AS `gen_attr_0`, `gen_attr_6` AS `gen_attr_1`, sum(`gen_attr_4`) AS `gen_attr_3` FROM (SELECT `a` AS `gen_attr_5`, `b` AS `gen_attr_6`, `c` AS `gen_attr_4`, `d` AS `gen_attr_7` FROM `default`.`parquet_t2`) AS gen_subquery_0 GROUP BY `gen_attr_5`, `gen_attr_6` GROUPING SETS(()) ORDER BY `gen_attr_0` ASC NULLS FIRST, `gen_attr_1` ASC NULLS FIRST) AS gen_subquery_1
diff --git a/sql/hive/src/test/resources/sqlgen/grouping_sets_2_5.sql b/sql/hive/src/test/resources/sqlgen/grouping_sets_2_5.sql
index e47f6d5dcf4654a5a899447e4327655d3d09a9f6..09e6ec2a5f8c9d36e3f81ea6c1dd50d0cb927710 100644
--- a/sql/hive/src/test/resources/sqlgen/grouping_sets_2_5.sql
+++ b/sql/hive/src/test/resources/sqlgen/grouping_sets_2_5.sql
@@ -2,4 +2,4 @@
 SELECT a, b, sum(c) FROM parquet_t2 GROUP BY a, b
 GROUPING SETS ((), (a), (a, b)) ORDER BY a, b
 --------------------------------------------------------------------------------
-SELECT `gen_attr_0` AS `a`, `gen_attr_1` AS `b`, `gen_attr_3` AS `sum(c)` FROM (SELECT `gen_attr_5` AS `gen_attr_0`, `gen_attr_6` AS `gen_attr_1`, sum(`gen_attr_4`) AS `gen_attr_3` FROM (SELECT `a` AS `gen_attr_5`, `b` AS `gen_attr_6`, `c` AS `gen_attr_4`, `d` AS `gen_attr_7` FROM `default`.`parquet_t2`) AS gen_subquery_0 GROUP BY `gen_attr_5`, `gen_attr_6` GROUPING SETS((), (`gen_attr_5`), (`gen_attr_5`, `gen_attr_6`)) ORDER BY `gen_attr_0` ASC, `gen_attr_1` ASC) AS gen_subquery_1
+SELECT `gen_attr_0` AS `a`, `gen_attr_1` AS `b`, `gen_attr_3` AS `sum(c)` FROM (SELECT `gen_attr_5` AS `gen_attr_0`, `gen_attr_6` AS `gen_attr_1`, sum(`gen_attr_4`) AS `gen_attr_3` FROM (SELECT `a` AS `gen_attr_5`, `b` AS `gen_attr_6`, `c` AS `gen_attr_4`, `d` AS `gen_attr_7` FROM `default`.`parquet_t2`) AS gen_subquery_0 GROUP BY `gen_attr_5`, `gen_attr_6` GROUPING SETS((), (`gen_attr_5`), (`gen_attr_5`, `gen_attr_6`)) ORDER BY `gen_attr_0` ASC NULLS FIRST, `gen_attr_1` ASC NULLS FIRST) AS gen_subquery_1
diff --git a/sql/hive/src/test/resources/sqlgen/rollup_cube_6_1.sql b/sql/hive/src/test/resources/sqlgen/rollup_cube_6_1.sql
index 22df578518ef3a14b1ecefdc606326161d823000..c364c32dd5c559c473a6809efe3c7ba6b139ed38 100644
--- a/sql/hive/src/test/resources/sqlgen/rollup_cube_6_1.sql
+++ b/sql/hive/src/test/resources/sqlgen/rollup_cube_6_1.sql
@@ -1,4 +1,4 @@
 -- This file is automatically generated by LogicalPlanToSQLSuite.
 SELECT a, b, sum(c) FROM parquet_t2 GROUP BY ROLLUP(a, b) ORDER BY a, b
 --------------------------------------------------------------------------------
-SELECT `gen_attr_0` AS `a`, `gen_attr_1` AS `b`, `gen_attr_3` AS `sum(c)` FROM (SELECT `gen_attr_5` AS `gen_attr_0`, `gen_attr_6` AS `gen_attr_1`, sum(`gen_attr_4`) AS `gen_attr_3` FROM (SELECT `a` AS `gen_attr_5`, `b` AS `gen_attr_6`, `c` AS `gen_attr_4`, `d` AS `gen_attr_7` FROM `default`.`parquet_t2`) AS gen_subquery_0 GROUP BY `gen_attr_5`, `gen_attr_6` GROUPING SETS((`gen_attr_5`, `gen_attr_6`), (`gen_attr_5`), ()) ORDER BY `gen_attr_0` ASC, `gen_attr_1` ASC) AS gen_subquery_1
+SELECT `gen_attr_0` AS `a`, `gen_attr_1` AS `b`, `gen_attr_3` AS `sum(c)` FROM (SELECT `gen_attr_5` AS `gen_attr_0`, `gen_attr_6` AS `gen_attr_1`, sum(`gen_attr_4`) AS `gen_attr_3` FROM (SELECT `a` AS `gen_attr_5`, `b` AS `gen_attr_6`, `c` AS `gen_attr_4`, `d` AS `gen_attr_7` FROM `default`.`parquet_t2`) AS gen_subquery_0 GROUP BY `gen_attr_5`, `gen_attr_6` GROUPING SETS((`gen_attr_5`, `gen_attr_6`), (`gen_attr_5`), ()) ORDER BY `gen_attr_0` ASC NULLS FIRST, `gen_attr_1` ASC NULLS FIRST) AS gen_subquery_1
diff --git a/sql/hive/src/test/resources/sqlgen/rollup_cube_6_2.sql b/sql/hive/src/test/resources/sqlgen/rollup_cube_6_2.sql
index f44b652343acb47c07d5ea82fecc23503fb806b2..36c0223fcecedbc90e1b5831ec6ce1e536720a73 100644
--- a/sql/hive/src/test/resources/sqlgen/rollup_cube_6_2.sql
+++ b/sql/hive/src/test/resources/sqlgen/rollup_cube_6_2.sql
@@ -1,4 +1,4 @@
 -- This file is automatically generated by LogicalPlanToSQLSuite.
 SELECT a, b, sum(c) FROM parquet_t2 GROUP BY CUBE(a, b) ORDER BY a, b
 --------------------------------------------------------------------------------
-SELECT `gen_attr_0` AS `a`, `gen_attr_1` AS `b`, `gen_attr_3` AS `sum(c)` FROM (SELECT `gen_attr_5` AS `gen_attr_0`, `gen_attr_6` AS `gen_attr_1`, sum(`gen_attr_4`) AS `gen_attr_3` FROM (SELECT `a` AS `gen_attr_5`, `b` AS `gen_attr_6`, `c` AS `gen_attr_4`, `d` AS `gen_attr_7` FROM `default`.`parquet_t2`) AS gen_subquery_0 GROUP BY `gen_attr_5`, `gen_attr_6` GROUPING SETS((`gen_attr_5`, `gen_attr_6`), (`gen_attr_5`), (`gen_attr_6`), ()) ORDER BY `gen_attr_0` ASC, `gen_attr_1` ASC) AS gen_subquery_1
+SELECT `gen_attr_0` AS `a`, `gen_attr_1` AS `b`, `gen_attr_3` AS `sum(c)` FROM (SELECT `gen_attr_5` AS `gen_attr_0`, `gen_attr_6` AS `gen_attr_1`, sum(`gen_attr_4`) AS `gen_attr_3` FROM (SELECT `a` AS `gen_attr_5`, `b` AS `gen_attr_6`, `c` AS `gen_attr_4`, `d` AS `gen_attr_7` FROM `default`.`parquet_t2`) AS gen_subquery_0 GROUP BY `gen_attr_5`, `gen_attr_6` GROUPING SETS((`gen_attr_5`, `gen_attr_6`), (`gen_attr_5`), (`gen_attr_6`), ()) ORDER BY `gen_attr_0` ASC NULLS FIRST, `gen_attr_1` ASC NULLS FIRST) AS gen_subquery_1
diff --git a/sql/hive/src/test/resources/sqlgen/rollup_cube_6_3.sql b/sql/hive/src/test/resources/sqlgen/rollup_cube_6_3.sql
index 40f6924913765467a92c4753b2b3b0bd451bb82a..ed33f2a1de3cf905506443f9ef65b43f372b2a3f 100644
--- a/sql/hive/src/test/resources/sqlgen/rollup_cube_6_3.sql
+++ b/sql/hive/src/test/resources/sqlgen/rollup_cube_6_3.sql
@@ -1,4 +1,4 @@
 -- This file is automatically generated by LogicalPlanToSQLSuite.
 SELECT a, b, sum(a) FROM parquet_t2 GROUP BY ROLLUP(a, b) ORDER BY a, b
 --------------------------------------------------------------------------------
-SELECT `gen_attr_0` AS `a`, `gen_attr_1` AS `b`, `gen_attr_3` AS `sum(a)` FROM (SELECT `gen_attr_4` AS `gen_attr_0`, `gen_attr_5` AS `gen_attr_1`, sum(`gen_attr_4`) AS `gen_attr_3` FROM (SELECT `a` AS `gen_attr_4`, `b` AS `gen_attr_5`, `c` AS `gen_attr_6`, `d` AS `gen_attr_7` FROM `default`.`parquet_t2`) AS gen_subquery_0 GROUP BY `gen_attr_4`, `gen_attr_5` GROUPING SETS((`gen_attr_4`, `gen_attr_5`), (`gen_attr_4`), ()) ORDER BY `gen_attr_0` ASC, `gen_attr_1` ASC) AS gen_subquery_1
+SELECT `gen_attr_0` AS `a`, `gen_attr_1` AS `b`, `gen_attr_3` AS `sum(a)` FROM (SELECT `gen_attr_4` AS `gen_attr_0`, `gen_attr_5` AS `gen_attr_1`, sum(`gen_attr_4`) AS `gen_attr_3` FROM (SELECT `a` AS `gen_attr_4`, `b` AS `gen_attr_5`, `c` AS `gen_attr_6`, `d` AS `gen_attr_7` FROM `default`.`parquet_t2`) AS gen_subquery_0 GROUP BY `gen_attr_4`, `gen_attr_5` GROUPING SETS((`gen_attr_4`, `gen_attr_5`), (`gen_attr_4`), ()) ORDER BY `gen_attr_0` ASC NULLS FIRST, `gen_attr_1` ASC NULLS FIRST) AS gen_subquery_1
diff --git a/sql/hive/src/test/resources/sqlgen/rollup_cube_6_4.sql b/sql/hive/src/test/resources/sqlgen/rollup_cube_6_4.sql
index 608e644dee6d093493b18cc9541a69fd00918897..e0e40241480dacd92fe19052340e651af742fb6e 100644
--- a/sql/hive/src/test/resources/sqlgen/rollup_cube_6_4.sql
+++ b/sql/hive/src/test/resources/sqlgen/rollup_cube_6_4.sql
@@ -1,4 +1,4 @@
 -- This file is automatically generated by LogicalPlanToSQLSuite.
 SELECT a, b, sum(a) FROM parquet_t2 GROUP BY CUBE(a, b) ORDER BY a, b
 --------------------------------------------------------------------------------
-SELECT `gen_attr_0` AS `a`, `gen_attr_1` AS `b`, `gen_attr_3` AS `sum(a)` FROM (SELECT `gen_attr_4` AS `gen_attr_0`, `gen_attr_5` AS `gen_attr_1`, sum(`gen_attr_4`) AS `gen_attr_3` FROM (SELECT `a` AS `gen_attr_4`, `b` AS `gen_attr_5`, `c` AS `gen_attr_6`, `d` AS `gen_attr_7` FROM `default`.`parquet_t2`) AS gen_subquery_0 GROUP BY `gen_attr_4`, `gen_attr_5` GROUPING SETS((`gen_attr_4`, `gen_attr_5`), (`gen_attr_4`), (`gen_attr_5`), ()) ORDER BY `gen_attr_0` ASC, `gen_attr_1` ASC) AS gen_subquery_1
+SELECT `gen_attr_0` AS `a`, `gen_attr_1` AS `b`, `gen_attr_3` AS `sum(a)` FROM (SELECT `gen_attr_4` AS `gen_attr_0`, `gen_attr_5` AS `gen_attr_1`, sum(`gen_attr_4`) AS `gen_attr_3` FROM (SELECT `a` AS `gen_attr_4`, `b` AS `gen_attr_5`, `c` AS `gen_attr_6`, `d` AS `gen_attr_7` FROM `default`.`parquet_t2`) AS gen_subquery_0 GROUP BY `gen_attr_4`, `gen_attr_5` GROUPING SETS((`gen_attr_4`, `gen_attr_5`), (`gen_attr_4`), (`gen_attr_5`), ()) ORDER BY `gen_attr_0` ASC NULLS FIRST, `gen_attr_1` ASC NULLS FIRST) AS gen_subquery_1
diff --git a/sql/hive/src/test/resources/sqlgen/sort_asc_nulls_last.sql b/sql/hive/src/test/resources/sqlgen/sort_asc_nulls_last.sql
new file mode 100644
index 0000000000000000000000000000000000000000..da4e3678a33b9b69e973dc9be8a321b8248f7d3b
--- /dev/null
+++ b/sql/hive/src/test/resources/sqlgen/sort_asc_nulls_last.sql
@@ -0,0 +1,4 @@
+-- This file is automatically generated by LogicalPlanToSQLSuite.
+SELECT COUNT(value) FROM parquet_t1 GROUP BY key ORDER BY key nulls last, MAX(key)
+--------------------------------------------------------------------------------
+SELECT `gen_attr_0` AS `count(value)` FROM (SELECT `gen_attr_0` FROM (SELECT count(`gen_attr_4`) AS `gen_attr_0`, `gen_attr_3` AS `gen_attr_1`, max(`gen_attr_3`) AS `gen_attr_2` FROM (SELECT `key` AS `gen_attr_3`, `value` AS `gen_attr_4` FROM `default`.`parquet_t1`) AS gen_subquery_0 GROUP BY `gen_attr_3` ORDER BY `gen_attr_1` ASC NULLS LAST, `gen_attr_2` ASC NULLS FIRST) AS gen_subquery_1) AS gen_subquery_2
diff --git a/sql/hive/src/test/resources/sqlgen/sort_by_after_having.sql b/sql/hive/src/test/resources/sqlgen/sort_by_after_having.sql
index da60204297a214afe2672d434846c7ad5458fedb..a4f3ddc761f307b829b44d3ae0460933a9e2b9ae 100644
--- a/sql/hive/src/test/resources/sqlgen/sort_by_after_having.sql
+++ b/sql/hive/src/test/resources/sqlgen/sort_by_after_having.sql
@@ -1,4 +1,4 @@
 -- This file is automatically generated by LogicalPlanToSQLSuite.
 SELECT COUNT(value) FROM parquet_t1 GROUP BY key HAVING MAX(key) > 0 SORT BY key
 --------------------------------------------------------------------------------
-SELECT `gen_attr_0` AS `count(value)` FROM (SELECT `gen_attr_0` FROM (SELECT `gen_attr_0`, `gen_attr_1` FROM (SELECT count(`gen_attr_3`) AS `gen_attr_0`, max(`gen_attr_1`) AS `gen_attr_2`, `gen_attr_1` FROM (SELECT `key` AS `gen_attr_1`, `value` AS `gen_attr_3` FROM `default`.`parquet_t1`) AS gen_subquery_0 GROUP BY `gen_attr_1` HAVING (`gen_attr_2` > CAST(0 AS BIGINT))) AS gen_subquery_1 SORT BY `gen_attr_1` ASC) AS gen_subquery_2) AS gen_subquery_3
+SELECT `gen_attr_0` AS `count(value)` FROM (SELECT `gen_attr_0` FROM (SELECT `gen_attr_0`, `gen_attr_1` FROM (SELECT count(`gen_attr_3`) AS `gen_attr_0`, max(`gen_attr_1`) AS `gen_attr_2`, `gen_attr_1` FROM (SELECT `key` AS `gen_attr_1`, `value` AS `gen_attr_3` FROM `default`.`parquet_t1`) AS gen_subquery_0 GROUP BY `gen_attr_1` HAVING (`gen_attr_2` > CAST(0 AS BIGINT))) AS gen_subquery_1 SORT BY `gen_attr_1` ASC NULLS FIRST) AS gen_subquery_2) AS gen_subquery_3
diff --git a/sql/hive/src/test/resources/sqlgen/sort_desc_nulls_first.sql b/sql/hive/src/test/resources/sqlgen/sort_desc_nulls_first.sql
new file mode 100644
index 0000000000000000000000000000000000000000..d995e3bdfad5cd1f55e2cdd8a9a954f19bcc4122
--- /dev/null
+++ b/sql/hive/src/test/resources/sqlgen/sort_desc_nulls_first.sql
@@ -0,0 +1,4 @@
+-- This file is automatically generated by LogicalPlanToSQLSuite.
+SELECT COUNT(value) FROM parquet_t1 GROUP BY key ORDER BY key desc nulls first,MAX(key)
+--------------------------------------------------------------------------------
+SELECT `gen_attr_0` AS `count(value)` FROM (SELECT `gen_attr_0` FROM (SELECT count(`gen_attr_4`) AS `gen_attr_0`, `gen_attr_3` AS `gen_attr_1`, max(`gen_attr_3`) AS `gen_attr_2` FROM (SELECT `key` AS `gen_attr_3`, `value` AS `gen_attr_4` FROM `default`.`parquet_t1`) AS gen_subquery_0 GROUP BY `gen_attr_3` ORDER BY `gen_attr_1` DESC NULLS FIRST, `gen_attr_2` ASC NULLS FIRST) AS gen_subquery_1) AS gen_subquery_2
diff --git a/sql/hive/src/test/resources/sqlgen/subquery_in_having_1.sql b/sql/hive/src/test/resources/sqlgen/subquery_in_having_1.sql
index 9894f5ab39c7621d48562f5f4cfbcd5bb0e126bb..25882147463b9b837a8306d43ce762ebc1c6469c 100644
--- a/sql/hive/src/test/resources/sqlgen/subquery_in_having_1.sql
+++ b/sql/hive/src/test/resources/sqlgen/subquery_in_having_1.sql
@@ -5,4 +5,4 @@ group by key
 having count(*) in (select count(*) from src s1 where s1.key = '90' group by s1.key)
 order by key
 --------------------------------------------------------------------------------
-SELECT `gen_attr_0` AS `key`, `gen_attr_1` AS `count(1)` FROM (SELECT `gen_attr_0`, `gen_attr_1` FROM (SELECT `gen_attr_0`, count(1) AS `gen_attr_1`, count(1) AS `gen_attr_2` FROM (SELECT `key` AS `gen_attr_0`, `value` AS `gen_attr_4` FROM `default`.`src`) AS gen_subquery_0 GROUP BY `gen_attr_0` HAVING (`gen_attr_2` IN (SELECT `gen_attr_5` AS `_c0` FROM (SELECT `gen_attr_3` AS `gen_attr_5` FROM (SELECT count(1) AS `gen_attr_3` FROM (SELECT `key` AS `gen_attr_6`, `value` AS `gen_attr_7` FROM `default`.`src`) AS gen_subquery_3 WHERE (CAST(`gen_attr_6` AS DOUBLE) = CAST('90' AS DOUBLE)) GROUP BY `gen_attr_6`) AS gen_subquery_2) AS gen_subquery_4))) AS gen_subquery_1 ORDER BY `gen_attr_0` ASC) AS src
+SELECT `gen_attr_0` AS `key`, `gen_attr_1` AS `count(1)` FROM (SELECT `gen_attr_0`, `gen_attr_1` FROM (SELECT `gen_attr_0`, count(1) AS `gen_attr_1`, count(1) AS `gen_attr_2` FROM (SELECT `key` AS `gen_attr_0`, `value` AS `gen_attr_4` FROM `default`.`src`) AS gen_subquery_0 GROUP BY `gen_attr_0` HAVING (`gen_attr_2` IN (SELECT `gen_attr_5` AS `_c0` FROM (SELECT `gen_attr_3` AS `gen_attr_5` FROM (SELECT count(1) AS `gen_attr_3` FROM (SELECT `key` AS `gen_attr_6`, `value` AS `gen_attr_7` FROM `default`.`src`) AS gen_subquery_3 WHERE (CAST(`gen_attr_6` AS DOUBLE) = CAST('90' AS DOUBLE)) GROUP BY `gen_attr_6`) AS gen_subquery_2) AS gen_subquery_4))) AS gen_subquery_1 ORDER BY `gen_attr_0` ASC NULLS FIRST) AS src
diff --git a/sql/hive/src/test/resources/sqlgen/subquery_in_having_2.sql b/sql/hive/src/test/resources/sqlgen/subquery_in_having_2.sql
index c3a122aa889b9aadf0eed5c9f9235d11b9323fd9..de0116a4dcbaf2bbb23eb3aed184a55d38deb705 100644
--- a/sql/hive/src/test/resources/sqlgen/subquery_in_having_2.sql
+++ b/sql/hive/src/test/resources/sqlgen/subquery_in_having_2.sql
@@ -7,4 +7,4 @@ having b.key in (select a.key
                  where a.value > 'val_9' and a.value = min(b.value))
 order by b.key
 --------------------------------------------------------------------------------
-SELECT `gen_attr_0` AS `key`, `gen_attr_1` AS `min(value)` FROM (SELECT `gen_attr_0`, `gen_attr_1` FROM (SELECT `gen_attr_0`, min(`gen_attr_5`) AS `gen_attr_1`, min(`gen_attr_5`) AS `gen_attr_4` FROM (SELECT `key` AS `gen_attr_0`, `value` AS `gen_attr_5` FROM `default`.`src`) AS gen_subquery_0 GROUP BY `gen_attr_0` HAVING (struct(`gen_attr_0`, `gen_attr_4`) IN (SELECT `gen_attr_6` AS `_c0`, `gen_attr_7` AS `_c1` FROM (SELECT `gen_attr_2` AS `gen_attr_6`, `gen_attr_3` AS `gen_attr_7` FROM (SELECT `gen_attr_2`, `gen_attr_3` FROM (SELECT `key` AS `gen_attr_2`, `value` AS `gen_attr_3` FROM `default`.`src`) AS gen_subquery_3 WHERE (`gen_attr_3` > 'val_9')) AS gen_subquery_2) AS gen_subquery_4))) AS gen_subquery_1 ORDER BY `gen_attr_0` ASC) AS b
+SELECT `gen_attr_0` AS `key`, `gen_attr_1` AS `min(value)` FROM (SELECT `gen_attr_0`, `gen_attr_1` FROM (SELECT `gen_attr_0`, min(`gen_attr_5`) AS `gen_attr_1`, min(`gen_attr_5`) AS `gen_attr_4` FROM (SELECT `key` AS `gen_attr_0`, `value` AS `gen_attr_5` FROM `default`.`src`) AS gen_subquery_0 GROUP BY `gen_attr_0` HAVING (struct(`gen_attr_0`, `gen_attr_4`) IN (SELECT `gen_attr_6` AS `_c0`, `gen_attr_7` AS `_c1` FROM (SELECT `gen_attr_2` AS `gen_attr_6`, `gen_attr_3` AS `gen_attr_7` FROM (SELECT `gen_attr_2`, `gen_attr_3` FROM (SELECT `key` AS `gen_attr_2`, `value` AS `gen_attr_3` FROM `default`.`src`) AS gen_subquery_3 WHERE (`gen_attr_3` > 'val_9')) AS gen_subquery_2) AS gen_subquery_4))) AS gen_subquery_1 ORDER BY `gen_attr_0` ASC NULLS FIRST) AS b
diff --git a/sql/hive/src/test/resources/sqlgen/window_basic_2.sql b/sql/hive/src/test/resources/sqlgen/window_basic_2.sql
index ec55d4b7146f2f185b7de0353c7b3c6f0d57cec8..0e2a9a54731fc8aba2d179d534d1bc707802cd69 100644
--- a/sql/hive/src/test/resources/sqlgen/window_basic_2.sql
+++ b/sql/hive/src/test/resources/sqlgen/window_basic_2.sql
@@ -2,4 +2,4 @@
 SELECT key, value, ROUND(AVG(key) OVER (), 2)
 FROM parquet_t1 ORDER BY key
 --------------------------------------------------------------------------------
-SELECT `gen_attr_0` AS `key`, `gen_attr_1` AS `value`, `gen_attr_2` AS `round(avg(key) OVER (ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING), 2)` FROM (SELECT `gen_attr_0`, `gen_attr_1`, round(`gen_attr_3`, 2) AS `gen_attr_2` FROM (SELECT gen_subquery_1.`gen_attr_0`, gen_subquery_1.`gen_attr_1`, avg(`gen_attr_0`) OVER (ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS `gen_attr_3` FROM (SELECT `gen_attr_0`, `gen_attr_1` FROM (SELECT `key` AS `gen_attr_0`, `value` AS `gen_attr_1` FROM `default`.`parquet_t1`) AS gen_subquery_0) AS gen_subquery_1) AS gen_subquery_2 ORDER BY `gen_attr_0` ASC) AS parquet_t1
+SELECT `gen_attr_0` AS `key`, `gen_attr_1` AS `value`, `gen_attr_2` AS `round(avg(key) OVER (ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING), 2)` FROM (SELECT `gen_attr_0`, `gen_attr_1`, round(`gen_attr_3`, 2) AS `gen_attr_2` FROM (SELECT gen_subquery_1.`gen_attr_0`, gen_subquery_1.`gen_attr_1`, avg(`gen_attr_0`) OVER (ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS `gen_attr_3` FROM (SELECT `gen_attr_0`, `gen_attr_1` FROM (SELECT `key` AS `gen_attr_0`, `value` AS `gen_attr_1` FROM `default`.`parquet_t1`) AS gen_subquery_0) AS gen_subquery_1) AS gen_subquery_2 ORDER BY `gen_attr_0` ASC NULLS FIRST) AS parquet_t1
diff --git a/sql/hive/src/test/resources/sqlgen/window_basic_3.sql b/sql/hive/src/test/resources/sqlgen/window_basic_3.sql
index c0ac9541e67ee8cf803697154ae756e9d5e38913..d727caa583e610b68604b92b8bb37545deddbeac 100644
--- a/sql/hive/src/test/resources/sqlgen/window_basic_3.sql
+++ b/sql/hive/src/test/resources/sqlgen/window_basic_3.sql
@@ -2,4 +2,4 @@
 SELECT value, MAX(key + 1) OVER (PARTITION BY key % 5 ORDER BY key % 7) AS max
 FROM parquet_t1
 --------------------------------------------------------------------------------
-SELECT `gen_attr_0` AS `value`, `gen_attr_1` AS `max` FROM (SELECT `gen_attr_0`, `gen_attr_1` FROM (SELECT gen_subquery_1.`gen_attr_0`, gen_subquery_1.`gen_attr_2`, gen_subquery_1.`gen_attr_3`, gen_subquery_1.`gen_attr_4`, max(`gen_attr_2`) OVER (PARTITION BY `gen_attr_3` ORDER BY `gen_attr_4` ASC RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS `gen_attr_1` FROM (SELECT `gen_attr_0`, (`gen_attr_5` + CAST(1 AS BIGINT)) AS `gen_attr_2`, (`gen_attr_5` % CAST(5 AS BIGINT)) AS `gen_attr_3`, (`gen_attr_5` % CAST(7 AS BIGINT)) AS `gen_attr_4` FROM (SELECT `key` AS `gen_attr_5`, `value` AS `gen_attr_0` FROM `default`.`parquet_t1`) AS gen_subquery_0) AS gen_subquery_1) AS gen_subquery_2) AS parquet_t1
+SELECT `gen_attr_0` AS `value`, `gen_attr_1` AS `max` FROM (SELECT `gen_attr_0`, `gen_attr_1` FROM (SELECT gen_subquery_1.`gen_attr_0`, gen_subquery_1.`gen_attr_2`, gen_subquery_1.`gen_attr_3`, gen_subquery_1.`gen_attr_4`, max(`gen_attr_2`) OVER (PARTITION BY `gen_attr_3` ORDER BY `gen_attr_4` ASC NULLS FIRST RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS `gen_attr_1` FROM (SELECT `gen_attr_0`, (`gen_attr_5` + CAST(1 AS BIGINT)) AS `gen_attr_2`, (`gen_attr_5` % CAST(5 AS BIGINT)) AS `gen_attr_3`, (`gen_attr_5` % CAST(7 AS BIGINT)) AS `gen_attr_4` FROM (SELECT `key` AS `gen_attr_5`, `value` AS `gen_attr_0` FROM `default`.`parquet_t1`) AS gen_subquery_0) AS gen_subquery_1) AS gen_subquery_2) AS parquet_t1
diff --git a/sql/hive/src/test/resources/sqlgen/window_basic_asc_nulls_last.sql b/sql/hive/src/test/resources/sqlgen/window_basic_asc_nulls_last.sql
new file mode 100644
index 0000000000000000000000000000000000000000..4739f05808daf5cf458d6e49535bb2345b2ac334
--- /dev/null
+++ b/sql/hive/src/test/resources/sqlgen/window_basic_asc_nulls_last.sql
@@ -0,0 +1,5 @@
+-- This file is automatically generated by LogicalPlanToSQLSuite.
+SELECT key, value, ROUND(AVG(key) OVER (), 2)
+FROM parquet_t1 ORDER BY key nulls last
+--------------------------------------------------------------------------------
+SELECT `gen_attr_0` AS `key`, `gen_attr_1` AS `value`, `gen_attr_2` AS `round(avg(key) OVER (ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING), 2)` FROM (SELECT `gen_attr_0`, `gen_attr_1`, round(`gen_attr_3`, 2) AS `gen_attr_2` FROM (SELECT gen_subquery_1.`gen_attr_0`, gen_subquery_1.`gen_attr_1`, avg(`gen_attr_0`) OVER (ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS `gen_attr_3` FROM (SELECT `gen_attr_0`, `gen_attr_1` FROM (SELECT `key` AS `gen_attr_0`, `value` AS `gen_attr_1` FROM `default`.`parquet_t1`) AS gen_subquery_0) AS gen_subquery_1) AS gen_subquery_2 ORDER BY `gen_attr_0` ASC NULLS LAST) AS parquet_t1
diff --git a/sql/hive/src/test/resources/sqlgen/window_basic_desc_nulls_first.sql b/sql/hive/src/test/resources/sqlgen/window_basic_desc_nulls_first.sql
new file mode 100644
index 0000000000000000000000000000000000000000..1b9db2993b09d7b5b60ddcc97c55e3e9a4d1c51a
--- /dev/null
+++ b/sql/hive/src/test/resources/sqlgen/window_basic_desc_nulls_first.sql
@@ -0,0 +1,5 @@
+-- This file is automatically generated by LogicalPlanToSQLSuite.
+SELECT key, value, ROUND(AVG(key) OVER (), 2)
+FROM parquet_t1 ORDER BY key desc nulls first
+--------------------------------------------------------------------------------
+SELECT `gen_attr_0` AS `key`, `gen_attr_1` AS `value`, `gen_attr_2` AS `round(avg(key) OVER (ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING), 2)` FROM (SELECT `gen_attr_0`, `gen_attr_1`, round(`gen_attr_3`, 2) AS `gen_attr_2` FROM (SELECT gen_subquery_1.`gen_attr_0`, gen_subquery_1.`gen_attr_1`, avg(`gen_attr_0`) OVER (ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS `gen_attr_3` FROM (SELECT `gen_attr_0`, `gen_attr_1` FROM (SELECT `key` AS `gen_attr_0`, `value` AS `gen_attr_1` FROM `default`.`parquet_t1`) AS gen_subquery_0) AS gen_subquery_1) AS gen_subquery_2 ORDER BY `gen_attr_0` DESC NULLS FIRST) AS parquet_t1
diff --git a/sql/hive/src/test/resources/sqlgen/window_with_join.sql b/sql/hive/src/test/resources/sqlgen/window_with_join.sql
index 030a4c0907a1c0dc75f9f8356d9c955b8787af4f..43d5b47be8fbabaa253d4dbb866c9078b6b7242a 100644
--- a/sql/hive/src/test/resources/sqlgen/window_with_join.sql
+++ b/sql/hive/src/test/resources/sqlgen/window_with_join.sql
@@ -2,4 +2,4 @@
 SELECT x.key, MAX(y.key) OVER (PARTITION BY x.key % 5 ORDER BY x.key)
 FROM parquet_t1 x JOIN parquet_t1 y ON x.key = y.key
 --------------------------------------------------------------------------------
-SELECT `gen_attr_0` AS `key`, `gen_attr_1` AS `max(key) OVER (PARTITION BY (key % CAST(5 AS BIGINT)) ORDER BY key ASC RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)` FROM (SELECT `gen_attr_0`, `gen_attr_1` FROM (SELECT gen_subquery_2.`gen_attr_0`, gen_subquery_2.`gen_attr_2`, gen_subquery_2.`gen_attr_3`, max(`gen_attr_2`) OVER (PARTITION BY `gen_attr_3` ORDER BY `gen_attr_0` ASC RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS `gen_attr_1` FROM (SELECT `gen_attr_0`, `gen_attr_2`, (`gen_attr_0` % CAST(5 AS BIGINT)) AS `gen_attr_3` FROM (SELECT `key` AS `gen_attr_0`, `value` AS `gen_attr_4` FROM `default`.`parquet_t1`) AS gen_subquery_0 INNER JOIN (SELECT `key` AS `gen_attr_2`, `value` AS `gen_attr_5` FROM `default`.`parquet_t1`) AS gen_subquery_1 ON (`gen_attr_0` = `gen_attr_2`)) AS gen_subquery_2) AS gen_subquery_3) AS x
+SELECT `gen_attr_0` AS `key`, `gen_attr_1` AS `max(key) OVER (PARTITION BY (key % CAST(5 AS BIGINT)) ORDER BY key ASC NULLS FIRST RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)` FROM (SELECT `gen_attr_0`, `gen_attr_1` FROM (SELECT gen_subquery_2.`gen_attr_0`, gen_subquery_2.`gen_attr_2`, gen_subquery_2.`gen_attr_3`, max(`gen_attr_2`) OVER (PARTITION BY `gen_attr_3` ORDER BY `gen_attr_0` ASC NULLS FIRST RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS `gen_attr_1` FROM (SELECT `gen_attr_0`, `gen_attr_2`, (`gen_attr_0` % CAST(5 AS BIGINT)) AS `gen_attr_3` FROM (SELECT `key` AS `gen_attr_0`, `value` AS `gen_attr_4` FROM `default`.`parquet_t1`) AS gen_subquery_0 INNER JOIN (SELECT `key` AS `gen_attr_2`, `value` AS `gen_attr_5` FROM `default`.`parquet_t1`) AS gen_subquery_1 ON (`gen_attr_0` = `gen_attr_2`)) AS gen_subquery_2) AS gen_subquery_3) AS x
diff --git a/sql/hive/src/test/resources/sqlgen/window_with_the_same_window_with_agg.sql b/sql/hive/src/test/resources/sqlgen/window_with_the_same_window_with_agg.sql
index 7b99539a0548006e42be25b4c2ea5e891bb4b93b..33a8e83750be0e1836547f71ea768b62e55f6c86 100644
--- a/sql/hive/src/test/resources/sqlgen/window_with_the_same_window_with_agg.sql
+++ b/sql/hive/src/test/resources/sqlgen/window_with_the_same_window_with_agg.sql
@@ -4,4 +4,4 @@ DENSE_RANK() OVER (DISTRIBUTE BY key SORT BY key, value) AS dr,
 COUNT(key)
 FROM parquet_t1 GROUP BY key, value
 --------------------------------------------------------------------------------
-SELECT `gen_attr_0` AS `key`, `gen_attr_1` AS `value`, `gen_attr_2` AS `dr`, `gen_attr_3` AS `count(key)` FROM (SELECT `gen_attr_0`, `gen_attr_1`, `gen_attr_2`, `gen_attr_3` FROM (SELECT gen_subquery_1.`gen_attr_0`, gen_subquery_1.`gen_attr_1`, gen_subquery_1.`gen_attr_3`, DENSE_RANK() OVER (PARTITION BY `gen_attr_0` ORDER BY `gen_attr_0` ASC, `gen_attr_1` ASC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS `gen_attr_2` FROM (SELECT `gen_attr_0`, `gen_attr_1`, count(`gen_attr_0`) AS `gen_attr_3` FROM (SELECT `key` AS `gen_attr_0`, `value` AS `gen_attr_1` FROM `default`.`parquet_t1`) AS gen_subquery_0 GROUP BY `gen_attr_0`, `gen_attr_1`) AS gen_subquery_1) AS gen_subquery_2) AS parquet_t1
+SELECT `gen_attr_0` AS `key`, `gen_attr_1` AS `value`, `gen_attr_2` AS `dr`, `gen_attr_3` AS `count(key)` FROM (SELECT `gen_attr_0`, `gen_attr_1`, `gen_attr_2`, `gen_attr_3` FROM (SELECT gen_subquery_1.`gen_attr_0`, gen_subquery_1.`gen_attr_1`, gen_subquery_1.`gen_attr_3`, DENSE_RANK() OVER (PARTITION BY `gen_attr_0` ORDER BY `gen_attr_0` ASC NULLS FIRST, `gen_attr_1` ASC NULLS FIRST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS `gen_attr_2` FROM (SELECT `gen_attr_0`, `gen_attr_1`, count(`gen_attr_0`) AS `gen_attr_3` FROM (SELECT `key` AS `gen_attr_0`, `value` AS `gen_attr_1` FROM `default`.`parquet_t1`) AS gen_subquery_0 GROUP BY `gen_attr_0`, `gen_attr_1`) AS gen_subquery_1) AS gen_subquery_2) AS parquet_t1
diff --git a/sql/hive/src/test/resources/sqlgen/window_with_the_same_window_with_agg_filter.sql b/sql/hive/src/test/resources/sqlgen/window_with_the_same_window_with_agg_filter.sql
index 591a654a3888ef7d6fca8c0930e029a2e7ce36ec..e01bc034d3d122b897b5bc7a63da80780e97b1c2 100644
--- a/sql/hive/src/test/resources/sqlgen/window_with_the_same_window_with_agg_filter.sql
+++ b/sql/hive/src/test/resources/sqlgen/window_with_the_same_window_with_agg_filter.sql
@@ -4,4 +4,4 @@ DENSE_RANK() OVER (DISTRIBUTE BY key SORT BY key, value) AS dr,
 COUNT(key) OVER(DISTRIBUTE BY key SORT BY key, value) AS ca
 FROM parquet_t1
 --------------------------------------------------------------------------------
-SELECT `gen_attr_0` AS `key`, `gen_attr_1` AS `value`, `gen_attr_2` AS `dr`, `gen_attr_3` AS `ca` FROM (SELECT `gen_attr_0`, `gen_attr_1`, `gen_attr_2`, `gen_attr_3` FROM (SELECT gen_subquery_1.`gen_attr_0`, gen_subquery_1.`gen_attr_1`, DENSE_RANK() OVER (PARTITION BY `gen_attr_0` ORDER BY `gen_attr_0` ASC, `gen_attr_1` ASC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS `gen_attr_2`, count(`gen_attr_0`) OVER (PARTITION BY `gen_attr_0` ORDER BY `gen_attr_0` ASC, `gen_attr_1` ASC RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS `gen_attr_3` FROM (SELECT `gen_attr_0`, `gen_attr_1` FROM (SELECT `key` AS `gen_attr_0`, `value` AS `gen_attr_1` FROM `default`.`parquet_t1`) AS gen_subquery_0) AS gen_subquery_1) AS gen_subquery_2) AS parquet_t1
+SELECT `gen_attr_0` AS `key`, `gen_attr_1` AS `value`, `gen_attr_2` AS `dr`, `gen_attr_3` AS `ca` FROM (SELECT `gen_attr_0`, `gen_attr_1`, `gen_attr_2`, `gen_attr_3` FROM (SELECT gen_subquery_1.`gen_attr_0`, gen_subquery_1.`gen_attr_1`, DENSE_RANK() OVER (PARTITION BY `gen_attr_0` ORDER BY `gen_attr_0` ASC NULLS FIRST, `gen_attr_1` ASC NULLS FIRST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS `gen_attr_2`, count(`gen_attr_0`) OVER (PARTITION BY `gen_attr_0` ORDER BY `gen_attr_0` ASC NULLS FIRST, `gen_attr_1` ASC NULLS FIRST RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS `gen_attr_3` FROM (SELECT `gen_attr_0`, `gen_attr_1` FROM (SELECT `key` AS `gen_attr_0`, `value` AS `gen_attr_1` FROM `default`.`parquet_t1`) AS gen_subquery_0) AS gen_subquery_1) AS gen_subquery_2) AS parquet_t1
diff --git a/sql/hive/src/test/resources/sqlgen/window_with_the_same_window_with_agg_functions.sql b/sql/hive/src/test/resources/sqlgen/window_with_the_same_window_with_agg_functions.sql
index d9169eab6e46ac7d1760d54ae793fc5fd080b59c..dbfa408fa517e339043987ce5bb07baa82c8fc3e 100644
--- a/sql/hive/src/test/resources/sqlgen/window_with_the_same_window_with_agg_functions.sql
+++ b/sql/hive/src/test/resources/sqlgen/window_with_the_same_window_with_agg_functions.sql
@@ -3,4 +3,4 @@ SELECT key, value,
 MAX(value) OVER (PARTITION BY key % 5 ORDER BY key) AS max
 FROM parquet_t1 GROUP BY key, value
 --------------------------------------------------------------------------------
-SELECT `gen_attr_0` AS `key`, `gen_attr_1` AS `value`, `gen_attr_2` AS `max` FROM (SELECT `gen_attr_0`, `gen_attr_1`, `gen_attr_2` FROM (SELECT gen_subquery_1.`gen_attr_0`, gen_subquery_1.`gen_attr_1`, gen_subquery_1.`gen_attr_3`, max(`gen_attr_1`) OVER (PARTITION BY `gen_attr_3` ORDER BY `gen_attr_0` ASC RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS `gen_attr_2` FROM (SELECT `gen_attr_0`, `gen_attr_1`, (`gen_attr_0` % CAST(5 AS BIGINT)) AS `gen_attr_3` FROM (SELECT `key` AS `gen_attr_0`, `value` AS `gen_attr_1` FROM `default`.`parquet_t1`) AS gen_subquery_0 GROUP BY `gen_attr_0`, `gen_attr_1`) AS gen_subquery_1) AS gen_subquery_2) AS parquet_t1
+SELECT `gen_attr_0` AS `key`, `gen_attr_1` AS `value`, `gen_attr_2` AS `max` FROM (SELECT `gen_attr_0`, `gen_attr_1`, `gen_attr_2` FROM (SELECT gen_subquery_1.`gen_attr_0`, gen_subquery_1.`gen_attr_1`, gen_subquery_1.`gen_attr_3`, max(`gen_attr_1`) OVER (PARTITION BY `gen_attr_3` ORDER BY `gen_attr_0` ASC NULLS FIRST RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS `gen_attr_2` FROM (SELECT `gen_attr_0`, `gen_attr_1`, (`gen_attr_0` % CAST(5 AS BIGINT)) AS `gen_attr_3` FROM (SELECT `key` AS `gen_attr_0`, `value` AS `gen_attr_1` FROM `default`.`parquet_t1`) AS gen_subquery_0 GROUP BY `gen_attr_0`, `gen_attr_1`) AS gen_subquery_1) AS gen_subquery_2) AS parquet_t1
diff --git a/sql/hive/src/test/resources/sqlgen/window_with_the_same_window_with_agg_having.sql b/sql/hive/src/test/resources/sqlgen/window_with_the_same_window_with_agg_having.sql
index f0a820811ee0ae530bbeacffbaadfb4e41ca30d7..6f5741b9462629c9b1eb7aa14a71065bda3b9b16 100644
--- a/sql/hive/src/test/resources/sqlgen/window_with_the_same_window_with_agg_having.sql
+++ b/sql/hive/src/test/resources/sqlgen/window_with_the_same_window_with_agg_having.sql
@@ -3,4 +3,4 @@ SELECT key, value,
 MAX(value) OVER (PARTITION BY key % 5 ORDER BY key DESC) AS max
 FROM parquet_t1 GROUP BY key, value HAVING key > 5
 --------------------------------------------------------------------------------
-SELECT `gen_attr_0` AS `key`, `gen_attr_1` AS `value`, `gen_attr_2` AS `max` FROM (SELECT `gen_attr_0`, `gen_attr_1`, `gen_attr_2` FROM (SELECT gen_subquery_1.`gen_attr_0`, gen_subquery_1.`gen_attr_1`, gen_subquery_1.`gen_attr_3`, max(`gen_attr_1`) OVER (PARTITION BY `gen_attr_3` ORDER BY `gen_attr_0` DESC RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS `gen_attr_2` FROM (SELECT `gen_attr_0`, `gen_attr_1`, (`gen_attr_0` % CAST(5 AS BIGINT)) AS `gen_attr_3` FROM (SELECT `key` AS `gen_attr_0`, `value` AS `gen_attr_1` FROM `default`.`parquet_t1`) AS gen_subquery_0 GROUP BY `gen_attr_0`, `gen_attr_1` HAVING (`gen_attr_0` > CAST(5 AS BIGINT))) AS gen_subquery_1) AS gen_subquery_2) AS parquet_t1
+SELECT `gen_attr_0` AS `key`, `gen_attr_1` AS `value`, `gen_attr_2` AS `max` FROM (SELECT `gen_attr_0`, `gen_attr_1`, `gen_attr_2` FROM (SELECT gen_subquery_1.`gen_attr_0`, gen_subquery_1.`gen_attr_1`, gen_subquery_1.`gen_attr_3`, max(`gen_attr_1`) OVER (PARTITION BY `gen_attr_3` ORDER BY `gen_attr_0` DESC NULLS LAST RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS `gen_attr_2` FROM (SELECT `gen_attr_0`, `gen_attr_1`, (`gen_attr_0` % CAST(5 AS BIGINT)) AS `gen_attr_3` FROM (SELECT `key` AS `gen_attr_0`, `value` AS `gen_attr_1` FROM `default`.`parquet_t1`) AS gen_subquery_0 GROUP BY `gen_attr_0`, `gen_attr_1` HAVING (`gen_attr_0` > CAST(5 AS BIGINT))) AS gen_subquery_1) AS gen_subquery_2) AS parquet_t1
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/catalyst/ExpressionSQLBuilderSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/catalyst/ExpressionSQLBuilderSuite.scala
index d2b2f38fa1f71c19736e37d45f1d2c8f3d8e360e..ce5efe853ca4fe2f1ea34fa573348982013a5cfb 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/catalyst/ExpressionSQLBuilderSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/catalyst/ExpressionSQLBuilderSuite.scala
@@ -106,17 +106,17 @@ class ExpressionSQLBuilderSuite extends SQLBuilderTest {
 
     checkSQL(
       WindowSpecDefinition(Nil, 'a.int.asc :: Nil, frame),
-      s"(ORDER BY `a` ASC $frame)"
+      s"(ORDER BY `a` ASC NULLS FIRST $frame)"
     )
 
     checkSQL(
       WindowSpecDefinition(Nil, 'a.int.asc :: 'b.string.desc :: Nil, frame),
-      s"(ORDER BY `a` ASC, `b` DESC $frame)"
+      s"(ORDER BY `a` ASC NULLS FIRST, `b` DESC NULLS LAST $frame)"
     )
 
     checkSQL(
       WindowSpecDefinition('a.int :: 'b.string :: Nil, 'c.int.asc :: 'd.string.desc :: Nil, frame),
-      s"(PARTITION BY `a`, `b` ORDER BY `c` ASC, `d` DESC $frame)"
+      s"(PARTITION BY `a`, `b` ORDER BY `c` ASC NULLS FIRST, `d` DESC NULLS LAST $frame)"
     )
   }
 }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/catalyst/LogicalPlanToSQLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/catalyst/LogicalPlanToSQLSuite.scala
index d80f894c22dd8e866366f91230ad04c1982beb5b..7fa5c29dc5b8f46382d6c9d30ae55803b97e1caf 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/catalyst/LogicalPlanToSQLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/catalyst/LogicalPlanToSQLSuite.scala
@@ -235,6 +235,16 @@ class LogicalPlanToSQLSuite extends SQLBuilderTest with SQLTestUtils {
     checkSQL("SELECT COUNT(value) FROM parquet_t1 GROUP BY key ORDER BY key, MAX(key)", "agg3")
   }
 
+  test("order by asc nulls last") {
+    checkSQL("SELECT COUNT(value) FROM parquet_t1 GROUP BY key ORDER BY key nulls last, MAX(key)",
+      "sort_asc_nulls_last")
+  }
+
+  test("order by desc nulls first") {
+    checkSQL("SELECT COUNT(value) FROM parquet_t1 GROUP BY key ORDER BY key desc nulls first," +
+      "MAX(key)", "sort_desc_nulls_first")
+  }
+
   test("type widening in union") {
     checkSQL("SELECT id FROM parquet_t0 UNION ALL SELECT CAST(id AS INT) AS id FROM parquet_t0",
       "type_widening")
@@ -697,6 +707,20 @@ class LogicalPlanToSQLSuite extends SQLBuilderTest with SQLTestUtils {
          |FROM parquet_t1
       """.stripMargin,
       "window_basic_3")
+
+    checkSQL(
+      """
+        |SELECT key, value, ROUND(AVG(key) OVER (), 2)
+        |FROM parquet_t1 ORDER BY key nulls last
+      """.stripMargin,
+      "window_basic_asc_nulls_last")
+
+    checkSQL(
+      """
+        |SELECT key, value, ROUND(AVG(key) OVER (), 2)
+        |FROM parquet_t1 ORDER BY key desc nulls first
+      """.stripMargin,
+      "window_basic_desc_nulls_first")
   }
 
   test("multiple window functions in one expression") {