diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index a448c794213ae132a4eac1e15576ca58c5644c67..d3b4cf8e3424259db733ddbd36a35923c48b2e5d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -60,7 +60,7 @@ class Analyzer(catalog: Catalog, registry: FunctionRegistry, caseSensitive: Bool
       ResolveFunctions ::
       GlobalAggregates ::
       UnresolvedHavingClauseAttributes ::
-      TrimAliases ::
+      TrimGroupingAliases ::
       typeCoercionRules ++
       extendedRules : _*),
     Batch("Check Analysis", Once,
@@ -93,17 +93,10 @@ class Analyzer(catalog: Catalog, registry: FunctionRegistry, caseSensitive: Bool
   /**
-   * Removes no-op Alias expressions from the plan.
+   * Removes no-op Alias expressions from the grouping expressions of Aggregate operators.
    */
-  object TrimAliases extends Rule[LogicalPlan] {
+  object TrimGroupingAliases extends Rule[LogicalPlan] {
     def apply(plan: LogicalPlan): LogicalPlan = plan transform {
       case Aggregate(groups, aggs, child) =>
-        Aggregate(
-          groups.map {
-            _ transform {
-              case Alias(c, _) => c
-            }
-          },
-          aggs,
-          child)
+        Aggregate(groups.map(_.transform { case Alias(c, _) => c }), aggs, child)
     }
   }
 
@@ -122,10 +115,15 @@ class Analyzer(catalog: Catalog, registry: FunctionRegistry, caseSensitive: Bool
             case e => e.children.forall(isValidAggregateExpression)
           }
 
-          aggregateExprs.foreach { e =>
-            if (!isValidAggregateExpression(e)) {
-              throw new TreeNodeException(plan, s"Expression not in GROUP BY: $e")
-            }
+          aggregateExprs.find { e =>
+            !isValidAggregateExpression(e.transform {
+              // Trim aliases around `GetField`s: these are introduced while resolving
+              // struct field accesses, because `GetField` is not a `NamedExpression`.
+              // (Should we just turn `GetField` into a `NamedExpression`?)
+              case Alias(g: GetField, _) => g
+            })
+          }.foreach { e =>
+            throw new TreeNodeException(plan, s"Expression not in GROUP BY: $e")
           }
 
           aggregatePlan
@@ -328,4 +326,3 @@ object EliminateAnalysisOperators extends Rule[LogicalPlan] {
     case Subquery(_, child) => child
   }
 }
-
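
The rename above is cosmetic; the Check Analysis change is the substance of the fix. Before rejecting an aggregate expression, the checker now strips the no-op `Alias` nodes that resolution wraps around `GetField`, so the SELECT-list copy of `a.b` compares equal to the GROUP BY copy. A minimal, self-contained sketch of that alias-trimming transform, using hypothetical stand-ins for Catalyst's `Expression`, `Alias`, and `GetField` classes rather than the real API:

    sealed trait Expr {
      // Apply `rule` over the tree bottom-up, loosely mirroring Catalyst's
      // `transformUp`; children are rewritten before the current node.
      def transform(rule: PartialFunction[Expr, Expr]): Expr = {
        val rewritten = this match {
          case Alias(child, name)     => Alias(child.transform(rule), name)
          case GetField(child, field) => GetField(child.transform(rule), field)
          case leaf                   => leaf
        }
        rule.applyOrElse(rewritten, identity[Expr])
      }
    }
    case class AttributeRef(name: String) extends Expr
    case class GetField(child: Expr, field: String) extends Expr
    case class Alias(child: Expr, name: String) extends Expr

    object TrimGroupingAliasesSketch extends App {
      // `a.b` is resolved into an aliased struct field access.
      val grouping: Expr = Alias(GetField(AttributeRef("a"), "b"), "b")
      // Stripping the no-op alias makes it equal to the un-aliased copy in the
      // aggregate expressions, so the GROUP BY membership check passes.
      println(grouping.transform { case Alias(g: GetField, _) => g })
      // => GetField(AttributeRef(a),b)
    }
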
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
index f0fd9a8b9a46e8fbe4d058c046017ac87270ff33..310d127506d68efaa903bc9db7c639892376c35a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
@@ -151,8 +151,15 @@ object PartialAggregation {
         val rewrittenAggregateExpressions = aggregateExpressions.map(_.transformUp {
           case e: Expression if partialEvaluations.contains(new TreeNodeRef(e)) =>
             partialEvaluations(new TreeNodeRef(e)).finalEvaluation
-          case e: Expression if namedGroupingExpressions.contains(e) =>
-            namedGroupingExpressions(e).toAttribute
+
+          case e: Expression =>
+            // Trim aliases around `GetField`s: these are introduced while resolving
+            // struct field accesses, because `GetField` is not a `NamedExpression`.
+            // (Should we just turn `GetField` into a `NamedExpression`?)
+            namedGroupingExpressions
+              .get(e.transform { case Alias(g: GetField, _) => g })
+              .map(_.toAttribute)
+              .getOrElse(e)
         }).asInstanceOf[Seq[NamedExpression]]
 
         val partialComputation =
@@ -188,7 +195,7 @@ object ExtractEquiJoinKeys extends Logging with PredicateHelper {
       logDebug(s"Considering join on: $condition")
       // Find equi-join predicates that can be evaluated before the join, and thus can be used
       // as join keys.
-      val (joinPredicates, otherPredicates) = 
+      val (joinPredicates, otherPredicates) =
         condition.map(splitConjunctivePredicates).getOrElse(Nil).partition {
           case EqualTo(l, r) if (canEvaluate(l, left) && canEvaluate(r, right)) ||
             (canEvaluate(l, right) && canEvaluate(r, left)) => true
@@ -203,7 +210,7 @@ object ExtractEquiJoinKeys extends Logging with PredicateHelper {
       val rightKeys = joinKeys.map(_._2)
 
       if (joinKeys.nonEmpty) {
-        logDebug(s"leftKeys:${leftKeys} | rightKeys:${rightKeys}")
+        logDebug(s"leftKeys:$leftKeys | rightKeys:$rightKeys")
         Some((joinType, leftKeys, rightKeys, otherPredicates.reduceOption(And), left, right))
       } else {
         None
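
`PartialAggregation` needs the same normalization: the old guard, `namedGroupingExpressions.contains(e)`, missed the aliased form of a struct field access, so the rewritten aggregate expressions referenced attributes the partial computation never produced. The new code trims the alias before the map lookup and falls back to the original expression on a miss. The shape of that lookup, sketched with plain strings (the `normalize` helper and the map contents are illustrative only):

    object NormalizedLookupSketch extends App {
      // Stand-in for trimming `Alias(GetField(...), _)` down to the `GetField`.
      def normalize(e: String): String = e.stripSuffix(" AS b")

      // Grouping expressions, keyed by their alias-trimmed form.
      val namedGroupingExpressions = Map("a.b" -> "a.b AS agg#0")

      def rewrite(e: String): String =
        namedGroupingExpressions
          .get(normalize(e))   // look up the alias-trimmed form
          .getOrElse(e)        // leave the expression unchanged on a miss

      println(rewrite("a.b AS b")) // a.b AS agg#0 (hit after trimming the alias)
      println(rewrite("c"))        // c (miss: fall through unchanged)
    }
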
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index 5dd777f1fb3b7729eeff0a5378f8e3da49fdb4ac..ce5672c08653a2544d5510619ad91494272c025e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -551,7 +551,7 @@ class SQLQuerySuite extends QueryTest with BeforeAndAfterAll {
       sql("SELECT * FROM upperCaseData EXCEPT SELECT * FROM upperCaseData"), Nil)
   }
 
- test("INTERSECT") {
+  test("INTERSECT") {
     checkAnswer(
       sql("SELECT * FROM lowerCaseData INTERSECT SELECT * FROM lowerCaseData"),
       (1, "a") ::
@@ -949,4 +949,14 @@ class SQLQuerySuite extends QueryTest with BeforeAndAfterAll {
     checkAnswer(sql("SELECT key FROM testData WHERE value not like '100%' order by key"),
         (1 to 99).map(i => Seq(i)))
   }
+
+  test("SPARK-4322 Grouping field with struct field as sub expression") {
+    jsonRDD(sparkContext.makeRDD("""{"a": {"b": [{"c": 1}]}}""" :: Nil)).registerTempTable("data")
+    checkAnswer(sql("SELECT a.b[0].c FROM data GROUP BY a.b[0].c"), 1)
+    dropTempTable("data")
+
+    jsonRDD(sparkContext.makeRDD("""{"a": {"b": 1}}""" :: Nil)).registerTempTable("data")
+    checkAnswer(sql("SELECT a.b + 1 FROM data GROUP BY a.b + 1"), 2)
+    dropTempTable("data")
+  }
 }
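
For reference, the failure this test guards against can be reproduced by hand; before this patch, both queries were rejected during analysis with `Expression not in GROUP BY`. A spark-shell sketch against the Spark 1.x API used in the test, assuming `sc` is the shell's existing SparkContext:

    import org.apache.spark.sql.SQLContext

    val sqlContext = new SQLContext(sc)
    sqlContext.jsonRDD(sc.makeRDD("""{"a": {"b": 1}}""" :: Nil)).registerTempTable("data")
    // Analysis used to reject this query: the struct field access resolved to
    // an aliased `GetField`, which failed the GROUP BY membership check.
    sqlContext.sql("SELECT a.b + 1 FROM data GROUP BY a.b + 1").collect()
    // After this patch: Array([2])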